1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "tbb/pipeline.h"
18 #include "tbb/spin_mutex.h"
19 #include "tbb/atomic.h"
20 #include "tbb/tbb_thread.h"
21 #include <cstdlib>
22 #include <cstdio>
23 #include "harness.h"
24 
25 // In the test, variables related to token counting are declared
26 // as unsigned long to match definition of tbb::internal::Token.
27 
//! Id of the thread that first executes work on a non-thread-bound stage.
//! Recorded and checked by BaseFilter::operator() while is_serial_execution is set;
//! reset to id0 by clear_global_state().
tbb::tbb_thread::id thread_id;
//! Default-constructed thread id; serves as the "not recorded yet" value of thread_id.
tbb::tbb_thread::id id0;
//! True if non-thread-bound stages must all be executed on one thread
//! (TestOneConfiguration sets it when the pipeline runs with a single token).
bool is_serial_execution;
double sleeptime; // base sleep (msec) of non-thread-bound stages per item; 0 disables sleeping
35 
//! Pipeline item handed out by InputFilter and passed along the filters.
struct Buffer {
    //! Sentinel stored in id and sequence_number while the buffer carries no item.
    static const unsigned long unused = ~0ul;
    //! Index of the stream item carried by this buffer, or unused.
    unsigned long id;
    //! True while the buffer is in flight; the last filter clears it with release semantics.
    bool is_busy;
    //! Token order stamped by the first ordered filter, or unused.
    unsigned long sequence_number;

    Buffer() {
        id = sequence_number = unused;
        is_busy = false;
    }
};
45 
//! Periodically triggers an emulated "waiting for input" check in the input filter.
class waiting_probe {
    size_t check_counter;
public:
    waiting_probe() : check_counter(0) {}
    //! Counts one call; returns true on every call where check_counter+1 is a multiple
    //! of 0x8000 (the first time on the 32767th call).
    bool required( ) {
        const size_t next_count = ++check_counter + 1;
        return (next_count & size_t(0x7FFF)) == 0;
    }
    void probe( ); // defined below
};
56 
//! Number of items the input filter produces per pipeline run.
static const unsigned StreamSize = 10;
//! Maximum number of filters allowed
static const unsigned MaxFilters = 4;
//! Size of the input filter's buffer pool; also the cap on the number of tokens used.
static const unsigned MaxBuffer = 8;
//! Done[f][i] is set when stream item i has been processed by filter f.
static bool Done[MaxFilters][StreamSize];
//! Decides when a serial input filter emulates waiting for input.
static waiting_probe WaitTest;
//! Number of times a serial out-of-order filter received an item whose stamped
//! sequence number differed from its arrival position; TestMain warns if it stays 0.
static unsigned out_of_order_count;
64 
65 #include "harness_concurrency_tracker.h"
66 
//! Test filter that checks serialization/ordering invariants and records processed items.
/** T is the TBB filter base class (tbb::filter or tbb::thread_bound_filter). */
template<typename T>
class BaseFilter: public T {
    //! Row of the global Done table for this filter; my_done[id] is set per processed item.
    bool* const my_done;
    //! True for the last filter, which returns the Buffer to the input filter's pool.
    const bool my_is_last;
    //! Set once two invocations of operator() were observed to overlap.
    //! NOTE(review): plain bool updated from possibly concurrent invocations of a parallel
    //! filter without synchronization; presumably tolerated as a best-effort flag.
    bool concurrency_observed;
    //! Number of operator() invocations currently in progress.
    tbb::atomic<int> running_count;
public:
    //! Number of items this filter has received; current_token-1 is the arrival index
    //! of the item being processed. TestOneConfiguration expects StreamSize at the end.
    tbb::atomic<tbb::internal::Token> current_token;
    BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
        T(type),
        my_done(done),
        my_is_last(is_last),
        concurrency_observed(false),
        current_token()
    {
        running_count = 0;
    }
    //! Checks the observed concurrency against the filter mode: serial filters (or any filter
    //! in a serial-execution run) must never overlap; parallel filters must overlap at least
    //! once when sleeps are enabled.
    ~BaseFilter() {
        if( this->is_serial() || is_serial_execution )
            ASSERT( !concurrency_observed, "Unexpected concurrency in a [serial] filter" );
        else if( sleeptime > 0 )
            ASSERT( concurrency_observed, "No concurrency in a parallel filter" );
    }
    //! Counts the incoming item and returns it as a Buffer. InputFilter overrides this to
    //! produce new items instead.
    virtual Buffer* get_buffer( void* item ) {
        current_token++;
        return static_cast<Buffer*>(item);
    }
    //! Processes one item: validates threading/ordering expectations, marks the item done,
    //! and (in the last filter) releases its Buffer. Returns NULL when get_buffer() does.
    void* operator()( void* item ) __TBB_override {
        // Check if work is done only on one thread when ntokens==1 or
        // when pipeline has only one filter that is serial and non-thread-bound
        if( is_serial_execution && !this->is_bound() ) {
            // Get id of current thread
            tbb::tbb_thread::id id = tbb::this_tbb_thread::get_id();
            // At first execution, set thread_id to current thread id.
            // Serialized execution is expected, so there should be no race.
            if( thread_id == id0 )
                thread_id = id;
            // Check if work is done on one thread
            ASSERT( thread_id == id, "non-thread-bound stages executed on different threads when must be executed on a single one");
        }
        // Scoped harness tracker; presumably feeds the InstantParallelism/PeakParallelism
        // values checked by TestOneConfiguration.
        Harness::ConcurrencyTracker ct;
        // Another invocation already running means this filter executed concurrently.
        concurrency_observed = concurrency_observed || (running_count++ > 0);
        if( this->is_serial() )
            ASSERT( !concurrency_observed, "premature entry to serial stage" );

        Buffer* b = get_buffer(item);
        if( b ) {
            if(!this->is_bound() && sleeptime > 0) {
                if(this->is_serial()) {
                    Harness::Sleep((int)sleeptime);
                } else {
                    // early parallel tokens sleep longer
                    // NOTE(review): if no ordered filter stamped the item, sequence_number is
                    // Buffer::unused and the (int) cast is implementation-defined (typically -1),
                    // which yields the longest sleep.
                    int i = (int)((5 - (int)b->sequence_number) * sleeptime);
                    if(i < (int)sleeptime) i = (int)sleeptime;
                    Harness::Sleep(i);
                }
            }
            if( this->is_ordered() ) {
                // The first ordered filter stamps the arrival index; later ones must see the
                // item at the same index.
                if( b->sequence_number == Buffer::unused )
                    b->sequence_number = current_token-1;
                else
                    ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
            } else if( this->is_serial() ) {
                // Serial out-of-order filter: count items arriving out of the stamped order.
                if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
                    out_of_order_count++;
            }
            ASSERT( b->id < StreamSize, NULL );
            ASSERT( !my_done[b->id], "duplicate processing of token?" );
            ASSERT( b->is_busy, NULL );
            my_done[b->id] = true;
            if( my_is_last ) {
                // Return the buffer to the pool; the release store pairs with the acquire
                // load in InputFilter::get_buffer().
                b->id = Buffer::unused;
                b->sequence_number = Buffer::unused;
                __TBB_store_with_release(b->is_busy, false);
            }
        }
        concurrency_observed = concurrency_observed || (--running_count > 0);
        return b;
    }
};
147 
148 template<typename T>
149 class InputFilter: public BaseFilter<T> {
150     tbb::spin_mutex input_lock;
151     Buffer buffer[MaxBuffer];
152     const tbb::internal::Token my_number_of_tokens;
153 public:
InputFilter(tbb::filter::mode type,tbb::internal::Token ntokens,bool done[],bool is_last)154     InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
155         BaseFilter<T>(type, done, is_last),
156         my_number_of_tokens(ntokens)
157     {}
get_buffer(void *)158     Buffer* get_buffer( void* ) __TBB_override {
159         unsigned long next_input;
160         unsigned free_buffer = 0;
161         { // lock protected scope
162             tbb::spin_mutex::scoped_lock lock(input_lock);
163             if( this->current_token>=StreamSize )
164                 return NULL;
165             next_input = this->current_token++;
166             // once in a while, emulate waiting for input; this only makes sense for serial input
167             if( this->is_serial() && WaitTest.required() )
168                 WaitTest.probe( );
169             while( free_buffer<MaxBuffer )
170                 if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
171                     ++free_buffer;
172                 else {
173                     buffer[free_buffer].is_busy = true;
174                     break;
175                 }
176         }
177         ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
178         Buffer* b = &buffer[free_buffer];
179         ASSERT( &buffer[0] <= b, NULL );
180         ASSERT( b <= &buffer[MaxBuffer-1], NULL );
181         ASSERT( b->id == Buffer::unused, NULL);
182         b->id = next_input;
183         ASSERT( b->sequence_number == Buffer::unused, NULL);
184         return b;
185     }
186 };
187 
188 class process_loop {
189 public:
operator ()(tbb::thread_bound_filter * tbf)190     void operator()( tbb::thread_bound_filter* tbf ) {
191         tbb::thread_bound_filter::result_type flag;
192         do
193             flag = tbf->process_item();
194         while( flag != tbb::thread_bound_filter::end_of_stream );
195     }
196 };
197 
//! The struct below repeats the data layout of tbb::pipeline.
/** Member order and types must match tbb/pipeline.h for a reinterpretation of a
    tbb::pipeline through this struct to be meaningful.
    NOTE(review): nothing in this file appears to use it; confirm it is still needed. */
struct hacked_pipeline {
    tbb::filter* filter_list;
    tbb::filter* filter_end;
    tbb::empty_task* end_counter;
    tbb::atomic<tbb::internal::Token> input_tokens;
    tbb::atomic<tbb::internal::Token> global_token_counter;
    bool end_of_input;
    bool has_thread_bound_filters;

    // Declared but never defined: presumably only there so the struct carries a vtable
    // pointer like the real class. Never instantiate this struct.
    virtual ~hacked_pipeline();
};
210 
//! The struct below repeats the data layout of tbb::internal::ordered_buffer.
/** Member order and types must match the library's definition.
    NOTE(review): nothing in this file appears to use it; confirm it is still needed. */
struct hacked_ordered_buffer {
    void* array; // This should be changed to task_info* if ever used
    tbb::internal::Token array_size;
    tbb::internal::Token low_token;
    tbb::spin_mutex array_mutex;
    tbb::internal::Token high_token;
    bool is_ordered;
    bool is_bound;
};
221 
//! The struct below repeats the data layout of tbb::filter.
/** Member order and types must match tbb/pipeline.h.
    NOTE(review): nothing in this file appears to use it; confirm it is still needed. */
struct hacked_filter {
    tbb::filter* next_filter_in_pipeline;
    hacked_ordered_buffer* input_buffer;
    unsigned char my_filter_mode;
    tbb::filter* prev_filter_in_pipeline;
    tbb::pipeline* my_pipeline;
    tbb::filter* next_segment;

    // Declared but never defined: presumably only there so the struct carries a vtable
    // pointer like the real class. Never instantiate this struct.
    virtual ~hacked_filter();
};
233 
234 #if _MSC_VER && !defined(__INTEL_COMPILER)
235     // Workaround for overzealous compiler warnings
236     // Suppress compiler warning about constant conditional expression
237     #pragma warning (disable: 4127)
238 #endif
239 
clear_global_state()240 void clear_global_state() {
241     Harness::ConcurrencyTracker::Reset();
242     memset( Done, 0, sizeof(Done) );
243     thread_id = id0;
244     is_serial_execution = false;
245 }
246 
247 
248 class PipelineTest {
249     // There are 3 non-thread-bound filter types: serial_in_order and serial_out_of_order, parallel
250     static const tbb::filter::mode non_tb_filters_table[3]; // = { tbb::filter::serial_in_order, tbb::filter::serial_out_of_order, tbb::filter::parallel};
251     // There are 2 thread-bound filter types: serial_in_order and serial_out_of_order
252     static const tbb::filter::mode tb_filters_table[2]; // = { tbb::filter::serial_in_order, tbb::filter::serial_out_of_order };
253 
254     static const unsigned number_of_non_tb_filter_types = sizeof(non_tb_filters_table)/sizeof(non_tb_filters_table[0]);
255     static const unsigned number_of_tb_filter_types = sizeof(tb_filters_table)/sizeof(tb_filters_table[0]);
256     static const unsigned number_of_filter_types = number_of_non_tb_filter_types + number_of_tb_filter_types;
257     // static unsigned my_nthread;
258     public:
259     static double TestOneConfiguration( unsigned numeral, unsigned nthread, unsigned number_of_filters, tbb::internal::Token ntokens);
260     static void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters );
261     static void TestIdleSpinning(unsigned nthread);
262 
PrintConfiguration(unsigned numeral,unsigned nFilters)263     static void PrintConfiguration(unsigned numeral, unsigned nFilters) {
264         REMARK( "{ ");
265         for( unsigned i = 0; i < nFilters; ++i) {
266             switch( numeral % number_of_filter_types ) {
267                 case 0: REMARK("s  "); break;
268                 case 1: REMARK("B  "); break;
269                 case 2: REMARK("o  "); break;
270                 case 3: REMARK("Bo "); break;
271                 case 4: REMARK("P  "); break;
272                 default: REMARK(" ** ERROR** "); break;
273             }
274             numeral /= number_of_filter_types;
275         }
276         REMARK("}");
277     }
ContainsBoundFilter(unsigned numeral)278     static bool ContainsBoundFilter(unsigned numeral) {
279         for( ;numeral != 0; numeral /= number_of_filter_types)
280             if(numeral & 0x1) return true;
281         return false;
282     }
283 };
284 
// Mode tables indexed from a configuration digit (see TestOneConfiguration):
// a non-thread-bound digit d selects non_tb_filters_table[d/2], a thread-bound digit
// selects tb_filters_table[d/3]. The trailing comments give the selecting digit.
const tbb::filter::mode PipelineTest::non_tb_filters_table[3] = {
    tbb::filter::serial_in_order,       // 0
    tbb::filter::serial_out_of_order,   // 2
    tbb::filter::parallel               // 4
};
const tbb::filter::mode PipelineTest::tb_filters_table[2] = {
    tbb::filter::serial_in_order,       // 1
    tbb::filter::serial_out_of_order    // 3
};
294 
295 #include "harness_cpu.h"
296 
TestOneConfiguration(unsigned numeral,unsigned nthread,unsigned number_of_filters,tbb::internal::Token ntokens)297 double PipelineTest::TestOneConfiguration(unsigned numeral, unsigned nthread, unsigned number_of_filters, tbb::internal::Token ntokens)
298 {
299     // Build pipeline
300     tbb::pipeline pipeline;
301     tbb::filter* filter[MaxFilters];
302     unsigned temp = numeral;
303     // parallelism_limit is the upper bound on the possible parallelism
304     unsigned parallelism_limit = 0;
305     // number of thread-bound-filters in the current sequence
306     unsigned number_of_tb_filters = 0;
307     // ordinal numbers of thread-bound-filters in the current sequence
308     unsigned array_of_tb_filter_numbers[MaxFilters];
309     if(!ContainsBoundFilter(numeral)) return 0.0;
310     for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
311         bool is_bound = temp%number_of_filter_types&0x1;
312         tbb::filter::mode filter_type;
313         if( is_bound ) {
314             filter_type = tb_filters_table[temp%number_of_filter_types/number_of_non_tb_filter_types];
315         } else
316             filter_type = non_tb_filters_table[temp%number_of_filter_types/number_of_tb_filter_types];
317         const bool is_last = i==number_of_filters-1;
318         if( is_bound ) {
319             if( i == 0 )
320                 filter[i] = new InputFilter<tbb::thread_bound_filter>(filter_type,ntokens,Done[i],is_last);
321             else
322                 filter[i] = new BaseFilter<tbb::thread_bound_filter>(filter_type,Done[i],is_last);
323             array_of_tb_filter_numbers[number_of_tb_filters] = i;
324             number_of_tb_filters++;
325         } else {
326             if( i == 0 )
327                 filter[i] = new InputFilter<tbb::filter>(filter_type,ntokens,Done[i],is_last);
328             else
329                 filter[i] = new BaseFilter<tbb::filter>(filter_type,Done[i],is_last);
330         }
331         pipeline.add_filter(*filter[i]);
332         if ( filter[i]->is_serial() ) {
333             parallelism_limit += 1;
334         } else {
335             parallelism_limit = nthread;
336         }
337     }
338     ASSERT(number_of_tb_filters,NULL);
339     clear_global_state();
340     // Account for clipping of parallelism.
341     if( parallelism_limit>nthread )
342         parallelism_limit = nthread;
343     if( parallelism_limit>ntokens )
344         parallelism_limit = (unsigned)ntokens;
345 
346     for( unsigned i=0; i<number_of_filters; ++i ) {
347         static_cast<BaseFilter<tbb::filter>*>(filter[i])->current_token=0;
348     }
349     tbb::tbb_thread* t[MaxFilters];
350     for( unsigned j = 0; j<number_of_tb_filters; j++)
351         t[j] = new tbb::tbb_thread(process_loop(), static_cast<tbb::thread_bound_filter*>(filter[array_of_tb_filter_numbers[j]]));
352     if( ntokens == 1 || ( number_of_filters == 1 && number_of_tb_filters == 0 && filter[0]->is_serial() ))
353         is_serial_execution = true;
354     double strttime = GetCPUUserTime();
355     pipeline.run( ntokens );
356     double endtime = GetCPUUserTime();
357     for( unsigned j = 0; j<number_of_tb_filters; j++)
358         t[j]->join();
359     ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
360     for( unsigned i=0; i<number_of_filters; ++i )
361         ASSERT( static_cast<BaseFilter<tbb::filter>*>(filter[i])->current_token==StreamSize, NULL );
362     for( unsigned i=0; i<MaxFilters; ++i )
363         for( unsigned j=0; j<StreamSize; ++j ) {
364             ASSERT( Done[i][j]==(i<number_of_filters), NULL );
365         }
366     if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
367         REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n",
368             nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
369     for( unsigned i=0; i < number_of_filters; ++i ) {
370         delete filter[i];
371         filter[i] = NULL;
372     }
373     for( unsigned j = 0; j<number_of_tb_filters; j++)
374         delete t[j];
375     pipeline.clear();
376     return endtime - strttime;
377 } // TestOneConfiguration
378 
TestTrivialPipeline(unsigned nthread,unsigned number_of_filters)379 void PipelineTest::TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
380 
381     REMARK( "testing with %lu threads and %lu filters\n", nthread, number_of_filters );
382     ASSERT( number_of_filters<=MaxFilters, "too many filters" );
383     tbb::internal::Token max_tokens = nthread < MaxBuffer ? nthread : MaxBuffer;
384     // The loop has 1 iteration if max_tokens=1 and 2 iterations if max_tokens>1:
385     // one iteration for ntokens=1 and second for ntokens=max_tokens
386     // Iteration for ntokens=1 is required in each test case to check if pipeline run only on one thread
387     unsigned max_iteration = max_tokens > 1 ? 2 : 1;
388     tbb::internal::Token ntokens = 1;
389     for( unsigned iteration = 0; iteration < max_iteration; iteration++) {
390         if( iteration > 0 )
391             ntokens = max_tokens;
392         // Count maximum iterations number
393         unsigned limit = 1;
394         for( unsigned i=0; i<number_of_filters; ++i)
395             limit *= number_of_filter_types;
396         // Iterate over possible filter sequences
397         for( unsigned numeral=0; numeral<limit; ++numeral ) {
398             REMARK( "testing configuration %lu of %lu\n", numeral, limit );
399             (void)TestOneConfiguration(numeral, nthread, number_of_filters, ntokens);
400         }
401     }
402 }
403 
404 // varying times for sleep result in different user times for all pipelines.
405 // So we compare the running time of an all non-TBF pipeline with different (with
406 // luck representative) TBF configurations.
407 //
408 // We run the tests multiple times and compare the average runtimes for those cases
409 // that don't return 0 user time.  configurations that exceed the allowable extra
410 // time are reported.
TestIdleSpinning(unsigned nthread)411 void PipelineTest::TestIdleSpinning( unsigned nthread)  {
412     unsigned sample_setups[] = {
413         // in the comments below, s == serial, o == serial out-of-order,
414         // B == thread bound, Bo == thread bound out-of-order, p == parallel
415         1,   // B  s  s  s
416         5,   // s  B  s  s
417         25,  // s  s  B  s
418         125, // s  s  s  B
419         6,   // B  B  s  s
420         26,  // B  s  B  s
421         126, // B  s  s  B
422         30,  // s  B  B  s
423         130, // s  B  s  B
424         150, // s  s  B  B
425         31,  // B  B  B  s
426         131, // B  B  s  B
427         155, // s  B  B  B
428         495, // s  p  p  Bo
429         71,  // B  p  o  s
430         355, // s  B  p  o
431         95,  // s  p  Bo s
432         475, // s  s  p  Bo
433     };
434     const int nsetups = sizeof(sample_setups) / sizeof(unsigned);
435     const int ntests = 4;
436     const double bignum = 1000000000.0;
437     const double allowable_slowdown = 3.5;
438     unsigned zero_count = 0;
439 
440     REMARK( "testing idle spinning with %lu threads\n", nthread );
441     tbb::internal::Token max_tokens = nthread < MaxBuffer ? nthread : MaxBuffer;
442     for( int i=0; i<nsetups; ++i ) {
443         unsigned numeral = sample_setups[i];
444         unsigned temp = numeral;
445         unsigned nbound = 0;
446         while(temp) {
447             if((temp%number_of_filter_types)&0x01) nbound++;
448             temp /= number_of_filter_types;
449         }
450         sleeptime = 20.0;
451         double s0 = bignum;
452         double s1 = bignum;
453         int v0cnt = 0;
454         int v1cnt = 0;
455         double s0sum = 0.0;
456         double s1sum = 0.0;
457         REMARK(" TestOneConfiguration, pipeline == ");
458         PrintConfiguration(numeral, MaxFilters);
459         REMARK(", max_tokens== %d\n", (int)max_tokens);
460         for(int j = 0; j < ntests; ++j) {
461             double s1a = TestOneConfiguration(numeral, nthread, MaxFilters, max_tokens);
462             double s0a = TestOneConfiguration((unsigned)0, nthread, MaxFilters, max_tokens);
463             s1sum += s1a;
464             s0sum += s0a;
465             if(s0a > 0.0) {
466                 ++v0cnt;
467                 s0 = (s0a < s0) ? s0a : s0;
468             } else {
469                 ++zero_count;
470             }
471             if(s1a > 0.0) {
472                 ++v1cnt;
473                 s1 = (s1a < s1) ? s1a : s1;
474             } else {
475                 ++zero_count;
476             }
477         }
478         if(s0 == bignum || s1 == bignum) continue;
479         s0sum /= (double)v0cnt;
480         s1sum /= (double)v1cnt;
481         double slowdown = (s1sum-s0sum)/s0sum;
482         if(slowdown > allowable_slowdown)
483             REMARK( "with %lu threads configuration %lu has slowdown > %g (%g)\n", nthread, numeral, allowable_slowdown, slowdown );
484     }
485     REMARK("Total of %lu zero times\n", zero_count);
486 }
487 
//! Thread count of the current TestMain iteration; read by waiting_probe::probe().
static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
489 
probe()490 void waiting_probe::probe( ) {
491     if( nthread==1 ) return;
492     REMARK("emulating wait for input\n");
493     // Test that threads sleep while no work.
494     // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input
495     TestCPUUserTime(nthread, 2);
496 }
497 
498 #include "tbb/task_scheduler_init.h"
499 
TestMain()500 int TestMain () {
501     out_of_order_count = 0;
502     if( MinThread<1 ) {
503         REPORT("must have at least one thread");
504         exit(1);
505     }
506 
507     // Test with varying number of threads.
508     for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
509         // Initialize TBB task scheduler
510         tbb::task_scheduler_init init(nthread);
511         sleeptime = 0.0;  // msec : 0 == no_timing, > 0, each filter stage sleeps for sleeptime
512 
513         // Test pipelines with 1 and maximal number of filters
514         for( unsigned n=1; n<=MaxFilters; n*=MaxFilters ) {
515             // Thread-bound stages are serviced by user-created threads; those
516             // don't run the pipeline and don't service non-thread-bound stages
517             PipelineTest::TestTrivialPipeline(nthread,n);
518         }
519 
520         // Test that all workers sleep when no work
521         TestCPUUserTime(nthread);
522         if((unsigned)nthread >= MaxFilters)  // test works when number of threads >= number of stages
523             PipelineTest::TestIdleSpinning(nthread);
524     }
525     if( !out_of_order_count )
526         REPORT("Warning: out of order serial filter received tokens in order\n");
527     return Harness::Done;
528 }
529