1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define HARNESS_DEFAULT_MIN_THREADS 2
18 #define HARNESS_DEFAULT_MAX_THREADS 4
19 
20 #include "harness.h"
21 
22 #if __TBB_TASK_GROUP_CONTEXT
23 
24 #include <limits.h> // for INT_MAX
25 #include "tbb/task_scheduler_init.h"
26 #include "tbb/tbb_exception.h"
27 #include "tbb/task.h"
28 #include "tbb/atomic.h"
29 #include "tbb/parallel_for.h"
30 #include "tbb/parallel_reduce.h"
31 #include "tbb/parallel_do.h"
32 #include "tbb/pipeline.h"
33 #include "tbb/parallel_scan.h"
34 #include "tbb/blocked_range.h"
35 #include "harness_assert.h"
36 
37 #define FLAT_RANGE  100000
38 #define FLAT_GRAIN  100
39 #define OUTER_RANGE  100
40 #define OUTER_GRAIN  10
41 #define INNER_RANGE  (FLAT_RANGE / OUTER_RANGE)
42 #define INNER_GRAIN  (FLAT_GRAIN / OUTER_GRAIN)
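// Note: OUTER_RANGE * INNER_RANGE == FLAT_RANGE and OUTER_GRAIN * INNER_GRAIN == FLAT_GRAIN,
// so the nested test variants cover the same total iteration space as the flat ones.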
43 
44 tbb::atomic<intptr_t> g_FedTasksCount; // number of tasks added by parallel_do feeder
45 tbb::atomic<intptr_t> g_OuterParCalls;  // number of invocations of the outer construct actually executed.
46 tbb::atomic<intptr_t> g_TGCCancelled;  // Number of times a task sees its group cancelled at start
47 
48 inline intptr_t Existed () { return INT_MAX; }
49 
50 #include "harness_eh.h"
51 /********************************
52       Variables in test
53 
54 -- Test control variables
55       g_ExceptionInMaster -- if true, only the master thread is allowed to throw; if false, only non-master threads may throw.
56       g_SolitaryException -- only one throw may be executed.
57 
58 -- controls for ThrowTestException for pipeline tests
59       g_NestedPipelines -- are inner pipelines being run?
60       g_PipelinesStarted -- how many pipelines have run their first filter at least once.
61 
62 -- Information variables
63 
64    g_Master -- Thread ID of the "master" thread
65       In pipelines sometimes the master thread does not participate, so the tests have to be resilient to this.
66 
67 -- Measurement variables
68 
69    g_OuterParCalls -- how many outer parallel ranges or filters started
70    g_TGCCancelled --  how many inner parallel ranges or filters saw task::self().is_cancelled()
71    g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
72    g_MasterExecutedThrow -- number of times master thread actually executed a throw
73    g_NonMasterExecutedThrow -- number of times non-master thread actually executed a throw
74    g_ExceptionCaught -- one of PropagatedException or unknown exception was caught.  (Other exceptions cause assertions.)
75 
76    --  Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
77       g_CurExecuted -- total number of inner ranges or filters which executed
78       g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
79       g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
80   *********************************/
81 
82 inline void ResetGlobals (  bool throwException = true, bool flog = false ) {
83     ResetEhGlobals( throwException, flog );
84     g_FedTasksCount = 0;
85     g_OuterParCalls = 0;
86     g_NestedPipelines = false;
87     g_TGCCancelled = 0;
88 }
89 
90 ////////////////////////////////////////////////////////////////////////////////
91 // Tests for tbb::parallel_for and tbb::parallel_reduce
92 
93 typedef size_t count_type;
94 typedef tbb::blocked_range<count_type> range_type;
95 
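// Counts the leaf subranges a blocked_range splits into when divided until it is no longer
// divisible (the way tbb::simple_partitioner subdivides work).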
96 inline intptr_t CountSubranges(range_type r) {
97     if(!r.is_divisible()) return intptr_t(1);
98     range_type r2(r,tbb::split());
99     return CountSubranges(r) + CountSubranges(r2);
100 }
101 
102 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
103     return CountSubranges(range_type(0,length,grain));
104 }
105 
106 template<class Body>
107 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
108     ResetGlobals();
109     g_ThrowException = false;
110     intptr_t outerCalls = NumSubranges(length, grain),
111              innerCalls = NumSubranges(inner_length, inner_grain),
112              maxExecuted = outerCalls * (innerCalls + 1);
113     tbb::parallel_for( range_type(0, length, grain), Body() );
114     ASSERT (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
115     return maxExecuted;
116 }
117 
118 class NoThrowParForBody {
119 public:
120     void operator()( const range_type& r ) const {
121         volatile count_type x = 0;
122         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
123         else g_NonMasterExecuted = true;
124         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
125         count_type end = r.end();
126         for( count_type i=r.begin(); i<end; ++i )
127             x += i;
128     }
129 };
130 
131 #if TBB_USE_EXCEPTIONS
132 
133 void Test0 () {
134     ResetGlobals();
135     tbb::simple_partitioner p;
136     for( size_t i=0; i<10; ++i ) {
137         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
138         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
139         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
140         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
141     }
142 } // void Test0 ()
143 
144 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
145 template<typename ParForBody>
146 class SimpleParReduceBody: NoAssign {
147     ParForBody m_Body;
148 public:
149     void operator()( const range_type& r ) const { m_Body(r); }
150     SimpleParReduceBody() {}
151     SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
152     void join( SimpleParReduceBody& /*right*/ ) {}
153 }; // SimpleParReduceBody
154 
155 //! Test parallel_for and parallel_reduce for a given partitioner.
156 /** The Body need only be suitable for a parallel_for. */
157 template<typename ParForBody, typename Partitioner>
158 void TestParallelLoopAux() {
159     Partitioner partitioner;
160     for( int i=0; i<2; ++i ) {
161         ResetGlobals();
162         TRY();
163             if( i==0 )
164                 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
165             else {
166                 SimpleParReduceBody<ParForBody> rb;
167                 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
168             }
169         CATCH_AND_ASSERT();
170         // two cases: g_SolitaryException and !g_SolitaryException
171         //   1) g_SolitaryException: only one thread actually threw.  There is only one context, so the exception
172         //      (when caught) will cause that context to be cancelled.  After this event, there may be one or
173         //      more threads which are "in-flight", up to g_NumThreads, but no more will be started.  The threads,
174         //      when they start, if they see they are cancelled, TGCCancelled is incremented.
175         //   2) !g_SolitaryException: more than one thread can throw.  The number of threads that actually
176         //      threw is g_MasterExecutedThrow if only the master is allowed, else g_NonMasterExecutedThrow.
177         //      Only one context, so TGCCancelled should be <= g_NumThreads.
178         //
179         // the reasoning is similar for nested algorithms in a single context (Test2).
180         //
181         // If a thread throws in a context, more than one subsequent task body may see the
182         // cancelled state (if they are scheduled before the state is propagated). This is
183         // infrequent, but it occurs, so what was to be an assertion must be a remark.
184         ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks ran after exception thrown");
185         if( g_TGCCancelled > g_NumThreads) REMARK( "Too many tasks ran after exception thrown (%d vs. %d)\n",
186                 (int)g_TGCCancelled, (int)g_NumThreads);
187         ASSERT(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
188         if ( g_SolitaryException ) {
189             ASSERT(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
190             ASSERT(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
191                 "Not all throws were caught");
192             ASSERT(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
193         }
194         else {
195             ASSERT(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
196         }
197     }
198 }  // TestParallelLoopAux
199 
200 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
201 /** The Body only needs to be suitable for tbb::parallel_for. */
202 template<typename Body>
203 void TestParallelLoop() {
204     // The simple and auto partitioners should be const, but not the affinity partitioner.
205     TestParallelLoopAux<Body, const tbb::simple_partitioner  >();
206     TestParallelLoopAux<Body, const tbb::auto_partitioner    >();
207 #define __TBB_TEMPORARILY_DISABLED 1
208 #if !__TBB_TEMPORARILY_DISABLED
209     // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
210     TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
211 #endif
212 #undef __TBB_TEMPORARILY_DISABLED
213 }
214 
215 class SimpleParForBody: NoAssign {
216 public:
217     void operator()( const range_type& r ) const {
218         Harness::ConcurrencyTracker ct;
219         volatile long x = 0;
220         ++g_CurExecuted;
221         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
222         else g_NonMasterExecuted = true;
223         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
224         for( count_type i = r.begin(); i != r.end(); ++i )
225             x += 0;
226         WaitUntilConcurrencyPeaks();
227         ThrowTestException(1);
228     }
229 };
230 
231 void Test1() {
232     // non-nested parallel_for/reduce with throwing body, one context
233     TestParallelLoop<SimpleParForBody>();
234 } // void Test1 ()
235 
236 class OuterParForBody: NoAssign {
237 public:
238     void operator()( const range_type& ) const {
239         Harness::ConcurrencyTracker ct;
240         ++g_OuterParCalls;
241         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
242     }
243 };
244 
245 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
246 /** Inner algorithms are spawned inside the new bound context by default. Since
247     exceptions thrown from the inner parallel_for are not handled by the caller
248     (outer parallel_for body) in this test, they will cancel all the sibling inner
249     algorithms. **/
250 void Test2 () {
251     TestParallelLoop<OuterParForBody>();
252 } // void Test2 ()
253 
254 class OuterParForBodyWithIsolatedCtx {
255 public:
256     void operator()( const range_type& ) const {
257         tbb::task_group_context ctx(tbb::task_group_context::isolated);
258         ++g_OuterParCalls;
259         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
260     }
261 };
262 
263 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
264 /** Even though exceptions thrown from the inner parallel_for are not handled
265     by the caller in this test, they will not affect sibling inner algorithms
266     already running because of the isolated contexts. However, because the first
267     exception cancels the root parallel_for, only the first g_NumThreads subranges
268     will be processed (which launch inner parallel_fors) **/
269 void Test3 () {
270     ResetGlobals();
271     typedef OuterParForBodyWithIsolatedCtx body_type;
272     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
273             // we expect one thread to throw without counting, the rest to run to completion
274             // This formula assumes g_NumThreads outer pfor ranges will be started, but that is not the
275             // case; the SimpleParFor subranges are started up as part of the outer ones, and when
276             // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
277             // so we have to count the number of outer Pfors actually started.
278             minExecuted = (g_NumThreads - 1) * innerCalls;
279     TRY();
280         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
281     CATCH_AND_ASSERT();
282     minExecuted = (g_OuterParCalls - 1) * innerCalls;  // see above
283 
284     // The first formula above assumes all ranges of the outer parallel for are executed, and one
285     // cancels.  In the event, we have a smaller number of ranges that start before the exception
286     // is caught.
287     //
288     //  g_SolitaryException: One inner range throws.  The outer parallel_for is cancelled, but sibling
289     //                      parallel_fors continue to completion (unless the threads that execute
290     //                      are not allowed to throw, in which case we will not see any exceptions).
291     // !g_SolitaryException: Multiple inner ranges may throw.  Any that throws will stop, and the
292     //                      corresponding range of the outer pfor will stop also.
293     //
294     // In either case, once the outer pfor gets the exception it will stop executing further ranges.
295 
296     // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
297     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
298     if ( g_SolitaryException ) {
299         ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived exception");
300         ASSERT (g_CurExecuted > minExecuted, "Too few tasks survived exception");
301         ASSERT (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
302         ASSERT (g_NumExceptionsCaught == 1 || okayNoExceptionsCaught, "No try_blocks in any body expected in this test");
303     }
304     else {
305         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
306         ASSERT (g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught, "No try_blocks in any body expected in this test");
307     }
308 } // void Test3 ()
309 
310 class OuterParForExceptionSafeBody {
311 public:
312     void operator()( const range_type& ) const {
313         tbb::task_group_context ctx(tbb::task_group_context::isolated);
314         ++g_OuterParCalls;
315         TRY();
316             tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
317         CATCH();  // this macro sets g_ExceptionCaught
318     }
319 };
320 
321 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
322 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
323     in this test, they affect neither other tasks of the root parallel_for
324     nor sibling inner algorithms. **/
325 void Test4 () {
326     ResetGlobals( true, true );
327     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
328             outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
329     TRY();
330         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
331     CATCH();
332     // g_SolitaryException  : one inner pfor will throw, the rest will execute to completion.
333     //                        so the count should be (outerCalls -1) * innerCalls, if a throw happened.
334     // !g_SolitaryException : possible multiple inner pfor throws.  Should be approximately
335     //                        (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
336     intptr_t  minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
337     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
338     if ( g_SolitaryException ) {
339         // only one task had exception thrown. That task had at least one execution (the one that threw).
340         // There may be an arbitrary number of ranges executed after the throw but before the exception
341         // is caught in the scheduler and cancellation is signaled.  (seen 9, 11 and 62 (!) for 8 threads)
342         ASSERT (g_NumExceptionsCaught == 1 || okayNoExceptionsCaught, "No exception registered");
343         ASSERT (g_CurExecuted >= minExecuted, "Too few tasks executed");
344         ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived exception");
345         // a small number of threads can execute in a throwing sub-pfor, if the task which is
346         // to do the solitary throw swaps out after registering its intent to throw but before it
347         // actually does so.  (Or is this caused by the extra threads participating? No, the
348         // number of extra tasks is sometimes far greater than the number of extra threads.)
349         ASSERT (g_CurExecuted <= minExecuted + g_NumThreads, "Too many tasks survived exception");
350         if(g_CurExecuted > minExecuted + g_NumThreads) REMARK("Unusual number of tasks executed after signal (%d vs. %d)\n",
351                 (int)g_CurExecuted, minExecuted + g_NumThreads);
352     }
353     else {
354         ASSERT ((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught, "Unexpected actual number of exceptions");
355         ASSERT (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
356         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
357         if(g_CurExecuted > g_ExecutedAtLastCatch + g_NumThreads) REMARK("Unusual number of tasks executed after signal (%d vs. %d)\n",
358                 (int)g_CurExecuted, g_ExecutedAtLastCatch + g_NumThreads);
359         ASSERT (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
360     }
361 } // void Test4 ()
362 
363 #endif /* TBB_USE_EXCEPTIONS */
364 
365 class ParForBodyToCancel {
366 public:
367     void operator()( const range_type& ) const {
368         ++g_CurExecuted;
369         CancellatorTask::WaitUntilReady();
370     }
371 };
372 
373 template<class B>
374 class ParForLauncherTask : public tbb::task {
375     tbb::task_group_context &my_ctx;
376 
377     tbb::task* execute () __TBB_override {
378         tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
379         return NULL;
380     }
381 public:
382     ParForLauncherTask ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
383 };
384 
385 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
386 void TestCancelation1 () {
387     ResetGlobals( false );
388     RunCancellationTest<ParForLauncherTask<ParForBodyToCancel>, CancellatorTask>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
389 }
390 
391 class CancellatorTask2 : public tbb::task {
392     tbb::task_group_context &m_GroupToCancel;
393 
394     tbb::task* execute () __TBB_override {
395         Harness::ConcurrencyTracker ct;
396         WaitUntilConcurrencyPeaks();
397         m_GroupToCancel.cancel_group_execution();
398         g_ExecutedAtLastCatch = g_CurExecuted;
399         return NULL;
400     }
401 public:
402     CancellatorTask2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
403 };
404 
405 class ParForBodyToCancel2 {
406 public:
407     void operator()( const range_type& ) const {
408         ++g_CurExecuted;
409         Harness::ConcurrencyTracker ct;
410         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
411         while( !tbb::task::self().is_cancelled() )
412             __TBB_Yield();
413     }
414 };
415 
416 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
417 /** This version also tests task::is_cancelled() method. **/
418 void TestCancelation2 () {
419     ResetGlobals();
420     RunCancellationTest<ParForLauncherTask<ParForBodyToCancel2>, CancellatorTask2>();
421     ASSERT (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
422     ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived cancellation");
423     ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
424 }
425 
426 ////////////////////////////////////////////////////////////////////////////////
427 // Regression test based on the contribution by the author of the following forum post:
428 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
429 
430 class Worker {
431     static const int max_nesting = 3;
432     static const int reduce_range = 1024;
433     static const int reduce_grain = 256;
434 public:
435     int DoWork (int level);
436     int Validate (int start_level) {
437         int expected = 1; // identity for multiplication
438         for(int i=start_level+1; i<max_nesting; ++i)
439              expected *= reduce_range;
440         return expected;
441     }
442 };
443 
444 class RecursiveParReduceBodyWithSharedWorker {
445     Worker * m_SharedWorker;
446     int m_NestingLevel;
447     int m_Result;
448 public:
449     RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
450         : m_SharedWorker(src.m_SharedWorker)
451         , m_NestingLevel(src.m_NestingLevel)
452         , m_Result(0)
453     {}
454     RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
455         : m_SharedWorker(w)
456         , m_NestingLevel(outer)
457         , m_Result(0)
458     {}
459 
460     void operator() ( const tbb::blocked_range<size_t>& r ) {
461         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
462         else g_NonMasterExecuted = true;
463         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
464         for (size_t i = r.begin (); i != r.end (); ++i) {
465             m_Result += m_SharedWorker->DoWork (m_NestingLevel);
466         }
467     }
468     void join (const RecursiveParReduceBodyWithSharedWorker & x) {
469         m_Result += x.m_Result;
470     }
471     int result () { return m_Result; }
472 };
473 
474 int Worker::DoWork ( int level ) {
475     ++level;
476     if ( level < max_nesting ) {
477         RecursiveParReduceBodyWithSharedWorker rt (this, level);
478         tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
479         return rt.result();
480     }
481     else
482         return 1;
483 }
484 
485 //! Regression test for hanging that occurred with the first version of cancellation propagation
486 void TestCancelation3 () {
487     Worker w;
488     int result   = w.DoWork (0);
489     int expected = w.Validate(0);
490     ASSERT ( result == expected, "Wrong calculation result");
491 }
492 
493 struct StatsCounters {
494     tbb::atomic<size_t> my_total_created;
495     tbb::atomic<size_t> my_total_deleted;
496     StatsCounters() {
497         my_total_created = 0;
498         my_total_deleted = 0;
499     }
500 };
501 
502 class ParReduceBody {
503     StatsCounters* my_stats;
504     size_t my_id;
505     bool my_exception;
506 
507 public:
508     ParReduceBody( StatsCounters& s_, bool e_ ) : my_stats(&s_), my_exception(e_) {
509         my_id = my_stats->my_total_created++;
510     }
511 
512     ParReduceBody( const ParReduceBody& lhs ) {
513         my_stats = lhs.my_stats;
514         my_id = my_stats->my_total_created++;
515     }
516 
517     ParReduceBody( ParReduceBody& lhs, tbb::split ) {
518         my_stats = lhs.my_stats;
519         my_id = my_stats->my_total_created++;
520     }
521 
522     ~ParReduceBody(){ ++my_stats->my_total_deleted; }
523 
524     void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
525         //Do nothing, except for one task (chosen arbitrarily)
526         if( my_id >= 12 ) {
527             if( my_exception )
528                 ThrowTestException(1);
529             else
530                 tbb::task::self().cancel_group_execution();
531         }
532     }
533 
534     void join( ParReduceBody& /*rhs*/ ) {}
535 };
536 
537 void TestCancelation4() {
538     StatsCounters statsObj;
539     __TBB_TRY {
540         tbb::task_group_context tgc1, tgc2;
541         ParReduceBody body_for_cancellation(statsObj, false), body_for_exception(statsObj, true);
542         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
543         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
544     } __TBB_CATCH(...) {}
545     ASSERT ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
546 }
547 
548 void RunParForAndReduceTests () {
549     REMARK( "parallel for and reduce tests\n" );
550     tbb::task_scheduler_init init (g_NumThreads);
551     g_Master = Harness::CurrentTid();
552 
553 #if TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN
554     Test0();
555     Test1();
556     Test2();
557     Test3();
558     Test4();
559 #endif /* TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN */
560     TestCancelation1();
561     TestCancelation2();
562     TestCancelation3();
563     TestCancelation4();
564 }
565 
566 ////////////////////////////////////////////////////////////////////////////////
567 // Tests for tbb::parallel_do
568 
569 #define ITER_RANGE          1000
570 #define ITEMS_TO_FEED       50
571 #define INNER_ITER_RANGE   100
572 #define OUTER_ITER_RANGE  50
573 
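// Fills a test_vector of rangeSize items (plus one extra slot used as the end position) and
// declares begin/end iterators of the requested Iterator type over it in the enclosing scope.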
574 #define PREPARE_RANGE(Iterator, rangeSize)  \
575     size_t test_vector[rangeSize + 1]; \
576     for (int i =0; i < rangeSize; i++) \
577         test_vector[i] = i; \
578     Iterator begin(&test_vector[0]); \
579     Iterator end(&test_vector[rangeSize])
580 
581 void Feed ( tbb::parallel_do_feeder<size_t> &feeder, size_t val ) {
582     if (g_FedTasksCount < ITEMS_TO_FEED) {
583         ++g_FedTasksCount;
584         feeder.add(val);
585     }
586 }
587 
588 #include "harness_iterator.h"
589 
590 #if TBB_USE_EXCEPTIONS
591 
592 // Simple functor object with exception
593 class SimpleParDoBody {
594 public:
595     void operator() ( size_t &value ) const {
596         ++g_CurExecuted;
597         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
598         else g_NonMasterExecuted = true;
599         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
600         Harness::ConcurrencyTracker ct;
601         value += 1000;
602         WaitUntilConcurrencyPeaks();
603         ThrowTestException(1);
604     }
605 };
606 
607 // Simple functor object with exception and feeder
608 class SimpleParDoBodyWithFeeder : SimpleParDoBody {
609 public:
610     void operator() ( size_t &value, tbb::parallel_do_feeder<size_t> &feeder ) const {
611         Feed(feeder, 0);
612         SimpleParDoBody::operator()(value);
613     }
614 };
615 
616 // Tests exceptions without nesting
617 template <class Iterator, class simple_body>
618 void Test1_parallel_do () {
619     ResetGlobals();
620     PREPARE_RANGE(Iterator, ITER_RANGE);
621     TRY();
622         tbb::parallel_do<Iterator, simple_body>(begin, end, simple_body() );
623     CATCH_AND_ASSERT();
624     ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
625     ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived cancellation");
626     ASSERT (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
627     if ( !g_SolitaryException )
628         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
629 
630 } // void Test1_parallel_do ()
631 
632 template <class Iterator>
633 class OuterParDoBody {
634 public:
635     void operator()( size_t& /*value*/ ) const {
636         ++g_OuterParCalls;
637         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
638         tbb::parallel_do<Iterator, SimpleParDoBody>(begin, end, SimpleParDoBody());
639     }
640 };
641 
642 template <class Iterator>
643 class OuterParDoBodyWithFeeder : OuterParDoBody<Iterator> {
644 public:
645     void operator()( size_t& value, tbb::parallel_do_feeder<size_t>& feeder ) const {
646         Feed(feeder, 0);
647         OuterParDoBody<Iterator>::operator()(value);
648     }
649 };
650 
651 //! Uses parallel_do body containing an inner parallel_do with the default context not wrapped by a try-block.
652 /** Inner algorithms are spawned inside the new bound context by default. Since
653     exceptions thrown from the inner parallel_do are not handled by the caller
654     (outer parallel_do body) in this test, they will cancel all the sibling inner
655     algorithms. **/
656 template <class Iterator, class outer_body>
657 void Test2_parallel_do () {
658     ResetGlobals();
659     PREPARE_RANGE(Iterator, ITER_RANGE);
660     TRY();
661         tbb::parallel_do<Iterator, outer_body >(begin, end, outer_body() );
662     CATCH_AND_ASSERT();
663     //if ( g_SolitaryException )
664         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
665     ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived cancellation");
666     ASSERT (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
667     if ( !g_SolitaryException )
668         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
669 } // void Test2_parallel_do ()
670 
671 template <class Iterator>
672 class OuterParDoBodyWithIsolatedCtx {
673 public:
674     void operator()( size_t& /*value*/ ) const {
675         tbb::task_group_context ctx(tbb::task_group_context::isolated);
676         ++g_OuterParCalls;
677         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
678         tbb::parallel_do<Iterator, SimpleParDoBody>(begin, end, SimpleParDoBody(), ctx);
679     }
680 };
681 
682 template <class Iterator>
683 class OuterParDoBodyWithIsolatedCtxWithFeeder : OuterParDoBodyWithIsolatedCtx<Iterator> {
684 public:
685     void operator()( size_t& value, tbb::parallel_do_feeder<size_t> &feeder ) const {
686         Feed(feeder, 0);
687         OuterParDoBodyWithIsolatedCtx<Iterator>::operator()(value);
688     }
689 };
690 
691 //! Uses parallel_do body invoking an inner parallel_do with an isolated context without a try-block.
692 /** Even though exceptions thrown from the inner parallel_do are not handled
693     by the caller in this test, they will not affect sibling inner algorithms
694     already running because of the isolated contexts. However, because the first
695     exception cancels the root parallel_do, at most the first g_NumThreads subranges
696     will be processed (which launch inner parallel_dos) **/
697 template <class Iterator, class outer_body>
698 void Test3_parallel_do () {
699     ResetGlobals();
700     PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
701     intptr_t innerCalls = INNER_ITER_RANGE,
702              // The assumption here is the same as in outer parallel fors.
703              minExecuted = (g_NumThreads - 1) * innerCalls;
704     g_Master = Harness::CurrentTid();
705     TRY();
706         tbb::parallel_do<Iterator, outer_body >(begin, end, outer_body());
707     CATCH_AND_ASSERT();
708     // figure actual number of expected executions given the number of outer PDos started.
709     minExecuted = (g_OuterParCalls - 1) * innerCalls;
710     // one extra thread may run a task that sees cancellation.  Infrequent but possible
711     ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived exception");
712     if(g_TGCCancelled > g_NumThreads) REMARK("Extra thread(s) executed after cancel (%d vs. %d)\n",
713             (int)g_TGCCancelled, (int)g_NumThreads);
714     if ( g_SolitaryException ) {
715         ASSERT (g_CurExecuted > minExecuted, "Too few tasks survived exception");
716         ASSERT (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
717     }
718     ASSERT (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
719     if ( !g_SolitaryException )
720         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
721 } // void Test3_parallel_do ()
722 
723 template <class Iterator>
724 class OuterParDoWithEhBody {
725 public:
726     void operator()( size_t& /*value*/ ) const {
727         tbb::task_group_context ctx(tbb::task_group_context::isolated);
728         ++g_OuterParCalls;
729         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
730         TRY();
731             tbb::parallel_do<Iterator, SimpleParDoBody>(begin, end, SimpleParDoBody(), ctx);
732         CATCH();
733     }
734 };
735 
736 template <class Iterator>
737 class OuterParDoWithEhBodyWithFeeder : NoAssign, OuterParDoWithEhBody<Iterator> {
738 public:
739     void operator()( size_t &value, tbb::parallel_do_feeder<size_t> &feeder ) const {
740         Feed(feeder, 0);
741         OuterParDoWithEhBody<Iterator>::operator()(value);
742     }
743 };
744 
745 //! Uses parallel_do body invoking an inner parallel_do (with isolated context) inside a try-block.
746 /** Since exception(s) thrown from the inner parallel_do are handled by the caller
747     in this test, they affect neither other tasks of the root parallel_do
748     nor sibling inner algorithms. **/
749 template <class Iterator, class outer_body_with_eh>
750 void Test4_parallel_do () {
751     ResetGlobals( true, true );
752     PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
753     g_Master = Harness::CurrentTid();
754     TRY();
755         tbb::parallel_do<Iterator, outer_body_with_eh>(begin, end, outer_body_with_eh());
756     CATCH();
757     ASSERT (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_do body");
758     intptr_t innerCalls = INNER_ITER_RANGE,
759              outerCalls = OUTER_ITER_RANGE + g_FedTasksCount,
760              maxExecuted = outerCalls * innerCalls,
761              minExecuted = 0;
762     ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived exception");
763     if ( g_SolitaryException ) {
764         minExecuted = maxExecuted - innerCalls;
765         ASSERT (g_NumExceptionsCaught == 1, "No exception registered");
766         ASSERT (g_CurExecuted >= minExecuted, "Too few tasks executed");
767         // This test has the same property as Test4 (parallel_for); the exception can be
768         // thrown, but some number of tasks from the outer Pdo can execute after the throw but
769         // before the cancellation is signaled (have seen 36).
770         ASSERT_WARNING(g_CurExecuted < maxExecuted || g_TGCCancelled, "All tasks survived exception. Oversubscription?");
771     }
772     else {
773         minExecuted = g_NumExceptionsCaught;
774         ASSERT (g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls, "Unexpected actual number of exceptions");
775         ASSERT (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
776         ASSERT (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
777         ASSERT (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
778     }
779 } // void Test4_parallel_do ()
780 
781 // This body throws an exception only if the task was added by feeder
782 class ParDoBodyWithThrowingFeederTasks {
783 public:
784     //! This form of the function call operator can be used when the body needs to add more work during the processing
785     void operator() ( size_t &value, tbb::parallel_do_feeder<size_t> &feeder ) const {
786         ++g_CurExecuted;
787         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
788         else g_NonMasterExecuted = true;
789         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
790         Feed(feeder, 1);
791         if (value == 1)
792             ThrowTestException(1);
793     }
794 }; // class ParDoBodyWithThrowingFeederTasks
795 
796 // Test exception in task, which was added by feeder.
797 template <class Iterator>
798 void Test5_parallel_do () {
799     ResetGlobals();
800     PREPARE_RANGE(Iterator, ITER_RANGE);
801     g_Master = Harness::CurrentTid();
802     TRY();
803         tbb::parallel_do<Iterator, ParDoBodyWithThrowingFeederTasks>(begin, end, ParDoBodyWithThrowingFeederTasks());
804     CATCH();
805     if (g_SolitaryException) {
806         // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
807         // are handled by the master thread.  In this case no throw occurs.
808         ASSERT (l_ExceptionCaughtAtCurrentLevel     // we saw an exception
809                 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) // non-master throws but none tried
810                 || (g_ExceptionInMaster && !g_MasterExecutedThrow)     // master throws but master didn't try
811                 , "At least one exception should occur");
812         if(!g_ExceptionCaught) {
813             if(g_ExceptionInMaster)
814                 REMARK("PDo exception not thrown; non-masters handled all throwing values.\n");
815             else
816                 REMARK("PDo exception not thrown; master handled all throwing values.\n");
817         }
818     }
819 } // void Test5_parallel_do ()
820 
821 #endif /* TBB_USE_EXCEPTIONS */
822 
823 class ParDoBodyToCancel {
824 public:
825     void operator()( size_t& /*value*/ ) const {
826         ++g_CurExecuted;
827         CancellatorTask::WaitUntilReady();
828     }
829 };
830 
831 class ParDoBodyToCancelWithFeeder : ParDoBodyToCancel {
832 public:
833     void operator()( size_t& value, tbb::parallel_do_feeder<size_t> &feeder ) const {
834         Feed(feeder, 0);
835         ParDoBodyToCancel::operator()(value);
836     }
837 };
838 
839 template<class B, class Iterator>
840 class ParDoWorkerTask : public tbb::task {
841     tbb::task_group_context &my_ctx;
842 
843     tbb::task* execute () __TBB_override {
844         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
845         tbb::parallel_do<Iterator, B>( begin, end, B(), my_ctx );
846         return NULL;
847     }
848 public:
849     ParDoWorkerTask ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
850 };
851 
852 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
853 template <class Iterator, class body_to_cancel>
854 void TestCancelation1_parallel_do () {
855     ResetGlobals( false );
856     intptr_t  threshold = 10;
857     tbb::task_group_context  ctx;
858     ctx.reset();
859     tbb::empty_task &r = *new( tbb::task::allocate_root() ) tbb::empty_task;
860     r.set_ref_count(3);
861     r.spawn( *new( r.allocate_child() ) CancellatorTask(ctx, threshold) );
862     __TBB_Yield();
863     r.spawn( *new( r.allocate_child() ) ParDoWorkerTask<body_to_cancel, Iterator>(ctx) );
864     TRY();
865         r.wait_for_all();
866     CATCH_AND_FAIL();
867     ASSERT (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
868     r.destroy(r);
869 }
870 
871 class ParDoBodyToCancel2 {
872 public:
873     void operator()( size_t& /*value*/ ) const {
874         ++g_CurExecuted;
875         Harness::ConcurrencyTracker ct;
876         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
877         while( !tbb::task::self().is_cancelled() )
878             __TBB_Yield();
879     }
880 };
881 
882 class ParDoBodyToCancel2WithFeeder : ParDoBodyToCancel2 {
883 public:
884     void operator()( size_t& value, tbb::parallel_do_feeder<size_t> &feeder ) const {
885         Feed(feeder, 0);
886         ParDoBodyToCancel2::operator()(value);
887     }
888 };
889 
890 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
891 /** This version also tests task::is_cancelled() method. **/
892 template <class Iterator, class body_to_cancel>
893 void TestCancelation2_parallel_do () {
894     ResetGlobals();
895     RunCancellationTest<ParDoWorkerTask<body_to_cancel, Iterator>, CancellatorTask2>();
896 }
897 
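// Helpers that instantiate a test for both random-access and forward iterators,
// each with and without a feeder body.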
898 #define RunWithSimpleBody(func, body)       \
899     func<Harness::RandomIterator<size_t>, body>();           \
900     func<Harness::RandomIterator<size_t>, body##WithFeeder>();  \
901     func<Harness::ForwardIterator<size_t>, body>();         \
902     func<Harness::ForwardIterator<size_t>, body##WithFeeder>()
903 
904 #define RunWithTemplatedBody(func, body)       \
905     func<Harness::RandomIterator<size_t>, body<Harness::RandomIterator<size_t> > >();           \
906     func<Harness::RandomIterator<size_t>, body##WithFeeder<Harness::RandomIterator<size_t> > >();  \
907     func<Harness::ForwardIterator<size_t>, body<Harness::ForwardIterator<size_t> > >();         \
908     func<Harness::ForwardIterator<size_t>, body##WithFeeder<Harness::ForwardIterator<size_t> > >()
909 
910 void RunParDoTests() {
911     REMARK( "parallel do tests\n" );
912     tbb::task_scheduler_init init (g_NumThreads);
913     g_Master = Harness::CurrentTid();
914 #if TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN
915     RunWithSimpleBody(Test1_parallel_do, SimpleParDoBody);
916     RunWithTemplatedBody(Test2_parallel_do, OuterParDoBody);
917     RunWithTemplatedBody(Test3_parallel_do, OuterParDoBodyWithIsolatedCtx);
918     RunWithTemplatedBody(Test4_parallel_do, OuterParDoWithEhBody);
919     Test5_parallel_do<Harness::ForwardIterator<size_t> >();
920     Test5_parallel_do<Harness::RandomIterator<size_t> >();
921 #endif /* TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN */
922     RunWithSimpleBody(TestCancelation1_parallel_do, ParDoBodyToCancel);
923     RunWithSimpleBody(TestCancelation2_parallel_do, ParDoBodyToCancel2);
924 }
925 
926 ////////////////////////////////////////////////////////////////////////////////
927 // Tests for tbb::pipeline
928 
929 #define NUM_ITEMS   100
930 
931 const size_t c_DataEndTag = size_t(~0);
932 
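// Token limit the test pipelines are run with (passed to tbb::pipeline::run by CustomPipeline below).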
933 int g_NumTokens = 0;
934 
935 // Simple input filter class, it assigns 1 to all array members
936 // It stops when it receives item equal to -1
937 class InputFilter: public tbb::filter {
938     tbb::atomic<size_t> m_Item;
939     size_t m_Buffer[NUM_ITEMS + 1];
940 public:
941     InputFilter() : tbb::filter(parallel) {
942         m_Item = 0;
943         for (size_t i = 0; i < NUM_ITEMS; ++i )
944             m_Buffer[i] = 1;
945         m_Buffer[NUM_ITEMS] = c_DataEndTag;
946     }
947 
948     void* operator()( void* ) __TBB_override {
949         size_t item = m_Item.fetch_and_increment();
950         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
951         else g_NonMasterExecuted = true;
952         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
953         if(item == 1) {
954             ++g_PipelinesStarted;   // count on emitting the first item.
955         }
956         if ( item >= NUM_ITEMS )
957             return NULL;
958         m_Buffer[item] = 1;
959         return &m_Buffer[item];
960     }
961 
962     size_t* buffer() { return m_Buffer; }
963 }; // class InputFilter
964 
965 // Pipeline filter, without exceptions throwing
966 class NoThrowFilter : public tbb::filter {
967     size_t m_Value;
968 public:
969     enum operation {
970         addition,
971         subtraction,
972         multiplication
973     } m_Operation;
974 
975     NoThrowFilter(operation _operation, size_t value, bool is_parallel)
976         : filter(is_parallel? tbb::filter::parallel : tbb::filter::serial_in_order),
977         m_Value(value), m_Operation(_operation)
978     {}
979     void* operator()(void* item) __TBB_override {
980         size_t &value = *(size_t*)item;
981         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
982         else g_NonMasterExecuted = true;
983         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
984         ASSERT(value != c_DataEndTag, "terminator element is being processed");
985         switch (m_Operation){
986             case addition:
987                 value += m_Value;
988                 break;
989             case subtraction:
990                 value -= m_Value;
991                 break;
992             case multiplication:
993                 value *= m_Value;
994                 break;
995             default:
996                 ASSERT(0, "Wrong operation parameter passed to NoThrowFilter");
997         } // switch (m_Operation)
998         return item;
999     }
1000 };
1001 
1002 // Test pipeline without exceptions throwing
1003 void Test0_pipeline () {
1004     ResetGlobals();
1005     // Run test when serial filter is the first non-input filter
1006     InputFilter inputFilter;  //Emits NUM_ITEMS items
1007     NoThrowFilter filter1(NoThrowFilter::addition, 99, false);
1008     NoThrowFilter filter2(NoThrowFilter::subtraction, 90, true);
1009     NoThrowFilter filter3(NoThrowFilter::multiplication, 5, false);
1010     // Result should be 50 for all items except the last
1011     tbb::pipeline p;
1012     p.add_filter(inputFilter);
1013     p.add_filter(filter1);
1014     p.add_filter(filter2);
1015     p.add_filter(filter3);
1016     p.run(8);
1017     for (size_t i = 0; i < NUM_ITEMS; ++i)
1018         ASSERT(inputFilter.buffer()[i] == 50, "pipeline didn't process items properly");
1019 } // void Test0_pipeline ()
1020 
1021 #if TBB_USE_EXCEPTIONS
1022 
1023 // Simple filter with exception throwing.  If parallel, will wait until
1024 // as many parallel filters start as there are threads.
1025 class SimpleFilter : public tbb::filter {
1026     bool m_canThrow;
1027 public:
1028     SimpleFilter (tbb::filter::mode _mode, bool canThrow ) : filter (_mode), m_canThrow(canThrow) {}
1029     void* operator()(void* item) __TBB_override {
1030         ++g_CurExecuted;
1031         if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
1032         else g_NonMasterExecuted = true;
1033         if( tbb::task::self().is_cancelled() ) ++g_TGCCancelled;
1034         if ( m_canThrow ) {
1035             if ( !is_serial() ) {
1036                 Harness::ConcurrencyTracker ct;
1037                 WaitUntilConcurrencyPeaks( min(g_NumTokens, g_NumThreads) );
1038             }
1039             ThrowTestException(1);
1040         }
1041         return item;
1042     }
1043 }; // class SimpleFilter
1044 
1045 // This enumeration represents filters order in pipeline
1046 struct FilterSet {
1047     tbb::filter::mode   mode1,
1048                         mode2;
1049     bool                throw1,
1050                         throw2;
1051 
1052     FilterSet( tbb::filter::mode m1, tbb::filter::mode m2, bool t1, bool t2 )
1053         : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
1054     {}
1055 }; // struct FilterSet
1056 
1057 FilterSet serial_parallel( tbb::filter::serial, tbb::filter::parallel, /*throw1*/false, /*throw2*/true );
1058 
1059 template<typename InFilter, typename Filter>
1060 class CustomPipeline : protected tbb::pipeline {
1061     InFilter inputFilter;
1062     Filter filter1;
1063     Filter filter2;
1064 public:
1065     CustomPipeline( const FilterSet& filters )
1066         : filter1(filters.mode1, filters.throw1), filter2(filters.mode2, filters.throw2)
1067     {
1068        add_filter(inputFilter);
1069        add_filter(filter1);
1070        add_filter(filter2);
1071     }
1072     void run () { tbb::pipeline::run(g_NumTokens); }
1073     void run ( tbb::task_group_context& ctx ) { tbb::pipeline::run(g_NumTokens, ctx); }
1074 
1075     using tbb::pipeline::add_filter;
1076 };
1077 
1078 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
1079 
1080 // Tests exceptions without nesting
1081 void Test1_pipeline ( const FilterSet& filters ) {
1082     ResetGlobals();
1083     SimplePipeline testPipeline(filters);
1084     TRY();
1085         testPipeline.run();
1086         if ( g_CurExecuted == 2 * NUM_ITEMS ) {
1087             // all the items were processed, though an exception was supposed to occur.
1088             if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
1089                 // if !g_ExceptionInMaster, the master thread is not allowed to throw.
1090                 // if g_NonMasterExecutedThrow > 0 then a thread besides the master tried to throw.
1091                 ASSERT(filters.mode1 != tbb::filter::parallel && filters.mode2 != tbb::filter::parallel, "Unusual count");
1092             }
1093             else {
1094                 REMARK("Test1_pipeline with %d threads: Only the master thread tried to throw, and it is not allowed to.\n", (int)g_NumThreads);
1095             }
1096             // In case of all serial filters they might be all executed in the thread(s)
1097             // where exceptions are not allowed by the common test logic. So we just quit.
1098             return;
1099         }
1100     CATCH_AND_ASSERT();
1101     ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived exception");
1102     ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1103     ASSERT (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
1104     if ( !g_SolitaryException )
1105         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1106 
1107 } // void Test1_pipeline ()
1108 
1109 // Filter with nesting
1110 class OuterFilter : public tbb::filter {
1111 public:
1112     OuterFilter (tbb::filter::mode _mode, bool ) : filter (_mode) {}
1113 
1114     void* operator()(void* item) __TBB_override {
1115         ++g_OuterParCalls;
1116         SimplePipeline testPipeline(serial_parallel);
1117         testPipeline.run();
1118         return item;
1119     }
1120 }; // class OuterFilter
1121 
1122 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
1123 /** Inner algorithms are spawned inside the new bound context by default. Since
1124     exceptions thrown from the inner pipeline are not handled by the caller
1125     (outer pipeline body) in this test, they will cancel all the sibling inner
1126     algorithms. **/
1127 void Test2_pipeline ( const FilterSet& filters ) {
1128     ResetGlobals();
1129     g_NestedPipelines = true;
1130     CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
1131     TRY();
1132         testPipeline.run();
1133     CATCH_AND_ASSERT();
1134     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
1135     ASSERT (g_NumExceptionsCaught == 1 || okayNoExceptionCaught, "No try_blocks in any body expected in this test");
1136     if ( g_SolitaryException ) {
1137         if( g_TGCCancelled > g_NumThreads) REMARK( "Extra tasks ran after exception thrown (%d vs. %d)\n",
1138                 (int)g_TGCCancelled, (int)g_NumThreads);
1139     }
1140     else {
1141         ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1142     }
1143 } // void Test2_pipeline ()
1144 
1145 //! Creates an isolated inner pipeline and runs it.
1146 class OuterFilterWithIsolatedCtx : public tbb::filter {
1147 public:
1148     OuterFilterWithIsolatedCtx(tbb::filter::mode m, bool ) : filter(m) {}
1149 
1150     void* operator()(void* item) __TBB_override {
1151         ++g_OuterParCalls;
1152         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1153         // create inner pipeline with serial input, parallel output filter, second filter throws
1154         SimplePipeline testPipeline(serial_parallel);
1155         testPipeline.run(ctx);
1156         return item;
1157     }
1158 }; // class OuterFilterWithIsolatedCtx
1159 
1160 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
1161 /** Even though exceptions thrown from the inner pipeline are not handled
1162     by the caller in this test, they will not affect sibling inner algorithms
1163     already running because of the isolated contexts. However, because the first
1164     exception cancels the root pipeline, only the first g_NumThreads items
1165     will be processed (which launch inner pipelines) **/
void Test3_pipeline ( const FilterSet& filters ) {
    for( int nTries = 1; nTries <= 4; ++nTries) {
        ResetGlobals();
        g_NestedPipelines = true;
        g_Master = Harness::CurrentTid();
        intptr_t innerCalls = NUM_ITEMS,
                 minExecuted = (g_NumThreads - 1) * innerCalls;
        CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
        TRY();
            testPipeline.run();
        CATCH_AND_ASSERT();

        bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
            (!g_ExceptionInMaster && !g_NonMasterExecuted);
        // only test assertions if the test threw an exception (or we don't care)
        bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
        if(testSucceeded) {
            if (g_SolitaryException) {

                // The test is one outer pipeline with two nested filters that each start an inner pipeline.
                // Each time the input filter of a pipeline delivers its first item, it increments
                // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
                // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
                // context will start; that is what we're testing.)
                //
                // There are two pipelines which will NOT run to completion when a solitary throw
                // happens in an isolated inner context: the outer pipeline and the pipeline which
                // throws.  All the other pipelines which start should run to completion.  But only
                // inner body invocations are counted.
                //
                // So g_CurExecuted should be about
                //
                //   (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1
                //   ^ executions for each completed pipeline
                //                   ^ completing pipelines (remembering two will not complete)
                //                                              ^ one for the inner throwing pipeline
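                //
                // Illustration only (hypothetical numbers, since NUM_ITEMS is defined elsewhere in
                // this test): with NUM_ITEMS == 10 and g_PipelinesStarted == 4, the bound works out
                // to minExecuted = (2*10) * (4 - 2) + 1 = 41.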

                minExecuted = (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1;
                // each failing pipeline must execute at least two tasks
                ASSERT(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
                // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
                // tasks not executing at throw were scheduled.
                ASSERT( g_TGCCancelled <= g_NumThreads, "Tasks not in-flight were executed");
                ASSERT(g_NumExceptionsCaught == 1, "Should have only one exception");
                // if we're only throwing from the master thread, and that thread didn't
                // participate in the pipelines, then no throw occurred.
                if(g_ExceptionInMaster && !g_MasterExecuted) {
                    REMARK_ONCE("Master expected to throw, but didn't participate.\n");
                }
                else if(!g_ExceptionInMaster && !g_NonMasterExecuted) {
                    REMARK_ONCE("Non-master expected to throw, but didn't participate.\n");
                }
            }
            ASSERT (g_NumExceptionsCaught == 1 || okayNoExceptionCaught, "No try_blocks in any body expected in this test");
            ASSERT ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught, "Too many tasks survived exception");
            if(nTries > 1) REMARK("Test3_pipeline succeeded on try %d\n", nTries);
            return;
        }
    }
    REMARK_ONCE("Test3_pipeline failed for g_NumThreads==%d, g_ExceptionInMaster==%s, g_SolitaryException==%s\n",
            g_NumThreads, g_ExceptionInMaster?"T":"F", g_SolitaryException?"T":"F");
} // void Test3_pipeline ()

class OuterFilterWithEhBody : public tbb::filter {
public:
    OuterFilterWithEhBody(tbb::filter::mode m, bool ) : filter(m) {}

    void* operator()(void* item) __TBB_override {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        TRY();
            testPipeline.run(ctx);
        CATCH();
        return item;
    }
}; // class OuterFilterWithEhBody

//! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
/** Since exception(s) thrown from the inner pipeline are handled by the caller
    in this test, they do not affect other tasks of the root pipeline
    nor sibling inner algorithms. **/
void Test4_pipeline ( const FilterSet& filters ) {
#if __GNUC__ && !__INTEL_COMPILER
    if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
        REMARK_ONCE("Known issue: one of exception handling tests is skipped.\n");
        return;
    }
#endif
    ResetGlobals( true, true );
    // each outer pipeline stage will start NUM_ITEMS inner pipelines.
    // each inner pipeline that doesn't throw will process NUM_ITEMS items.
    // for solitary exception there will be one pipeline that only processes one stage, one item.
    // innerCalls should be 2*NUM_ITEMS
    intptr_t innerCalls = 2*NUM_ITEMS,
             outerCalls = 2 * NUM_ITEMS,
             maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
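    // Illustration only (hypothetical value, NUM_ITEMS is defined elsewhere in this test):
    // with NUM_ITEMS == 10, innerCalls == outerCalls == 20 and maxExecuted == 20 * 20 == 400,
    // i.e. the total number of inner filter invocations expected when nothing throws.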
    CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
    TRY();
        testPipeline.run();
    CATCH_AND_ASSERT();
    intptr_t  minExecuted = 0;
    bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
        (!g_ExceptionInMaster && !g_NonMasterExecuted);
    if ( g_SolitaryException ) {
        minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
        ASSERT (g_NumExceptionsCaught == 1 || okayNoExceptionCaught, "No exception registered");
        ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived exception");  // probably will assert.
    }
    else {
        // we assume throwing pipelines will not count
        minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
        ASSERT((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls)||okayNoExceptionCaught, "Unexpected actual number of exceptions");
        ASSERT (g_CurExecuted >= minExecuted, "Too few tasks executed");
        // Many already-scheduled tasks may still start after the first exception is thrown, and
        // g_ExecutedAtLastCatch is updated every time an exception is caught.  So with multiple
        // exceptions a variable number of tasks end up being discarded by the cancellations.
        // Each throw is caught, so we will see many cancelled tasks; because g_ExecutedAtLastCatch
        // is updated with each throw, its final value is the number of tasks executed at the
        // last catch.
        ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
    }
} // void Test4_pipeline ()

//! Testing filter::finalize method
#define BUFFER_SIZE     32
#define NUM_BUFFERS     1024
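
// Overview of the finalize test machinery below (descriptive summary):
//  - InputFilterWithFinalization allocates a buffer per item and increments g_AllocatedCount and g_TotalCount,
//  - ProcessingFilterWithFinalization calls ThrowTestException once more than half of the NUM_BUFFERS buffers
//    have been produced,
//  - OutputFilterWithFinalization frees the buffers that reach the end of the pipeline,
//  - FinalizationBaseFilter::finalize frees the buffers still in flight when the pipeline is cancelled,
//    which lets Test5_pipeline assert that g_AllocatedCount returns to zero.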

tbb::atomic<size_t> g_AllocatedCount; // Number of currently allocated buffers
tbb::atomic<size_t> g_TotalCount; // Total number of allocated buffers

//! Base class for all filters involved in finalize method testing
class FinalizationBaseFilter : public tbb::filter {
public:
    FinalizationBaseFilter ( tbb::filter::mode m ) : filter(m) {}

    // Deletes an in-flight buffer when the pipeline is cancelled by an exception
    virtual void finalize( void* item ) __TBB_override {
        size_t* m_Item = (size_t*)item;
        delete[] m_Item;
        --g_AllocatedCount;
    }
};

//! Input filter to test finalize method
class InputFilterWithFinalization: public FinalizationBaseFilter {
public:
    InputFilterWithFinalization() : FinalizationBaseFilter(tbb::filter::serial) {
        g_TotalCount = 0;
    }
    void* operator()( void* ) __TBB_override {
        if (g_TotalCount == NUM_BUFFERS)
            return NULL;
        size_t* item = new size_t[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++)
            item[i] = 1;
        ++g_TotalCount;
        ++g_AllocatedCount;
        return item;
    }
};

// The filter multiplies each buffer item by 10.
class ProcessingFilterWithFinalization : public FinalizationBaseFilter {
public:
    ProcessingFilterWithFinalization (tbb::filter::mode _mode, bool) : FinalizationBaseFilter (_mode) {}

    void* operator()( void* item) __TBB_override {
        if(g_Master == Harness::CurrentTid()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::task::self().is_cancelled()) ++g_TGCCancelled;
        if (g_TotalCount > NUM_BUFFERS / 2)
            ThrowTestException(1);
        size_t* m_Item = (size_t*)item;
        for (int i = 0; i < BUFFER_SIZE; i++)
            m_Item[i] *= 10;
        return item;
    }
};

// Output filter deletes previously allocated buffer
class OutputFilterWithFinalization : public FinalizationBaseFilter {
public:
    OutputFilterWithFinalization (tbb::filter::mode m) : FinalizationBaseFilter (m) {}

    void* operator()( void* item) __TBB_override {
        size_t* m_Item = (size_t*)item;
        delete[] m_Item;
        --g_AllocatedCount;
        return NULL;
    }
};

//! Tests filter::finalize method
void Test5_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    g_AllocatedCount = 0;
    CustomPipeline<InputFilterWithFinalization, ProcessingFilterWithFinalization> testPipeline(filters);
    OutputFilterWithFinalization my_output_filter(tbb::filter::parallel);

    testPipeline.add_filter(my_output_filter);
    TRY();
        testPipeline.run();
    CATCH();
    ASSERT (g_AllocatedCount == 0, "Memory leak: some buffers were not destroyed");
} // void Test5_pipeline ()

//! Tests pipeline function passed with different combinations of filters
template<void testFunc(const FilterSet&)>
void TestWithDifferentFilters() {
    const int NumFilterTypes = 3;
    const tbb::filter::mode modes[NumFilterTypes] = {
            tbb::filter::parallel,
            tbb::filter::serial,
            tbb::filter::serial_out_of_order
        };
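    // The nested loops below run testFunc with every pairing of the three filter modes and both
    // boolean variants, i.e. 3 * 3 * 2 = 18 invocations per call.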
    for ( int i = 0; i < NumFilterTypes; ++i ) {
        for ( int j = 0; j < NumFilterTypes; ++j ) {
            for ( int k = 0; k < 2; ++k )
                testFunc( FilterSet(modes[i], modes[j], k == 0, k != 0) );
        }
    }
}

#endif /* TBB_USE_EXCEPTIONS */

class FilterToCancel : public tbb::filter {
public:
    FilterToCancel(bool is_parallel)
        : filter( is_parallel ? tbb::filter::parallel : tbb::filter::serial_in_order )
    {}
    void* operator()(void* item) __TBB_override {
        ++g_CurExecuted;
        CancellatorTask::WaitUntilReady();
        return item;
    }
}; // class FilterToCancel

template <class Filter_to_cancel>
class PipelineLauncherTask : public tbb::task {
    tbb::task_group_context &my_ctx;
public:
    PipelineLauncherTask ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}

    tbb::task* execute () __TBB_override {
        // Run test when serial filter is the first non-input filter
        InputFilter inputFilter;
        Filter_to_cancel filterToCancel(true);
        tbb::pipeline p;
        p.add_filter(inputFilter);
        p.add_filter(filterToCancel);
        p.run(g_NumTokens, my_ctx);
        return NULL;
    }
};

//! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
void TestCancelation1_pipeline () {
    ResetGlobals();
    g_ThrowException = false;
    intptr_t  threshold = 10;
    tbb::task_group_context ctx;
    ctx.reset();
    tbb::empty_task &r = *new( tbb::task::allocate_root() ) tbb::empty_task;
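    // Reference count is 3: one for each of the two child tasks spawned below plus one for wait_for_all().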
    r.set_ref_count(3);
    r.spawn( *new( r.allocate_child() ) CancellatorTask(ctx, threshold) );
    __TBB_Yield();
    r.spawn( *new( r.allocate_child() ) PipelineLauncherTask<FilterToCancel>(ctx) );
    TRY();
        r.wait_for_all();
    CATCH_AND_FAIL();
    r.destroy(r);
    ASSERT( g_TGCCancelled <= g_NumThreads, "Too many tasks survived cancellation");
    ASSERT (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
}

class FilterToCancel2 : public tbb::filter {
public:
    FilterToCancel2(bool is_parallel)
        : filter ( is_parallel ? tbb::filter::parallel : tbb::filter::serial_in_order)
    {}

    void* operator()(void* item) __TBB_override {
        ++g_CurExecuted;
        Harness::ConcurrencyTracker ct;
        // The test will hang (and be timed out by the test system) if is_cancelled() is broken
        while( !tbb::task::self().is_cancelled() )
            __TBB_Yield();
        return item;
    }
};

//! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
/** This version also tests task::is_cancelled() method. **/
void TestCancelation2_pipeline () {
    ResetGlobals();
    RunCancellationTest<PipelineLauncherTask<FilterToCancel2>, CancellatorTask2>();
    // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is a snapshot of the
    // former and g_CurExecuted increases monotonically, so here the two should be equal.  If
    // another filter starts after the cancel but before cancellation is propagated, g_CurExecuted
    // will be larger and the assertion below fails.
    ASSERT (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
}

void RunPipelineTests() {
    REMARK( "pipeline tests\n" );
    tbb::task_scheduler_init init (g_NumThreads);
    g_Master = Harness::CurrentTid();
    g_NumTokens = 2 * g_NumThreads;

    Test0_pipeline();
#if TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN
    TestWithDifferentFilters<Test1_pipeline>();
    TestWithDifferentFilters<Test2_pipeline>();
    TestWithDifferentFilters<Test3_pipeline>();
    TestWithDifferentFilters<Test4_pipeline>();
    TestWithDifferentFilters<Test5_pipeline>();
#endif /* TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN */
    TestCancelation1_pipeline();
    TestCancelation2_pipeline();
}


#if TBB_USE_EXCEPTIONS

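// MyCapturedException pairs a malloc + placement-new in move() with an explicit destructor call + free()
// in destroy(), and counts live instances in m_refCount so that TestTbbExceptionAPI below can verify
// that every moved exception object is eventually destroyed.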
class MyCapturedException : public tbb::captured_exception {
public:
    static int m_refCount;

    MyCapturedException () : tbb::captured_exception("MyCapturedException", "test") { ++m_refCount; }
    ~MyCapturedException () throw() { --m_refCount; }

    MyCapturedException* move () throw() __TBB_override {
        MyCapturedException* movee = (MyCapturedException*)malloc(sizeof(MyCapturedException));
        return ::new (movee) MyCapturedException;
    }
    void destroy () throw() __TBB_override {
        this->~MyCapturedException();
        free(this);
    }
    void operator delete ( void* p ) { free(p); }
};

int MyCapturedException::m_refCount = 0;

void DeleteTbbException ( volatile tbb::tbb_exception* pe ) {
    delete pe;
}

void TestTbbExceptionAPI () {
    const char *name = "Test captured exception",
               *reason = "Unit testing";
    tbb::captured_exception e(name, reason);
    ASSERT (strcmp(e.name(), name) == 0, "Setting captured exception name failed");
    ASSERT (strcmp(e.what(), reason) == 0, "Setting captured exception reason failed");
    tbb::captured_exception c(e);
    ASSERT (strcmp(c.name(), e.name()) == 0, "Copying captured exception name failed");
    ASSERT (strcmp(c.what(), e.what()) == 0, "Copying captured exception reason failed");
    tbb::captured_exception *m = e.move();
    ASSERT (strcmp(m->name(), name) == 0, "Moving captured exception name failed");
    ASSERT (strcmp(m->what(), reason) == 0, "Moving captured exception reason failed");
    ASSERT (!e.name() && !e.what(), "Moving semantics broken");
    m->destroy();

    MyCapturedException mce;
    MyCapturedException *mmce = mce.move();
    ASSERT( MyCapturedException::m_refCount == 2, NULL );
    DeleteTbbException(mmce);
    ASSERT( MyCapturedException::m_refCount == 1, NULL );
}

#endif /* TBB_USE_EXCEPTIONS */

/** If min and max thread numbers specified on the command line are different,
    the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
    to be able to test the high and low contention modes while keeping the test reasonably fast **/
int TestMain () {
    if(tbb::task_scheduler_init::default_num_threads() == 1) {
        REPORT("Known issue: tests require multiple hardware threads\n");
        return Harness::Skipped;
    }
    REMARK ("Using %s\n", TBB_USE_CAPTURED_EXCEPTION ? "tbb::captured_exception" : "exact exception propagation");
    MinThread = min(tbb::task_scheduler_init::default_num_threads(), max(2, MinThread));
    MaxThread = max(MinThread, min(tbb::task_scheduler_init::default_num_threads(), MaxThread));
    ASSERT (FLAT_RANGE >= FLAT_GRAIN * MaxThread, "Fix defines");
    int step = max((MaxThread - MinThread + 1)/2, 1);
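    // For example, MinThread == 2 and MaxThread == 3 give step == max((3 - 2 + 1)/2, 1) == 1,
    // so the loop below runs with g_NumThreads == 2 and 3.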
    for ( g_NumThreads = MinThread; g_NumThreads <= MaxThread; g_NumThreads += step ) {
        REMARK ("Number of threads %d\n", g_NumThreads);
        // Execute in all the possible modes
        for ( size_t j = 0; j < 4; ++j ) {
            g_ExceptionInMaster = (j & 1) != 0;
            g_SolitaryException = (j & 2) != 0;
            REMARK("g_ExceptionInMaster==%s, g_SolitaryException==%s\n", g_ExceptionInMaster?"T":"F", g_SolitaryException?"T":"F");
            RunParForAndReduceTests();
            RunParDoTests();
            RunPipelineTests();
        }
    }
#if TBB_USE_EXCEPTIONS
    TestTbbExceptionAPI();
#endif
#if __TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN
    REPORT("Known issue: exception handling tests are skipped.\n");
#endif
    return Harness::Done;
}

#else /* !__TBB_TASK_GROUP_CONTEXT */

int TestMain () {
    return Harness::Skipped;
}

#endif /* !__TBB_TASK_GROUP_CONTEXT */
