1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "tbb/parallel_scan.h"
18 #include "tbb/blocked_range.h"
19 #include "harness_assert.h"
20 #include <vector>
21 
22 typedef tbb::blocked_range<long> Range;
23 
24 static volatile bool ScanIsRunning = false;
25 
//! Sum of 0..i with wrap around on overflow.
/** Exactly one of i and i+1 is even, so the even factor is halved before the
    multiplication and nothing is lost to the division by two.
    The product is formed in unsigned arithmetic: an overflowing signed
    multiplication is undefined behavior, whereas unsigned arithmetic wraps. */
inline int TriangularSum( int i ) {
    const int half = i>>1;
    // Odd i:  (i+1)/2 == half+1, remaining factor is i.
    // Even i: i/2 == half,       remaining factor is i+1 (cannot overflow, since INT_MAX is odd).
    const unsigned halved_factor = static_cast<unsigned>( i&1 ? half+1 : half );
    const unsigned other_factor  = static_cast<unsigned>( i&1 ? i : i+1 );
    return static_cast<int>( halved_factor*other_factor );
}
30 
31 #include "harness.h"
32 
33 //! Verify that sum is init plus sum of integers in closed interval [0..finish_index].
34 /** line should be the source line of the caller */
VerifySum(int init,long finish_index,int sum,int line)35 void VerifySum( int init, long finish_index, int sum, int line ) {
36     int expected = init + TriangularSum(finish_index);
37     if (expected != sum) {
38         REPORT("line %d: sum[0..%ld] should be = %d, but was computed as %d\n",
39             line, finish_index, expected, sum);
40         abort();
41     }
42 }
43 
44 const int MAXN = 2000;
45 
//! Flags describing how an addend has been consumed by a scan.
/** USED_NONFINAL and USED_FINAL occupy distinct bits, so both can be OR-ed
    into the same history entry. */
enum AddendFlag {
    UNUSED        = 0,
    USED_NONFINAL = 1<<0,
    USED_FINAL    = 1<<1
};
51 
52 //! Array recording how each addend was used.
53 /** 'unsigned char' instead of AddendFlag for sake of compactness. */
54 static unsigned char AddendHistory[MAXN];
55 
56 //! Set to 1 for debugging output
57 #define PRINT_DEBUG 0
58 
59 #include "tbb/atomic.h"
60 #if PRINT_DEBUG
61 #include <stdio.h>
62 #include "harness_report.h"
63 tbb::atomic<long> NextBodyId;
64 #endif /* PRINT_DEBUG */
65 
//! Tags an object with an id for debugging output.
/** The struct has no members unless PRINT_DEBUG is enabled. */
struct BodyId {
#if PRINT_DEBUG
    //! Value of the global NextBodyId counter at the time of construction.
    const int id;
    BodyId() : id( static_cast<int>(NextBodyId++) ) {}
#endif /* PRINT_DEBUG */
};
72 
73 tbb::atomic<long> NumberOfLiveStorage;
74 
Snooze(bool scan_should_be_running)75 static void Snooze( bool scan_should_be_running ) {
76     ASSERT( ScanIsRunning==scan_should_be_running, NULL );
77 }
78 
79 template<typename T>
80 struct Storage {
81     T my_total;
82     Range my_range;
StorageStorage83     Storage(T init) :
84         my_total(init), my_range(-1, -1, 1) {
85         ++NumberOfLiveStorage;
86     }
~StorageStorage87     ~Storage() {
88         --NumberOfLiveStorage;
89     }
StorageStorage90     Storage(const Storage& strg) :
91         my_total(strg.my_total), my_range(strg.my_range) {
92         ++NumberOfLiveStorage;
93     }
operator =Storage94     Storage & operator=(const Storage& strg) {
95         my_total = strg.my_total;
96         my_range = strg.my_range;
97         return *this;
98     }
99 };
100 
101 template<typename T>
JoinStorages(const Storage<T> & left,Storage<T> & right)102 void JoinStorages(const Storage<T>& left, Storage<T>& right) {
103     Snooze(true);
104     ASSERT(ScanIsRunning, NULL);
105     ASSERT(left.my_range.end() == right.my_range.begin(), NULL);
106     right.my_total += left.my_total;
107     right.my_range = Range(left.my_range.begin(), right.my_range.end(), 1);
108     ASSERT(ScanIsRunning, NULL);
109     Snooze(true);
110     ASSERT(ScanIsRunning, NULL);
111 }
112 
113 template<typename T>
Scan(const Range & r,bool is_final,Storage<T> & storage,std::vector<T> & sum,const std::vector<T> & addend)114 void Scan(const Range & r, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) {
115     ASSERT(!is_final || (storage.my_range.begin() == 0 && storage.my_range.end() == r.begin()) || (storage.my_range.empty() && r.begin() == 0), NULL);
116     for (long i = r.begin(); i < r.end(); ++i) {
117         storage.my_total += addend[i];
118         if (is_final) {
119             ASSERT(AddendHistory[i] < USED_FINAL, "addend used 'finally' twice?");
120             AddendHistory[i] |= USED_FINAL;
121             sum[i] = storage.my_total;
122             VerifySum(42, i, int(sum[i]), __LINE__);
123         }
124         else {
125             ASSERT(AddendHistory[i] == UNUSED, "addend used too many times");
126             AddendHistory[i] |= USED_NONFINAL;
127         }
128     }
129     if (storage.my_range.empty())
130         storage.my_range = r;
131     else
132         storage.my_range = Range(storage.my_range.begin(), r.end(), 1);
133     Snooze(true);
134 }
135 
136 template<typename T>
ScanWithInit(const Range & r,T init,bool is_final,Storage<T> & storage,std::vector<T> & sum,const std::vector<T> & addend)137 Storage<T> ScanWithInit(const Range & r, T init, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) {
138     if (r.begin() == 0)
139         storage.my_total = init;
140     Scan(r, is_final, storage, sum, addend);
141     return storage;
142 }
143 
144 template<typename T>
145 class Accumulator: BodyId {
146     const  std::vector<T> &my_array;
147     std::vector<T> & my_sum;
148     Storage<T> storage;
149     enum state_type {
150         full,       // Accumulator has sufficient information for final scan,
151                     // i.e. has seen all iterations to its left.
152                     // It's either the original Accumulator provided by the user
153                     // or a Accumulator constructed by a splitting constructor *and* subsequently
154                     // subjected to a reverse_join with a full accumulator.
155 
156         partial,    // Accumulator has only enough information for pre_scan.
157                     // i.e. has not seen all iterations to its left.
158                     // It's an Accumulator created by a splitting constructor that
159                     // has not yet been subjected to a reverse_join with a full accumulator.
160 
161         summary,    // Accumulator has summary of iterations processed, but not necessarily
162                     // the information required for a final_scan or pre_scan.
163                     // It's the result of "assign".
164 
165         trash       // Accumulator with possibly no useful information.
166                     // It was the source for "assign".
167 
168     };
169     mutable state_type my_state;
170     //! Equals this while object is fully constructed, NULL otherwise.
171     /** Used to detect premature destruction and accidental bitwise copy. */
172     Accumulator* self;
173     Accumulator& operator= (const Accumulator& other);
174 public:
Accumulator(T init,const std::vector<T> & array,std::vector<T> & sum)175     Accumulator( T init, const std::vector<T> & array, std::vector<T> & sum ) :
176         my_array(array), my_sum(sum), storage(init), my_state(full)
177     {
178         // Set self as last action of constructor, to indicate that object is fully constructed.
179         self = this;
180     }
181 #if PRINT_DEBUG
print() const182     void print() const {
183         REPORT("%d [%ld..%ld)\n", id, storage.my_range.begin(), storage.my_range.end() );
184     }
185 #endif /* PRINT_DEBUG */
~Accumulator()186     ~Accumulator() {
187 #if PRINT_DEBUG
188         REPORT("%d [%ld..%ld) destroyed\n",id, storage.my_range.begin(), storage.my_range.end() );
189 #endif /* PRINT_DEBUG */
190         // Clear self as first action of destructor, to indicate that object is not fully constructed.
191         self = 0;
192     }
Accumulator(Accumulator & a,tbb::split)193     Accumulator( Accumulator& a, tbb::split ) :
194         my_array(a.my_array), my_sum(a.my_sum), storage(0), my_state(partial)
195     {
196         ASSERT(a.my_state==full || a.my_state==partial, NULL);
197 #if PRINT_DEBUG
198         REPORT("%d forked from %d\n",id,a.id);
199 #endif /* PRINT_DEBUG */
200         Snooze(true);
201         // Set self as last action of constructor, to indicate that object is fully constructed.
202         self = this;
203     }
204     template<typename Tag>
operator ()(const Range & r,Tag)205     void operator()( const Range& r, Tag /*tag*/ ) {
206         ASSERT( Tag::is_final_scan() ? my_state==full : my_state==partial, NULL );
207 #if PRINT_DEBUG
208         if(storage.my_range.empty() )
209             REPORT("%d computing %s [%ld..%ld)\n",id,Tag::is_final_scan()?"final":"lookahead",r.begin(),r.end() );
210         else
211             REPORT("%d computing %s [%ld..%ld) [%ld..%ld)\n",id,Tag::is_final_scan()?"final":"lookahead", storage.my_range.begin(), storage.my_range.end(),r.begin(),r.end());
212 #endif /* PRINT_DEBUG */
213         Scan(r, Tag::is_final_scan(), storage, my_sum, my_array);
214         ASSERT( self==this, "this Accumulator corrupted or prematurely destroyed" );
215     }
reverse_join(const Accumulator & left_body)216     void reverse_join( const Accumulator& left_body) {
217 #if PRINT_DEBUG
218         REPORT("reverse join %d [%ld..%ld) %d [%ld..%ld)\n",
219                left_body.id, left_body.storage.my_range.begin(), left_body.storage.my_range.end(),
220                id, storage.my_range.begin(), storage.my_range.end());
221 #endif /* PRINT_DEBUG */
222         const Storage<T> & left = left_body.storage;
223         Storage<T> & right = storage;
224         ASSERT(my_state==partial, NULL );
225         ASSERT(left_body.my_state==full || left_body.my_state==partial, NULL );
226 
227         JoinStorages(left, right);
228 
229         ASSERT(left_body.self==&left_body, NULL );
230         my_state = left_body.my_state;
231     }
assign(const Accumulator & other)232     void assign( const Accumulator& other ) {
233         ASSERT(other.my_state==full, NULL);
234         ASSERT(my_state==full, NULL);
235         storage.my_total = other.storage.my_total;
236         storage.my_range = other.storage.my_range;
237         ASSERT( self==this, NULL );
238         ASSERT( other.self==&other, "other Accumulator corrupted or prematurely destroyed" );
239         my_state = summary;
240         other.my_state = trash;
241     }
get_total()242     T get_total() {
243         return storage.my_total;
244     }
245 };
246 
247 #include "tbb/tick_count.h"
248 
249 template<typename T, typename Scan, typename ReverseJoin>
250 T ParallelScanFunctionalInvoker(const Range& range, T idx, const Scan& scan, const ReverseJoin& reverse_join, int mode) {
251     switch (mode%3) {
252     case 0:
253         return tbb::parallel_scan(range, idx, scan, reverse_join);
254         break;
255     case 1:
256         return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::simple_partitioner());
257         break;
258     default:
259         return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::auto_partitioner());
260     }
261 }
262 
263 template<typename T>
264 class ScanBody {
265     const std::vector<T> &my_addend;
266     std::vector<T> &my_sum;
267     const T my_init;
268     ScanBody& operator= (const ScanBody&);
269 public:
ScanBody(T init,const std::vector<T> & addend,std::vector<T> & sum)270     ScanBody(T init, const std::vector<T> &addend, std::vector<T> &sum) :my_addend(addend), my_sum(sum), my_init(init) {}
271     template<typename Tag>
operator ()(const Range & r,Storage<T> storage,Tag) const272     Storage<T> operator()(const Range& r, Storage<T> storage, Tag) const {
273         return ScanWithInit(r, my_init, Tag::is_final_scan(), storage, my_sum, my_addend);
274     }
275 };
276 
277 template<typename T>
278 class JoinBody {
279 public:
operator ()(const Storage<T> & left,Storage<T> & right) const280     Storage<T> operator()(const Storage<T>& left, Storage<T>& right) const {
281         JoinStorages(left, right);
282         return right;
283     }
284 };
285 
286 template<typename T>
ParallelScanTemplateFunctor(Range range,T init,const std::vector<T> & addend,std::vector<T> & sum,int mode)287 T ParallelScanTemplateFunctor(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
288     for (long i = 0; i<MAXN; ++i) {
289         AddendHistory[i] = UNUSED;
290     }
291     ScanIsRunning = true;
292     ScanBody<T> sb(init, addend, sum);
293     JoinBody<T> jb;
294     Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), sb, jb, mode);
295     ScanIsRunning = false;
296     if (range.empty())
297         res.my_total = init;
298     return res.my_total;
299 }
300 
301 #if __TBB_CPP11_LAMBDAS_PRESENT
302 template<typename T>
ParallelScanLambda(Range range,T init,const std::vector<T> & addend,std::vector<T> & sum,int mode)303 T ParallelScanLambda(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
304     for (long i = 0; i<MAXN; ++i) {
305         AddendHistory[i] = UNUSED;
306     }
307     ScanIsRunning = true;
308     Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0),
309         [&addend, &sum, init](const Range& r, Storage<T> storage, bool is_final_scan /*tag*/) -> Storage<T> {
310             return ScanWithInit(r, init, is_final_scan, storage, sum, addend);
311         },
312         [](const Storage<T>& left, Storage<T>& right) -> Storage<T> {
313             JoinStorages(left, right);
314             return right;
315         },
316         mode);
317     ScanIsRunning = false;
318     if (range.empty())
319         res.my_total = init;
320     return res.my_total;
321 }
322 
323 #if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT
324 template<typename T>
ParallelScanGenericLambda(Range range,T init,const std::vector<T> & addend,std::vector<T> & sum,int mode)325 T ParallelScanGenericLambda(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
326     for (long i = 0; i<MAXN; ++i) {
327         AddendHistory[i] = UNUSED;
328     }
329     ScanIsRunning = true;
330     Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0),
331         [&addend, &sum, init](const Range& rng, Storage<T> storage, auto scan_tag) {
332             return ScanWithInit(rng, init, scan_tag.is_final_scan(), storage, sum, addend);
333         },
334         [](const Storage<T>& left, Storage<T>& right) {
335             JoinStorages(left, right);
336             return right;
337         },
338         mode);
339     ScanIsRunning = false;
340     if (range.empty())
341         res.my_total = init;
342     return res.my_total;
343 }
344 #endif/* GENERIC_LAMBDAS */
345 #endif/* LAMBDAS */
346 
TestAccumulator(int mode,int nthread)347 void TestAccumulator( int mode, int nthread ) {
348     typedef int T;
349     std::vector<T> addend(MAXN);
350     std::vector<T> sum(MAXN);
351     for( long n=0; n<=MAXN; ++n ) {
352         for( long i=0; i<MAXN; ++i ) {
353             addend[i] = -1;
354             sum[i] = -2;
355             AddendHistory[i] = UNUSED;
356         }
357         for( long i=0; i<n; ++i )
358             addend[i] = i;
359 
360         Accumulator<T> acc( 42, addend, sum );
361         tbb::tick_count t0 = tbb::tick_count::now();
362 #if PRINT_DEBUG
363         REPORT("--------- mode=%d range=[0..%ld)\n",mode,n);
364 #endif /* PRINT_DEBUG */
365         ScanIsRunning = true;
366 
367         switch (mode) {
368             case 0:
369                 tbb::parallel_scan( Range( 0, n, 1 ), acc );
370             break;
371             case 1:
372                 tbb::parallel_scan( Range( 0, n, 1 ), acc, tbb::simple_partitioner() );
373             break;
374             case 2:
375                 tbb::parallel_scan( Range( 0, n, 1 ), acc, tbb::auto_partitioner() );
376             break;
377         }
378 
379         ScanIsRunning = false;
380 #if PRINT_DEBUG
381         REPORT("=========\n");
382 #endif /* PRINT_DEBUG */
383         Snooze(false);
384         tbb::tick_count t1 = tbb::tick_count::now();
385         long used_once_count = 0;
386         for( long i=0; i<n; ++i )
387             if( !(AddendHistory[i]&USED_FINAL) ) {
388                 REPORT("failed to use addend[%ld] %s\n",i,AddendHistory[i]&USED_NONFINAL?"(but used nonfinal)":"");
389             }
390         for( long i=0; i<n; ++i ) {
391             VerifySum( 42, i, sum[i], __LINE__ );
392             used_once_count += AddendHistory[i]==USED_FINAL;
393         }
394         if( n )
395             ASSERT( acc.get_total()==sum[n-1], NULL );
396         else
397             ASSERT( acc.get_total()==42, NULL );
398         REMARK("time [n=%ld] = %g\tused_once%% = %g\tnthread=%d\n",n,(t1-t0).seconds(), n==0 ? 0 : 100.0*used_once_count/n,nthread);
399 
400 
401        std::vector<T> sum_tmplt(MAXN);
402         for (long i = 0; i<MAXN; ++i)
403             sum_tmplt[i] = -2;
404         T total_tmplt = ParallelScanTemplateFunctor(Range(0, n, 1), 42, addend, sum_tmplt, mode);
405 
406         ASSERT(acc.get_total() == total_tmplt, "Parallel prefix sum with lambda interface is not equal to body interface");
407         ASSERT(sum == sum_tmplt, "Parallel prefix vector with lambda interface is not equal to body interface");
408 
409 #if __TBB_CPP11_LAMBDAS_PRESENT
410         std::vector<T> sum_lambda(MAXN);
411         for (long i = 0; i<MAXN; ++i)
412             sum_lambda[i] = -2;
413         T total_lambda = ParallelScanLambda(Range(0, n, 1), 42, addend, sum_lambda, mode);
414 
415         ASSERT(acc.get_total() == total_lambda, "Parallel prefix sum with lambda interface is not equal to body interface");
416         ASSERT(sum == sum_lambda, "Parallel prefix vector with lambda interface is not equal to body interface");
417 
418 #if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT
419         std::vector<T> sum_generic_lambda(MAXN);
420         for (long i = 0; i<MAXN; ++i)
421             sum_generic_lambda[i] = -2;
422         T total_generic_lambda = ParallelScanGenericLambda(Range(0, n, 1), 42, addend, sum_generic_lambda, mode);
423 
424         ASSERT(acc.get_total() == total_generic_lambda, "Parallel prefix sum with lambda (generic) interface is not equal to body interface");
425         ASSERT(sum == sum_generic_lambda, "Parallel prefix vector with lambda (generic) interface is not equal to body interface");
426 
427 #endif /* GENERIC_LAMBDAS */
428 #endif /* LAMBDAS */
429     }
430 }
431 
TestScanTags()432 static void TestScanTags() {
433     ASSERT( tbb::pre_scan_tag::is_final_scan()==false, NULL );
434     ASSERT( tbb::final_scan_tag::is_final_scan()==true, NULL );
435     ASSERT( tbb::pre_scan_tag() == false, NULL );
436     ASSERT( tbb::final_scan_tag() == true, NULL );
437 }
438 
439 #include "tbb/task_scheduler_init.h"
440 #include "harness_cpu.h"
441 
TestMain()442 int TestMain () {
443     TestScanTags();
444     for( int p=MinThread; p<=MaxThread; ++p ) {
445         for (int mode = 0; mode < 3; mode++) {
446             tbb::task_scheduler_init init(p);
447             NumberOfLiveStorage = 0;
448             TestAccumulator(mode, p);
449             // Test that all workers sleep when no work
450             TestCPUUserTime(p);
451 
452             // Checking has to be done late, because when parallel_scan makes copies of
453             // the user's "Body", the copies might be destroyed slightly after parallel_scan
454             // returns.
455             ASSERT( NumberOfLiveStorage==0, NULL );
456         }
457     }
458     return Harness::Done;
459 }
460