1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/pipeline.h"
19 #include "tbb/spin_mutex.h"
20 #include "tbb/atomic.h"
21 #include <cstdlib>
22 #include <cstdio>
23 #include "harness.h"
24
25 // In the test, variables related to token counting are declared
26 // as unsigned long to match definition of tbb::internal::Token.
27
//! Work item that is handed from one pipeline stage to the next in this test.
struct Buffer {
    //! Sentinel stored in id / sequence_number while they hold no value.
    static const unsigned long unused = ~0ul;
    //! Index of the stream item carried by this buffer, or unused.
    unsigned long id;
    //! True while the buffer is claimed by an item that is still in flight.
    bool is_busy;
    //! Position stamped by the first ordered stage the item reaches, or unused.
    unsigned long sequence_number;

    //! Creates a free buffer: not busy, no item, no sequence number.
    Buffer()
        : id(unused)
        , is_busy(false)
        , sequence_number(unused)
    {}
};
37
//! Decides when the input stage should emulate a wait for input, and performs that wait.
class waiting_probe {
    //! Number of times required() has been called so far.
    size_t check_counter;
public:
    waiting_probe() : check_counter(0) {}

    //! Counts the call and reports whether a probe is due.
    /** Returns true only when (number of calls so far + 1) is a multiple of 0x8000,
        i.e. first on call 32767 and then once every 32768 calls. */
    bool required( ) {
        const size_t period_mask = size_t(0x7FFF);
        ++check_counter;
        return ((check_counter + 1) & period_mask) == 0;
    }

    //! Emulates the wait itself; defined later in this file.
    void probe( );
};
48
49 static const unsigned MaxStreamSize = 8000;
50 static const unsigned MaxStreamItemsPerThread = 1000;
51 //! Maximum number of filters allowed
52 static const unsigned MaxFilters = 5;
53 static unsigned StreamSize;
54 static const unsigned MaxBuffer = 8;
55 static bool Done[MaxFilters][MaxStreamSize];
56 static waiting_probe WaitTest;
57 static unsigned out_of_order_count;
58
59 #include "harness_concurrency_tracker.h"
60
61 class BaseFilter: public tbb::filter {
62 bool* const my_done;
63 const bool my_is_last;
64 bool my_is_running;
65 public:
66 tbb::atomic<tbb::internal::Token> current_token;
BaseFilter(tbb::filter::mode type,bool done[],bool is_last)67 BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
68 filter(type),
69 my_done(done),
70 my_is_last(is_last),
71 my_is_running(false),
72 current_token()
73 {}
get_buffer(void * item)74 virtual Buffer* get_buffer( void* item ) {
75 current_token++;
76 return static_cast<Buffer*>(item);
77 }
operator ()(void * item)78 void* operator()( void* item ) __TBB_override {
79 Harness::ConcurrencyTracker ct;
80 if( is_serial() )
81 ASSERT( !my_is_running, "premature entry to serial stage" );
82 my_is_running = true;
83 Buffer* b = get_buffer(item);
84 if( b ) {
85 if( is_ordered() ) {
86 if( b->sequence_number == Buffer::unused )
87 b->sequence_number = current_token-1;
88 else
89 ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
90 } else if( is_serial() ) {
91 if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
92 out_of_order_count++;
93 }
94 ASSERT( b->id < StreamSize, NULL );
95 ASSERT( !my_done[b->id], "duplicate processing of token?" );
96 ASSERT( b->is_busy, NULL );
97 my_done[b->id] = true;
98 if( my_is_last ) {
99 b->id = Buffer::unused;
100 b->sequence_number = Buffer::unused;
101 __TBB_store_with_release(b->is_busy, false);
102 }
103 }
104 my_is_running = false;
105 return b;
106 }
107 };
108
109 class InputFilter: public BaseFilter {
110 tbb::spin_mutex input_lock;
111 Buffer buffer[MaxBuffer];
112 const tbb::internal::Token my_number_of_tokens;
113 public:
InputFilter(tbb::filter::mode type,tbb::internal::Token ntokens,bool done[],bool is_last)114 InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
115 BaseFilter(type, done, is_last),
116 my_number_of_tokens(ntokens)
117 {}
get_buffer(void *)118 Buffer* get_buffer( void* ) __TBB_override {
119 unsigned long next_input;
120 unsigned free_buffer = 0;
121 { // lock protected scope
122 tbb::spin_mutex::scoped_lock lock(input_lock);
123 if( current_token>=StreamSize )
124 return NULL;
125 next_input = current_token++;
126 // once in a while, emulate waiting for input; this only makes sense for serial input
127 if( is_serial() && WaitTest.required() )
128 WaitTest.probe( );
129 while( free_buffer<MaxBuffer )
130 if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
131 ++free_buffer;
132 else {
133 buffer[free_buffer].is_busy = true;
134 break;
135 }
136 }
137 ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
138 Buffer* b = &buffer[free_buffer];
139 ASSERT( &buffer[0] <= b, NULL );
140 ASSERT( b <= &buffer[MaxBuffer-1], NULL );
141 ASSERT( b->id == Buffer::unused, NULL);
142 b->id = next_input;
143 ASSERT( b->sequence_number == Buffer::unused, NULL);
144 return b;
145 }
146 };
147
148 //! The struct below repeats layout of tbb::pipeline.
149 struct hacked_pipeline {
150 tbb::filter* filter_list;
151 tbb::filter* filter_end;
152 tbb::empty_task* end_counter;
153 tbb::atomic<tbb::internal::Token> input_tokens;
154 tbb::atomic<tbb::internal::Token> token_counter;
155 bool end_of_input;
156 bool has_thread_bound_filters;
157
158 virtual ~hacked_pipeline();
159 };
160
161 //! The struct below repeats layout of tbb::internal::input_buffer.
162 struct hacked_input_buffer {
163 void* array; // This should be changed to task_info* if ever used
164 void* my_sem; // This should be changed to semaphore* if ever used
165 tbb::internal::Token array_size;
166 tbb::internal::Token low_token;
167 tbb::spin_mutex array_mutex;
168 tbb::internal::Token high_token;
169 bool is_ordered;
170 bool is_bound;
171 };
172
173 //! The struct below repeats layout of tbb::filter.
174 struct hacked_filter {
175 tbb::filter* next_filter_in_pipeline;
176 hacked_input_buffer* my_input_buffer;
177 unsigned char my_filter_mode;
178 tbb::filter* prev_filter_in_pipeline;
179 tbb::pipeline* my_pipeline;
180 tbb::filter* next_segment;
181
182 virtual ~hacked_filter();
183 };
184
185 bool do_hacking_tests = true;
186 const tbb::internal::Token tokens_before_wraparound = 0xF;
187
TestTrivialPipeline(unsigned nthread,unsigned number_of_filters)188 void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
189 // There are 3 filter types: parallel, serial_in_order and serial_out_of_order
190 static const tbb::filter::mode filter_table[] = { tbb::filter::parallel, tbb::filter::serial_in_order, tbb::filter::serial_out_of_order};
191 const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
192 REMARK( "testing with %lu threads and %lu filters\n", nthread, number_of_filters );
193 ASSERT( number_of_filters<=MaxFilters, "too many filters" );
194 ASSERT( sizeof(hacked_pipeline) == sizeof(tbb::pipeline), "layout changed for tbb::pipeline?" );
195 ASSERT( sizeof(hacked_filter) == sizeof(tbb::filter), "layout changed for tbb::filter?" );
196 tbb::internal::Token ntokens = nthread<MaxBuffer ? nthread : MaxBuffer;
197 // Count maximum iterations number
198 unsigned limit = 1;
199 for( unsigned i=0; i<number_of_filters; ++i)
200 limit *= number_of_filter_types;
201 // Iterate over possible filter sequences
202 for( unsigned numeral=0; numeral<limit; ++numeral ) {
203 // Build pipeline
204 tbb::pipeline pipeline;
205 if( do_hacking_tests ) {
206 // A private member of pipeline is hacked there for sake of testing wrap-around immunity.
207 tbb::internal::punned_cast<hacked_pipeline*>(&pipeline)->token_counter = ~tokens_before_wraparound;
208 }
209 tbb::filter* filter[MaxFilters];
210 unsigned temp = numeral;
211 // parallelism_limit is the upper bound on the possible parallelism
212 unsigned parallelism_limit = 0;
213 for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
214 tbb::filter::mode filter_type = filter_table[temp%number_of_filter_types];
215 const bool is_last = i==number_of_filters-1;
216 if( i==0 )
217 filter[i] = new InputFilter(filter_type,ntokens,Done[i],is_last);
218 else
219 filter[i] = new BaseFilter(filter_type,Done[i],is_last);
220 pipeline.add_filter(*filter[i]);
221 // The ordered buffer of serial filters is hacked as well.
222 if ( filter[i]->is_serial() ) {
223 if( do_hacking_tests ) {
224 ((hacked_filter*)(void*)filter[i])->my_input_buffer->low_token = ~tokens_before_wraparound;
225 ((hacked_filter*)(void*)filter[i])->my_input_buffer->high_token = ~tokens_before_wraparound;
226 }
227 parallelism_limit += 1;
228 } else {
229 parallelism_limit = nthread;
230 }
231 }
232 // Account for clipping of parallelism.
233 if( parallelism_limit>nthread )
234 parallelism_limit = nthread;
235 if( parallelism_limit>ntokens )
236 parallelism_limit = (unsigned)ntokens;
237 Harness::ConcurrencyTracker::Reset();
238 unsigned streamSizeLimit = min( MaxStreamSize, nthread * MaxStreamItemsPerThread );
239 for( StreamSize=0; StreamSize<=streamSizeLimit; ) {
240 memset( Done, 0, sizeof(Done) );
241 for( unsigned i=0; i<number_of_filters; ++i ) {
242 static_cast<BaseFilter*>(filter[i])->current_token=0;
243 }
244 pipeline.run( ntokens );
245 ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
246 for( unsigned i=0; i<number_of_filters; ++i )
247 ASSERT( static_cast<BaseFilter*>(filter[i])->current_token==StreamSize, NULL );
248 for( unsigned i=0; i<MaxFilters; ++i )
249 for( unsigned j=0; j<StreamSize; ++j ) {
250 ASSERT( Done[i][j]==(i<number_of_filters), NULL );
251 }
252 if( StreamSize < min(nthread*8, 32u) ) {
253 ++StreamSize;
254 } else {
255 StreamSize = StreamSize*8/3;
256 }
257 }
258 if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
259 REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n",
260 nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
261 for( unsigned i=0; i < number_of_filters; ++i ) {
262 delete filter[i];
263 filter[i] = NULL;
264 }
265 pipeline.clear();
266 }
267 }
268
269 #include "harness_cpu.h"
270
//! Current thread count; knowing it is necessary to call TestCPUUserTime.
static int nthread;
272
probe()273 void waiting_probe::probe( ) {
274 if( nthread==1 ) return;
275 REMARK("emulating wait for input\n");
276 // Test that threads sleep while no work.
277 // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input
278 TestCPUUserTime(nthread, 2);
279 }
280
281 #include "tbb/task_scheduler_init.h"
282
TestMain()283 int TestMain () {
284 out_of_order_count = 0;
285 if( MinThread<1 ) {
286 REPORT("must have at least one thread");
287 exit(1);
288 }
289 if( tbb::TBB_runtime_interface_version()>TBB_INTERFACE_VERSION) {
290 REMARK("Warning: implementation dependent tests disabled\n");
291 do_hacking_tests = false;
292 }
293
294 // Test with varying number of threads.
295 for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
296 // Initialize TBB task scheduler
297 tbb::task_scheduler_init init(nthread);
298
299 // Test pipelines with n filters
300 for( unsigned n=0; n<=MaxFilters; ++n )
301 TestTrivialPipeline(nthread,n);
302
303 // Test that all workers sleep when no work
304 TestCPUUserTime(nthread);
305 }
306 if( !out_of_order_count )
307 REPORT("Warning: out of order serial filter received tokens in order\n");
308 return Harness::Done;
309 }
310