1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/pipeline.h"
19 #include "tbb/spin_mutex.h"
20 #include "tbb/atomic.h"
21 #include <cstdlib>
22 #include <cstdio>
23 #include "harness.h"
24 
25 // In the test, variables related to token counting are declared
26 // as unsigned long to match definition of tbb::internal::Token.
27 
//! Carrier object that travels through the pipeline stages, one per live token.
struct Buffer {
    //! Sentinel stored in id and sequence_number while the buffer holds no item.
    static const unsigned long unused = ~0ul;
    //! Index of the stream item currently carried, or unused.
    unsigned long id;
    //! Raised by the input stage when it claims the buffer; lowered by the last stage.
    bool is_busy;
    //! Token position recorded by the first ordered stage that sees the buffer, or unused.
    unsigned long sequence_number;

    //! A freshly built buffer is idle and carries nothing.
    Buffer()
        : id(unused)
        , is_busy(false)
        , sequence_number(unused)
    {}
};
37 
//! Throttle that decides how often the "waiting for input" emulation is exercised.
class waiting_probe {
    //! Number of required() calls made so far.
    size_t check_counter;
public:
    waiting_probe()
        : check_counter(0)
    {}

    //! Counts the call and reports true only when (calls so far + 1) is a multiple of 0x8000.
    bool required( ) {
        const size_t period_mask = size_t(0x7FFF);
        ++check_counter;
        return ((check_counter+1) & period_mask) == 0;
    }

    void probe( ); // defined below
};
48 
//! Hard upper bound on the stream size of a single run; also the row length of Done.
static const unsigned MaxStreamSize = 8000;
//! Per-thread factor used to cap the stream size when few threads are in use.
static const unsigned MaxStreamItemsPerThread = 1000;
//! Maximum number of filters allowed
static const unsigned MaxFilters = 5;
//! Number of items the input filter hands out during the current run.
static unsigned StreamSize;
//! Size of the input filter's buffer pool; also the cap on the number of tokens in flight.
static const unsigned MaxBuffer = 8;
//! Done[f][k] is set once filter f has processed the stream item with id k.
static bool Done[MaxFilters][MaxStreamSize];
//! Decides when a serial input filter emulates waiting for input.
static waiting_probe WaitTest;
//! How many times a serial, non-ordered filter observed a token out of sequence.
static unsigned out_of_order_count;
58 
59 #include "harness_concurrency_tracker.h"
60 
61 class BaseFilter: public tbb::filter {
62     bool* const my_done;
63     const bool my_is_last;
64     bool my_is_running;
65 public:
66     tbb::atomic<tbb::internal::Token> current_token;
BaseFilter(tbb::filter::mode type,bool done[],bool is_last)67     BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
68         filter(type),
69         my_done(done),
70         my_is_last(is_last),
71         my_is_running(false),
72         current_token()
73     {}
get_buffer(void * item)74     virtual Buffer* get_buffer( void* item ) {
75         current_token++;
76         return static_cast<Buffer*>(item);
77     }
operator ()(void * item)78     void* operator()( void* item ) __TBB_override {
79         Harness::ConcurrencyTracker ct;
80         if( is_serial() )
81             ASSERT( !my_is_running, "premature entry to serial stage" );
82         my_is_running = true;
83         Buffer* b = get_buffer(item);
84         if( b ) {
85             if( is_ordered() ) {
86                 if( b->sequence_number == Buffer::unused )
87                     b->sequence_number = current_token-1;
88                 else
89                     ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
90             } else if( is_serial() ) {
91                 if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
92                     out_of_order_count++;
93             }
94             ASSERT( b->id < StreamSize, NULL );
95             ASSERT( !my_done[b->id], "duplicate processing of token?" );
96             ASSERT( b->is_busy, NULL );
97             my_done[b->id] = true;
98             if( my_is_last ) {
99                 b->id = Buffer::unused;
100                 b->sequence_number = Buffer::unused;
101                 __TBB_store_with_release(b->is_busy, false);
102             }
103         }
104         my_is_running = false;
105         return b;
106     }
107 };
108 
109 class InputFilter: public BaseFilter {
110     tbb::spin_mutex input_lock;
111     Buffer buffer[MaxBuffer];
112     const tbb::internal::Token my_number_of_tokens;
113 public:
InputFilter(tbb::filter::mode type,tbb::internal::Token ntokens,bool done[],bool is_last)114     InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
115         BaseFilter(type, done, is_last),
116         my_number_of_tokens(ntokens)
117     {}
get_buffer(void *)118     Buffer* get_buffer( void* ) __TBB_override {
119         unsigned long next_input;
120         unsigned free_buffer = 0;
121         { // lock protected scope
122             tbb::spin_mutex::scoped_lock lock(input_lock);
123             if( current_token>=StreamSize )
124                 return NULL;
125             next_input = current_token++;
126             // once in a while, emulate waiting for input; this only makes sense for serial input
127             if( is_serial() && WaitTest.required() )
128                 WaitTest.probe( );
129             while( free_buffer<MaxBuffer )
130                 if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
131                     ++free_buffer;
132                 else {
133                     buffer[free_buffer].is_busy = true;
134                     break;
135                 }
136         }
137         ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
138         Buffer* b = &buffer[free_buffer];
139         ASSERT( &buffer[0] <= b, NULL );
140         ASSERT( b <= &buffer[MaxBuffer-1], NULL );
141         ASSERT( b->id == Buffer::unused, NULL);
142         b->id = next_input;
143         ASSERT( b->sequence_number == Buffer::unused, NULL);
144         return b;
145     }
146 };
147 
//! The struct below repeats layout of tbb::pipeline.
/** The test reinterprets a tbb::pipeline* as a pointer to this type in order to
    overwrite token_counter, so member order and types must not be changed
    independently of the library. TestTrivialPipeline asserts that the two
    types have the same size. */
struct hacked_pipeline {
    tbb::filter* filter_list;
    tbb::filter* filter_end;
    tbb::empty_task* end_counter;
    tbb::atomic<tbb::internal::Token> input_tokens;
    //! Overwritten by the test to start just short of wrap-around.
    tbb::atomic<tbb::internal::Token> token_counter;
    bool end_of_input;
    bool has_thread_bound_filters;

    // Declared only; presumably present so this type carries a vtable pointer like the real class - TODO confirm.
    virtual ~hacked_pipeline();
};
160 
//! The struct below repeats layout of tbb::internal::input_buffer.
/** Reached through hacked_filter::my_input_buffer; the test overwrites low_token
    and high_token of serial filters. Member order and types must not be changed
    independently of the library. */
struct hacked_input_buffer {
    void* array; // This should be changed to task_info* if ever used
    void* my_sem; // This should be changed to semaphore* if ever used
    tbb::internal::Token array_size;
    //! Overwritten by the test to start just short of wrap-around.
    tbb::internal::Token low_token;
    tbb::spin_mutex array_mutex;
    //! Overwritten by the test to start just short of wrap-around.
    tbb::internal::Token high_token;
    bool is_ordered;
    bool is_bound;
};
172 
//! The struct below repeats layout of tbb::filter.
/** The test casts a tbb::filter* to this type to reach my_input_buffer, so member
    order and types must not be changed independently of the library.
    TestTrivialPipeline asserts that the two types have the same size. */
struct hacked_filter {
    tbb::filter* next_filter_in_pipeline;
    //! The only member the test dereferences.
    hacked_input_buffer* my_input_buffer;
    unsigned char my_filter_mode;
    tbb::filter* prev_filter_in_pipeline;
    tbb::pipeline* my_pipeline;
    tbb::filter* next_segment;

    // Declared only; presumably present so this type carries a vtable pointer like the real class - TODO confirm.
    virtual ~hacked_filter();
};
184 
//! Enables the tests that poke at library internals; TestMain clears it when the
//! runtime library reports a newer interface version than the headers.
bool do_hacking_tests = true;
//! Hacked token counters are preset to ~tokens_before_wraparound, i.e. only a few
//! increments away from overflowing back to zero.
const tbb::internal::Token tokens_before_wraparound = 0xF;
187 
TestTrivialPipeline(unsigned nthread,unsigned number_of_filters)188 void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
189     // There are 3 filter types: parallel, serial_in_order and serial_out_of_order
190     static const tbb::filter::mode filter_table[] = { tbb::filter::parallel, tbb::filter::serial_in_order, tbb::filter::serial_out_of_order};
191     const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
192     REMARK( "testing with %lu threads and %lu filters\n", nthread, number_of_filters );
193     ASSERT( number_of_filters<=MaxFilters, "too many filters" );
194     ASSERT( sizeof(hacked_pipeline) == sizeof(tbb::pipeline), "layout changed for tbb::pipeline?" );
195     ASSERT( sizeof(hacked_filter) == sizeof(tbb::filter), "layout changed for tbb::filter?" );
196     tbb::internal::Token ntokens = nthread<MaxBuffer ? nthread : MaxBuffer;
197     // Count maximum iterations number
198     unsigned limit = 1;
199     for( unsigned i=0; i<number_of_filters; ++i)
200         limit *= number_of_filter_types;
201     // Iterate over possible filter sequences
202     for( unsigned numeral=0; numeral<limit; ++numeral ) {
203         // Build pipeline
204         tbb::pipeline pipeline;
205         if( do_hacking_tests ) {
206             // A private member of pipeline is hacked there for sake of testing wrap-around immunity.
207             tbb::internal::punned_cast<hacked_pipeline*>(&pipeline)->token_counter = ~tokens_before_wraparound;
208         }
209         tbb::filter* filter[MaxFilters];
210         unsigned temp = numeral;
211         // parallelism_limit is the upper bound on the possible parallelism
212         unsigned parallelism_limit = 0;
213         for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
214             tbb::filter::mode filter_type = filter_table[temp%number_of_filter_types];
215             const bool is_last = i==number_of_filters-1;
216             if( i==0 )
217                 filter[i] = new InputFilter(filter_type,ntokens,Done[i],is_last);
218             else
219                 filter[i] = new BaseFilter(filter_type,Done[i],is_last);
220             pipeline.add_filter(*filter[i]);
221             // The ordered buffer of serial filters is hacked as well.
222             if ( filter[i]->is_serial() ) {
223                 if( do_hacking_tests ) {
224                     ((hacked_filter*)(void*)filter[i])->my_input_buffer->low_token = ~tokens_before_wraparound;
225                     ((hacked_filter*)(void*)filter[i])->my_input_buffer->high_token = ~tokens_before_wraparound;
226                 }
227                 parallelism_limit += 1;
228             } else {
229                 parallelism_limit = nthread;
230             }
231         }
232         // Account for clipping of parallelism.
233         if( parallelism_limit>nthread )
234             parallelism_limit = nthread;
235         if( parallelism_limit>ntokens )
236             parallelism_limit = (unsigned)ntokens;
237         Harness::ConcurrencyTracker::Reset();
238         unsigned streamSizeLimit = min( MaxStreamSize, nthread * MaxStreamItemsPerThread );
239         for( StreamSize=0; StreamSize<=streamSizeLimit; ) {
240             memset( Done, 0, sizeof(Done) );
241             for( unsigned i=0; i<number_of_filters; ++i ) {
242                 static_cast<BaseFilter*>(filter[i])->current_token=0;
243             }
244             pipeline.run( ntokens );
245             ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
246             for( unsigned i=0; i<number_of_filters; ++i )
247                 ASSERT( static_cast<BaseFilter*>(filter[i])->current_token==StreamSize, NULL );
248             for( unsigned i=0; i<MaxFilters; ++i )
249                 for( unsigned j=0; j<StreamSize; ++j ) {
250                     ASSERT( Done[i][j]==(i<number_of_filters), NULL );
251                 }
252             if( StreamSize < min(nthread*8, 32u) ) {
253                 ++StreamSize;
254             } else {
255                 StreamSize = StreamSize*8/3;
256             }
257         }
258         if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
259             REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n",
260                 nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
261         for( unsigned i=0; i < number_of_filters; ++i ) {
262             delete filter[i];
263             filter[i] = NULL;
264         }
265         pipeline.clear();
266     }
267 }
268 
269 #include "harness_cpu.h"
270 
271 static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
272 
probe()273 void waiting_probe::probe( ) {
274     if( nthread==1 ) return;
275     REMARK("emulating wait for input\n");
276     // Test that threads sleep while no work.
277     // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input
278     TestCPUUserTime(nthread, 2);
279 }
280 
281 #include "tbb/task_scheduler_init.h"
282 
TestMain()283 int TestMain () {
284     out_of_order_count = 0;
285     if( MinThread<1 ) {
286         REPORT("must have at least one thread");
287         exit(1);
288     }
289     if( tbb::TBB_runtime_interface_version()>TBB_INTERFACE_VERSION) {
290         REMARK("Warning: implementation dependent tests disabled\n");
291         do_hacking_tests = false;
292     }
293 
294     // Test with varying number of threads.
295     for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
296         // Initialize TBB task scheduler
297         tbb::task_scheduler_init init(nthread);
298 
299         // Test pipelines with n filters
300         for( unsigned n=0; n<=MaxFilters; ++n )
301             TestTrivialPipeline(nthread,n);
302 
303         // Test that all workers sleep when no work
304         TestCPUUserTime(nthread);
305     }
306     if( !out_of_order_count )
307         REPORT("Warning: out of order serial filter received tokens in order\n");
308     return Harness::Done;
309 }
310