1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef TBB_PREVIEW_FLOW_GRAPH_FEATURES
18     #define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1
19 #endif
20 
21 #include "tbb/tbb_config.h"
22 
23 #if __TBB_PREVIEW_ASYNC_MSG
24 
25 #if _MSC_VER
26 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
27 #endif
28 
29 #include "tbb/flow_graph.h"
30 #include "tbb/tbb_thread.h"
31 #include "tbb/concurrent_queue.h"
32 
33 #include "harness.h"
34 #include "harness_graph.h"
35 #include "harness_barrier.h"
36 
37 #include <sstream>      // std::ostringstream
38 #include <type_traits>  // std::is_base_of
39 
// Number of times TestMain() repeats the testSimplestCase()/testChaining() pair.
static const int USE_N = 1000;
// Base delay in milliseconds for the asynchronous activities; the tests multiply it by
// 0 or 1 before passing it on as a timeout. Currently 0; 500 is the commented-out alternative.
static const int ACTIVITY_PAUSE_MS_NODE1 = 0;//500;
// Sleep in milliseconds performed inside node bodies ("let activity to finish").
// Currently 0; 100 is the commented-out alternative.
static const int ACTIVITY_PAUSE_MS_NODE2 = 0;//100;
43 
// Prints one trace line, prefixed with the id of the calling thread, through REMARK --
// but only when 'Verbose' is set. 'msg' is streamed into a std::ostringstream, so it
// may be a chain such as:  "text " << value << " more text".
//
// The body is wrapped in do { ... } while (0) so that an invocation followed by ';'
// is exactly one statement; a bare { ... } block followed by ';' would break an
// unbraced  if (...) _TRACE_(...); else ...  construct.
#define _TRACE_(msg) do {                                                        \
    if (Verbose) {                                                               \
        std::ostringstream traceLine_;                                           \
        traceLine_ << "[TID=" << tbb::this_tbb_thread::get_id() << "] " << msg;  \
        REMARK("%s\n", traceLine_.str().c_str());                                \
    }                                                                            \
} while (0)
51 
52 class UserAsyncActivity // Singleton
53 {
54 public:
create(const tbb::flow::async_msg<int> & msg,int timeoutMS)55     static UserAsyncActivity* create(const tbb::flow::async_msg<int>& msg, int timeoutMS) {
56         ASSERT(s_Activity == NULL, "created twice");
57         _TRACE_( "Create UserAsyncActivity" );
58         s_Activity = new UserAsyncActivity(msg, timeoutMS);
59         _TRACE_( "CREATED! UserAsyncActivity" );
60         return s_Activity;
61     }
62 
destroy()63     static void destroy() {
64         _TRACE_( "Start UserAsyncActivity::destroy()" );
65         ASSERT(s_Activity != NULL, "destroyed twice");
66         s_Activity->myThread.join();
67         delete s_Activity;
68         s_Activity = NULL;
69         _TRACE_( "End UserAsyncActivity::destroy()" );
70     }
71 
72     static int s_Result;
73 
74 private:
threadFunc(UserAsyncActivity * activity)75     static void threadFunc(UserAsyncActivity* activity) {
76         _TRACE_( "UserAsyncActivity::threadFunc" );
77 
78         Harness::Sleep(activity->myTimeoutMS);
79 
80         const int result = static_cast<int>(reinterpret_cast<size_t>(activity)) & 0xFF; // just different random results
81         s_Result = result;
82 
83         _TRACE_( "UserAsyncActivity::threadFunc - returned result " << result );
84 
85         activity->returnActivityResults(result);
86     }
87 
UserAsyncActivity(const tbb::flow::async_msg<int> & msg,int timeoutMS)88     UserAsyncActivity(const tbb::flow::async_msg<int>& msg, int timeoutMS) : myMsg(msg), myTimeoutMS(timeoutMS)
89         , myThread(threadFunc, this)
90     {
91         // Start local thread here...
92         _TRACE_( "Started AsyncActivity" );
93     }
94 
95     // Will be called from working thread
returnActivityResults(int result)96     void returnActivityResults(int result) {
97         myMsg.set(result);
98     }
99 
100 private: // DATA
101     tbb::flow::async_msg<int>   myMsg;
102     int                         myTimeoutMS;
103     tbb::tbb_thread             myThread;
104 
105     static UserAsyncActivity*   s_Activity;
106 };
107 
108 UserAsyncActivity* UserAsyncActivity::s_Activity = NULL;
109 int UserAsyncActivity::s_Result = -1;
110 
111 class UserAsyncMsg1 : public tbb::flow::async_msg<int>
112 {
113 public:
114     typedef tbb::flow::async_msg<int> base;
115 };
116 
117 struct F2_body : tbb::internal::no_assign
118 {
119     static int          s_FinalResult;
120 
121     int&                myI;
122     bool                myAlive;
123 
F2_bodyF2_body124     F2_body(int& i) : myI(i), myAlive(true) {}
125 
F2_bodyF2_body126     F2_body(const F2_body& b) : no_assign(), myI(b.myI), myAlive(true) {}
127 
~F2_bodyF2_body128     ~F2_body() {
129         myAlive = false;
130         _TRACE_( "~F2_body" );
131     }
132 
operator ()F2_body133     void operator () (int result) {
134         __TBB_ASSERT(myAlive, "dead node");
135 
136         // Handle async activity result here
137         s_FinalResult = result;
138         _TRACE_( "F2: Got async_msg result = " << result );
139     }
140 };
141 
142 // static
143 int F2_body::s_FinalResult = -2;
144 
testSimplestCase()145 static bool testSimplestCase() {
146     bool bOk = true;
147     _TRACE_( "--- SAMPLE 1 (simple case 3-in-1: F1(A<T>) ---> F2(T)) " );
148 
149     for (int i = 0; i <= 2; ++i) {
150         _TRACE_( "CASE " << i + 1 << ": data is " << (i > 0 ? "NOT " : "") << "ready in storage" << (i > 1 ? " NO WAITING in graph" : "") );
151         _TRACE_( "MAIN THREAD" );
152 
153         {
154             tbb::flow::graph g;
155             tbb::flow::function_node< tbb::flow::continue_msg, UserAsyncMsg1 > f1( g, tbb::flow::unlimited,
156                 [&]( tbb::flow::continue_msg ) -> UserAsyncMsg1 {
157                     _TRACE_( "F1: Created async_msg" );
158 
159                     UserAsyncMsg1 a;
160                     UserAsyncActivity::create(a, (i == 0 ? 0 : 1)*ACTIVITY_PAUSE_MS_NODE1);
161 
162                     Harness::Sleep(ACTIVITY_PAUSE_MS_NODE2); // let activity to finish
163                     return a;
164                 }
165             );
166 
167 
168             tbb::flow::function_node< int > f2( g, tbb::flow::unlimited,
169                 F2_body(i)
170             );
171 
172             make_edge(f1, f2);
173             f1.try_put( tbb::flow::continue_msg() );
174             g.wait_for_all();
175             UserAsyncActivity::destroy();
176             _TRACE_( "Done UserAsyncActivity::destroy" );
177             g.wait_for_all();
178             _TRACE_( "Done g.wait_for_all()" );
179         }
180 
181         _TRACE_( "--- THE END --- " );
182 
183         if (F2_body::s_FinalResult >= 0 && UserAsyncActivity::s_Result == F2_body::s_FinalResult) {
184             _TRACE_( "CASE " << i + 1 << ": " << "PASSED" );
185         }
186         else {
187             _TRACE_( "CASE " << i + 1 << ": " << "FAILED! " << UserAsyncActivity::s_Result << " != " << F2_body::s_FinalResult );
188             bOk = false;
189             ASSERT(0, "testSimplestCase failed");
190         }
191     }
192 
193     return bOk;
194 }
195 
196 // ========================================================
197 
198 class UserAsyncActivityChaining;
199 
200 class UserAsyncMsg : public tbb::flow::async_msg<int>
201 {
202 public:
203     typedef tbb::flow::async_msg<int> base;
204 
UserAsyncMsg()205     UserAsyncMsg() : base() {}
UserAsyncMsg(int value)206     UserAsyncMsg(int value) : base(value) {}
207 
208     // Notify AsyncActivity that it must return result because async calculation chain is over
209     void finalize() const __TBB_override;
210 };
211 
212 class UserAsyncActivityChaining // Singleton: task queue in worker thread
213 {
214 public:
instance()215     static UserAsyncActivityChaining* instance() {
216         if (s_Activity == NULL) {
217             s_Activity = new UserAsyncActivityChaining();
218         }
219 
220         return s_Activity;
221     }
222 
destroy()223     static void destroy() {
224         ASSERT(s_Activity != NULL, "destroyed twice");
225         s_Activity->myThread.join();
226         delete s_Activity;
227         s_Activity = NULL;
228     }
229 
finish(const UserAsyncMsg & msg)230     static void finish(const UserAsyncMsg& msg) {
231         ASSERT(UserAsyncActivityChaining::s_Activity != NULL, "activity must be alive");
232         UserAsyncActivityChaining::s_Activity->finishTaskQueue(msg);
233     }
234 
addWork(int addValue,int timeout=0)235     void addWork(int addValue, int timeout = 0) {
236         myQueue.push( MyTask(addValue, timeout) );
237     }
238 
finishTaskQueue(const UserAsyncMsg & msg)239     void finishTaskQueue(const UserAsyncMsg& msg) {
240         myMsg = msg;
241         myQueue.push( MyTask(0, 0, true) );
242     }
243 
244     static int s_Result;
245 
246 private:
247     struct MyTask
248     {
MyTaskUserAsyncActivityChaining::MyTask249         MyTask(int addValue = 0, int timeout = 0, bool finishFlag = false)
250             : myAddValue(addValue), myTimeout(timeout), myFinishFlag(finishFlag) {}
251 
252         int     myAddValue;
253         int     myTimeout;
254         bool    myFinishFlag;
255     };
256 
threadFunc(UserAsyncActivityChaining * activity)257     static void threadFunc(UserAsyncActivityChaining* activity)
258     {
259         _TRACE_( "UserAsyncActivityChaining::threadFunc" );
260 
261         for (;;)
262         {
263             // Process task queue
264             MyTask work;
265             activity->myQueue.pop(work); // Waits until it can succeed
266 
267             _TRACE_( "UserAsyncActivityChaining::threadFunc - work: add "
268                     << work.myAddValue << " (timeout = " << work.myTimeout << ")" << (work.myFinishFlag ? " FINAL" : "") );
269 
270             // 'finish flag' task is not real task, just end of queue flag
271             Harness::Sleep(work.myTimeout);
272 
273             if (work.myFinishFlag) {
274                 break;
275             }
276 
277             activity->myQueueSum += work.myAddValue;
278         }
279 
280         s_Result = activity->myQueueSum;
281         _TRACE_( "UserAsyncActivityChaining::threadFunc - returned result " << activity->myQueueSum );
282 
283         // Get result back to Flow Graph
284         activity->myMsg.set(activity->myQueueSum);
285     }
286 
UserAsyncActivityChaining()287     UserAsyncActivityChaining()
288         : myQueueSum(0)
289         , myThread(threadFunc, this)
290     {
291         // Start local thread here...
292         _TRACE_( "Started AsyncActivityChaining" );
293     }
294 
295 private: // DATA
296     tbb::concurrent_bounded_queue<MyTask>   myQueue;
297     int                                     myQueueSum;
298     UserAsyncMsg                            myMsg;
299 
300     tbb::tbb_thread                         myThread;
301 
302     static UserAsyncActivityChaining*       s_Activity;
303 };
304 
305 // static
306 UserAsyncActivityChaining* UserAsyncActivityChaining::s_Activity = NULL;
307 // static
308 int UserAsyncActivityChaining::s_Result = -4;
309 
310 // override
finalize() const311 void UserAsyncMsg::finalize() const {
312     _TRACE_( "UserAsyncMsg::finalize()" );
313     UserAsyncActivityChaining::finish(*this);
314 }
315 
316 struct F3_body : tbb::internal::no_assign
317 {
318     static int          s_FinalResult;
319 
320     int&                myI;
321     bool                myAlive;
322 
F3_bodyF3_body323     F3_body(int& _i) : myI(_i), myAlive(true) {}
324 
F3_bodyF3_body325     F3_body(const F3_body& b) : no_assign(), myI(b.myI), myAlive(true) {}
326 
~F3_bodyF3_body327     ~F3_body() {
328         myAlive = false;
329         _TRACE_( "~F3_body" );
330     }
331 
operator ()F3_body332     void operator () (int result) {
333         __TBB_ASSERT(myAlive, "dead node");
334         // Handle async activity result here
335         s_FinalResult = result;
336         _TRACE_( "F3: Got async_msg result = " << result );
337     }
338 };
339 
340 // static
341 int F3_body::s_FinalResult = -8;
342 
testChaining()343 static bool testChaining() {
344     bool bOk = true;
345     _TRACE_( "--- SAMPLE 2 (case with chaining: F1(A<T>) ---> F2(A<T>) ---> F3(T)) " );
346 
347     for (int i = 0; i <= 2; ++i) {
348         _TRACE_( "CASE " << i + 1 << ": data is " << (i > 0 ? "NOT " : "") << "ready in storage" << (i > 1 ? " NO WAITING in graph" : "") );
349         _TRACE_( "MAIN THREAD" );
350 
351         tbb::flow::graph g;
352         tbb::flow::function_node< tbb::flow::continue_msg, UserAsyncMsg > f1( g, tbb::flow::unlimited,
353             [&]( tbb::flow::continue_msg ) -> UserAsyncMsg {
354                 _TRACE_( "F1: Created UserAsyncMsg" );
355 
356                 UserAsyncMsg a;
357                 UserAsyncActivityChaining::instance()->addWork(11, (i == 0 ? 0 : 1)*ACTIVITY_PAUSE_MS_NODE1);
358 
359                 return a;
360             }
361         );
362 
363         tbb::flow::function_node< UserAsyncMsg, UserAsyncMsg > f2( g, tbb::flow::unlimited,
364             [&]( UserAsyncMsg a) -> UserAsyncMsg {
365                 _TRACE_( "F2: resend UserAsyncMsg" );
366 
367                 UserAsyncActivityChaining::instance()->addWork(22, (i == 0 ? 0 : 1)*ACTIVITY_PAUSE_MS_NODE1);
368 
369                 Harness::Sleep(ACTIVITY_PAUSE_MS_NODE2); // let activity to finish
370                 return a;
371             }
372         );
373 
374         tbb::flow::function_node< int > f3( g, tbb::flow::unlimited,
375             F3_body(i)
376         );
377 
378         make_edge(f1, f2);
379         make_edge(f2, f3);
380         f1.try_put( tbb::flow::continue_msg() );
381         g.wait_for_all();
382 
383         UserAsyncActivityChaining::destroy();
384         _TRACE_( "Done UserAsyncActivityChaining::destroy" );
385         g.wait_for_all();
386         _TRACE_( "Done g.wait_for_all()" );
387 
388         _TRACE_( "--- THE END ---" );
389 
390         if (F3_body::s_FinalResult >= 0 && UserAsyncActivityChaining::s_Result == F3_body::s_FinalResult) {
391             _TRACE_( "CASE " << i + 1 << ": " << "PASSED" );
392         }
393         else {
394             _TRACE_( "CASE " << i + 1 << ": " << "FAILED! " << UserAsyncActivityChaining::s_Result << " != " << F3_body::s_FinalResult );
395             bOk = false;
396             ASSERT(0, "testChaining failed");
397         }
398     }
399 
400     return bOk;
401 }
402 
403 // ========================================================
// Checks, mostly at compile time, which flow-graph operations are available for each
// ordered pair of types (T1, T2) taken from TypeTuple: make_edge()/remove_edge()
// between nodes and receiver<T1>::try_put(T2). The checks are static assertions inside
// TypeChecker; instantiating 'Checker' (done in TestMain()) is what triggers them.
namespace testFunctionsAvailabilityNS {

using namespace tbb::flow;
using tbb::flow::interface11::internal::untyped_sender;
using tbb::flow::interface11::internal::untyped_receiver;

using tbb::internal::is_same_type;
using tbb::internal::strip;
using tbb::flow::interface11::internal::wrap_tuple_elements;
using tbb::flow::interface11::internal::async_helpers;

class A {}; // Any type (usually called 'T')
// Return type of the catch-all detector overloads below; getting it back means
// "the probed expression is not available".
struct ImpossibleType {};

// User-defined message type derived from async_msg<T>.
template <typename T>
struct UserAsync_T   : public async_msg<T> {
    UserAsync_T() {}
    UserAsync_T(const T& t) : async_msg<T>(t) {}
};

typedef UserAsync_T<int  > UserAsync_int;
typedef UserAsync_T<float> UserAsync_float;
typedef UserAsync_T<A    > UserAsync_A;

// All types that take part in the pairwise checks.
typedef tuple< UserAsync_A, UserAsync_float, UserAsync_int, async_msg<A>, async_msg<float>, async_msg<int>, A, float, int > TypeTuple;

// Incremented by every TypeChecker constructor; TestMain() compares it with the
// square of the TypeTuple size.
static int g_CheckerCounter = 0;

// Detects whether the expression  p->try_put(*q)  is well-formed for p of type T* and
// q of type U*: 'value' is true unless overload resolution falls back to the variadic
// check(...) overload that returns ImpossibleType.
template <typename T, typename U>
struct CheckerTryPut {
    static ImpossibleType check( ... );

    template <typename C>
    static auto check( C* p, U* q ) -> decltype(p->try_put(*q));

    // The literal 0 is passed for the U* parameter; only the types matter, nothing is called.
    static const bool value = !is_same_type<decltype(check(static_cast<T*>(0), 0)), ImpossibleType>::value;
};

// Detects whether tbb::flow::make_edge(*n1, *n2) and tbb::flow::remove_edge(*n1, *n2)
// are well-formed for n1 of type T1* and n2 of type T2*, using the same fallback
// technique as CheckerTryPut. The two answers are required to agree.
template <typename T1, typename T2>
struct CheckerMakeEdge {
    static ImpossibleType checkMake( ... );
    static ImpossibleType checkRemove( ... );

    template <typename N1, typename N2>
    static auto checkMake( N1* n1, N2* n2 ) -> decltype(tbb::flow::make_edge(*n1, *n2));

    template <typename N1, typename N2>
    static auto checkRemove( N1* n1, N2* n2 ) -> decltype(tbb::flow::remove_edge(*n1, *n2));

    static const bool valueMake   = !is_same_type<decltype(checkMake  (static_cast<T1*>(0), static_cast<T2*>(0))), ImpossibleType>::value;
    static const bool valueRemove = !is_same_type<decltype(checkRemove(static_cast<T1*>(0), static_cast<T2*>(0))), ImpossibleType>::value;

    __TBB_STATIC_ASSERT( valueMake == valueRemove, "make_edge() availability is NOT equal to remove_edge() availability" );

    static const bool value = valueMake;
};

// Holds the expectations for one (sending type T1, receiving type T2) pair and
// statically asserts that the library's actual availability matches them.
template <typename T1, typename T2>
struct TypeChecker {
     // Counts the instantiation and prints the expectations for this pair via REMARK.
     TypeChecker() {
         ++g_CheckerCounter;

        REMARK("%d: %s -> %s: %s %s \n", g_CheckerCounter, typeid(T1).name(), typeid(T2).name(),
            (bAllowed ? "YES" : "no"), (bConvertible ? " (Convertible)" : ""));
     }

//
// Check connection: function_node<continue_msg, SENDING_TYPE> <-> function_node<RECEIVING_TYPE>
//                                         R E C E I V I N G   T Y P E
// S     'bAllowed'    | int | float | A | async_msg | async_msg | async_msg | UserAsync | UserAsync | UserAsync |
// E       value       |     |       |   |   <int>   |  <float>  |    <A>    |   _int    |  _float   |   _A      |
// N   -------------------------------------------------------------------------------------------------------------
// D       int         |  Y  |       |   |     Y     |           |           |    Y      |           |           |
// I      float        |     |   Y   |   |           |    Y      |           |           |    Y      |           |
// N        A          |     |       | Y |           |           |     Y     |           |           |     Y     |
// G   async_msg<int>  |  Y  |       |   |     Y     |           |           |           |           |           |
//    async_msg<float> |     |   Y   |   |           |    Y      |           |           |           |           |
// T   async_msg<A>    |     |       | Y |           |           |     Y     |           |           |           |
// Y   UserAsync_int   |  Y  |       |   |           |           |           |    Y      |           |           |
// P  UserAsync_float  |     |   Y   |   |           |           |           |           |    Y      |           |
// E   UserAsync_A     |     |       | Y |           |           |           |           |           |    Y      |
//
    // Test make_edge() & remove_edge() availability
    // Expected: allowed when the types are identical, or when one of them is the
    // async_helpers<>::filtered_type of the other.
    static const bool bAllowed = is_same_type<T1, T2>::value
        || is_same_type<typename async_helpers<T1>::filtered_type, T2>::value
        || is_same_type<T1, typename async_helpers<T2>::filtered_type>::value;

    // Expected availability of receiver<T1>::try_put(T2): everything that is allowed,
    // plus T2 derived from T1, plus the int <-> float combinations listed below.
    static const bool bConvertible = bAllowed
        || std::is_base_of<T1, T2>::value
        || (is_same_type<typename async_helpers<T1>::filtered_type, int>::value && is_same_type<T2, float>::value)
        || (is_same_type<typename async_helpers<T1>::filtered_type, float>::value && is_same_type<T2, int>::value);

    __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<function_node<continue_msg, T1>, function_node<T2> >::value), "invalid connection Fn<T1> -> Fn<T2>" );
    __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<queue_node<T1>, function_node<T2> >::value), "invalid connection Queue<T1> -> Fn<T2>" );

    // Test make_edge() & remove_edge() availability with output_port<N>(node&)
    // (strip<> is applied to the decltype of the output_port<N>() call to obtain the port type)
    __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<typename strip< decltype(
        output_port<0>( *static_cast<multifunction_node< continue_msg, tuple<T1, int> >*>(0) ) ) >::type,
        function_node<T2> >::value), "invalid connection MultuFn<0><T1,int> -> Fn<T2>" );

    __TBB_STATIC_ASSERT( (bAllowed == CheckerMakeEdge<typename strip< decltype(
        output_port<1>( *static_cast<multifunction_node< continue_msg, tuple<int, T1> >*>(0) ) ) >::type,
        function_node<T2> >::value), "invalid connection MultuFn<1><int, T1> -> Fn<T2>" );

    // Test untyped_sender connections
    __TBB_STATIC_ASSERT( (true == CheckerMakeEdge< untyped_sender, function_node<T1> >::value), "cannot connect UntypedSender -> Fn<T1>" );
    // Test untyped_receiver connections
    __TBB_STATIC_ASSERT( (true == CheckerMakeEdge< function_node<continue_msg, T1>, untyped_receiver >::value), "cannot connect F<.., T1> -> UntypedReceiver" );

    // Test untyped_receiver->try_put(T2) availability
    __TBB_STATIC_ASSERT( (true  == CheckerTryPut<untyped_receiver, T2>::value), "untyped_receiver cannot try_put(T2)" );
    // Test receiver<T1>->try_put(T2) availability
    __TBB_STATIC_ASSERT( (bConvertible == CheckerTryPut<receiver<T1>, T2>::value), "invalid availability of receiver<T1>->try_put(T2)" );
};

// Fixes T1 and applies T1T2Checker to TypeTuple through wrap_tuple_elements.
// NOTE(review): wrap_tuple_elements is a TBB internal helper not shown here; presumably
// its ::type is a tuple of T1T2Checker<T2> for every T2 in TypeTuple, which matches the
// size*size counter check in TestMain() -- confirm against the TBB sources.
template <typename T1>
struct WrappedChecker {
    WrappedChecker() {} // Workaround for compilation error

    template <typename T2>
    struct T1T2Checker : TypeChecker<T1, T2> {};

    typename wrap_tuple_elements< tuple_size<TypeTuple>::value, T1T2Checker, TypeTuple >::type a;
};

// Applies WrappedChecker to TypeTuple; TestMain() default-constructs one object of this type.
typedef wrap_tuple_elements< tuple_size<TypeTuple>::value, WrappedChecker, TypeTuple >::type Checker;

} // namespace testFunctionsAvailabilityNS
532 
testTryPut()533 static void testTryPut() {
534     {
535         tbb::flow::graph g;
536         tbb::flow::function_node< int > f(g, tbb::flow::unlimited, [&](int) {});
537 
538         ASSERT(f.try_put(5), "try_put(int) must return true");
539         ASSERT(f.try_put(7), "try_put(int) must return true");
540 
541         tbb::flow::async_msg<int> a1, a2;
542         a1.set(5);
543         ASSERT(f.try_put(a1), "try_put(async_msg) must return true");
544         ASSERT(f.try_put(a2), "try_put(async_msg) must return true");
545         a2.set(7);
546         g.wait_for_all();
547     }
548     {
549         tbb::flow::graph g;
550         typedef tbb::flow::indexer_node< int >::output_type output_type;
551         tbb::flow::indexer_node< int > i(g);
552         tbb::flow::function_node< output_type > f(g, tbb::flow::unlimited, [&](output_type) {});
553         make_edge(i, f);
554 
555         ASSERT(tbb::flow::input_port<0>(i).try_put(5), "try_put(int) must return true");
556         ASSERT(tbb::flow::input_port<0>(i).try_put(7), "try_put(int) must return true");
557 
558         tbb::flow::async_msg<int> a1(5), a2(7);
559         ASSERT(tbb::flow::input_port<0>(i).try_put(a1), "try_put(async_msg) must return true");
560         ASSERT(tbb::flow::input_port<0>(i).try_put(a2), "try_put(async_msg) must return true");
561         g.wait_for_all();
562     }
563 }
564 
TestMain()565 int TestMain() {
566     REMARK(" *** CHECKING FUNCTIONS: make_edge/remove_edge(node<.., T1>, node<T2>) & node<T1>->try_put(T2) ***\n");
567     testFunctionsAvailabilityNS::Checker a;
568     const int typeTupleSize = tbb::flow::tuple_size<testFunctionsAvailabilityNS::TypeTuple>::value;
569     ASSERT(testFunctionsAvailabilityNS::g_CheckerCounter == typeTupleSize*typeTupleSize, "Type checker counter value is incorrect");
570 
571     testTryPut();
572 
573     // NOTE: Use '-v' command line argument to get traces & remarks
574     tbb::task_scheduler_init init(4);
575     bool bOk = true;
576 
577     for (int i = 0; i < USE_N; ++i) {
578         if (i > 0 && i%1000 == 0) {
579             REPORT(" *** Starting TEST %d... ***\n", i);
580         }
581 
582         REMARK(" *** TEST %d ***\n", i);
583         bOk = bOk && testSimplestCase();
584         bOk = bOk && testChaining();
585     }
586 
587     _TRACE_( " *** " << USE_N << " tests: " << (bOk ? "all tests passed" : "TESTS FAILED !!!") << " ***" );
588     return (bOk ? Harness::Done : Harness::Unknown);
589 }
590 
591 #else // __TBB_PREVIEW_ASYNC_MSG
592 
593 #include "harness.h"
594 
TestMain()595 int TestMain() {
596     return Harness::Skipped;
597 }
598 
599 #endif // __TBB_PREVIEW_ASYNC_MSG
600