/*
Copyright (c) 2005-2021 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17 #include "oneapi/tbb/parallel_pipeline.h"
18 #include "oneapi/tbb/spin_mutex.h"
19 #include "oneapi/tbb/tbb_allocator.h"
20 #include "oneapi/tbb/cache_aligned_allocator.h"
21 #include "itt_notify.h"
22 #include "tls.h"
23 #include "oneapi/tbb/detail/_exception.h"
24 #include "oneapi/tbb/detail/_small_object_pool.h"
25
26 namespace tbb {
27 namespace detail {
28 namespace r1 {
29
30 void handle_perror(int error_code, const char* aux_info);
31
32 using Token = unsigned long;
33
34 //! A processing pipeline that applies filters to items.
35 /** @ingroup algorithms */
36 class pipeline {
37 friend void parallel_pipeline(d1::task_group_context&, std::size_t, const d1::filter_node&);
38 public:
39
40 //! Construct empty pipeline.
pipeline(d1::task_group_context & cxt,std::size_t max_token)41 pipeline(d1::task_group_context& cxt, std::size_t max_token) :
42 my_context(cxt),
43 first_filter(nullptr),
44 last_filter(nullptr),
45 input_tokens(Token(max_token)),
46 end_of_input(false),
47 wait_ctx(0) {
48 __TBB_ASSERT( max_token>0, "pipeline::run must have at least one token" );
49 }
50
51 ~pipeline();
52
53 //! Add filter to end of pipeline.
54 void add_filter( d1::base_filter& );
55
56 //! Traverse tree of fitler-node in-order and add filter for each leaf
fill_pipeline(const d1::filter_node & root)57 void fill_pipeline(const d1::filter_node& root) {
58 if( root.left && root.right ) {
59 fill_pipeline(*root.left);
60 fill_pipeline(*root.right);
61 }
62 else {
63 __TBB_ASSERT(!root.left && !root.right, "tree should be full");
64 add_filter(*root.create_filter());
65 }
66 }
67
68 private:
69 friend class stage_task;
70 friend class base_filter;
71 friend void set_end_of_input(d1::base_filter& bf);
72
73 task_group_context& my_context;
74
75 //! Pointer to first filter in the pipeline.
76 d1::base_filter* first_filter;
77
78 //! Pointer to last filter in the pipeline.
79 d1::base_filter* last_filter;
80
81 //! Number of idle tokens waiting for input stage.
82 std::atomic<Token> input_tokens;
83
84 //! False until flow_control::stop() is called.
85 std::atomic<bool> end_of_input;
86
87 d1::wait_context wait_ctx;
88 };
89
90 //! This structure is used to store task information in an input buffer
91 struct task_info {
92 void* my_object = nullptr;
93 //! Invalid unless a task went through an ordered stage.
94 Token my_token = 0;
95 //! False until my_token is set.
96 bool my_token_ready = false;
97 //! True if my_object is valid.
98 bool is_valid = false;
99 //! Set to initial state (no object, no token)
resettbb::detail::r1::task_info100 void reset() {
101 my_object = nullptr;
102 my_token = 0;
103 my_token_ready = false;
104 is_valid = false;
105 }
106 };
107
108 //! A buffer of input items for a filter.
109 /** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
110 class input_buffer {
111 friend class base_filter;
112 friend class stage_task;
113 friend class pipeline;
114 friend void set_end_of_input(d1::base_filter& bf);
115
116 using size_type = Token;
117
118 //! Array of deferred tasks that cannot yet start executing.
119 task_info* array;
120
121 //! Size of array
122 /** Always 0 or a power of 2 */
123 size_type array_size;
124
125 //! Lowest token that can start executing.
126 /** All prior Token have already been seen. */
127 Token low_token;
128
129 //! Serializes updates.
130 spin_mutex array_mutex;
131
132 //! Resize "array".
133 /** Caller is responsible to acquiring a lock on "array_mutex". */
134 void grow( size_type minimum_size );
135
136 //! Initial size for "array"
137 /** Must be a power of 2 */
138 static const size_type initial_buffer_size = 4;
139
140 //! Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned
141 Token high_token;
142
143 //! True for ordered filter, false otherwise.
144 const bool is_ordered;
145
146 //! for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
147 using end_of_input_tls_t = basic_tls<std::intptr_t>;
148 end_of_input_tls_t end_of_input_tls;
149 bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
150
151 public:
152 input_buffer(const input_buffer&) = delete;
153 input_buffer& operator=(const input_buffer&) = delete;
154
155 //! Construct empty buffer.
input_buffer(bool ordered)156 input_buffer( bool ordered) :
157 array(nullptr),
158 array_size(0),
159 low_token(0),
160 high_token(0),
161 is_ordered(ordered),
162 end_of_input_tls(),
163 end_of_input_tls_allocated(false) {
164 grow(initial_buffer_size);
165 __TBB_ASSERT( array, nullptr );
166 }
167
168 //! Destroy the buffer.
~input_buffer()169 ~input_buffer() {
170 __TBB_ASSERT( array, nullptr );
171 cache_aligned_allocator<task_info>().deallocate(array,array_size);
172 poison_pointer( array );
173 if( end_of_input_tls_allocated ) {
174 destroy_my_tls();
175 }
176 }
177
178 //! Define order when the first filter is serial_in_order.
get_ordered_token()179 Token get_ordered_token(){
180 return high_token++;
181 }
182
183 //! Put a token into the buffer.
184 /** If task information was placed into buffer, returns true;
185 otherwise returns false, informing the caller to create and spawn a task.
186 */
try_put_token(task_info & info)187 bool try_put_token( task_info& info ) {
188 info.is_valid = true;
189 spin_mutex::scoped_lock lock( array_mutex );
190 Token token;
191 if( is_ordered ) {
192 if( !info.my_token_ready ) {
193 info.my_token = high_token++;
194 info.my_token_ready = true;
195 }
196 token = info.my_token;
197 } else
198 token = high_token++;
199 __TBB_ASSERT( (long)(token-low_token)>=0, nullptr );
200 if( token!=low_token ) {
201 // Trying to put token that is beyond low_token.
202 // Need to wait until low_token catches up before dispatching.
203 if( token-low_token>=array_size )
204 grow( token-low_token+1 );
205 ITT_NOTIFY( sync_releasing, this );
206 array[token&(array_size-1)] = info;
207 return true;
208 }
209 return false;
210 }
211
212 //! Note that processing of a token is finished.
213 /** Fires up processing of the next token, if processing was deferred. */
214 // Uses template to avoid explicit dependency on stage_task.
215 template<typename StageTask>
try_to_spawn_task_for_next_token(StageTask & spawner,d1::execution_data & ed)216 void try_to_spawn_task_for_next_token(StageTask& spawner, d1::execution_data& ed) {
217 task_info wakee;
218 {
219 spin_mutex::scoped_lock lock( array_mutex );
220 // Wake the next task
221 task_info& item = array[++low_token & (array_size-1)];
222 ITT_NOTIFY( sync_acquired, this );
223 wakee = item;
224 item.is_valid = false;
225 }
226 if( wakee.is_valid )
227 spawner.spawn_stage_task(wakee, ed);
228 }
229
230 // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
create_my_tls()231 void create_my_tls() {
232 int status = end_of_input_tls.create();
233 if(status)
234 handle_perror(status, "TLS not allocated for filter");
235 end_of_input_tls_allocated = true;
236 }
destroy_my_tls()237 void destroy_my_tls() {
238 int status = end_of_input_tls.destroy();
239 if(status)
240 handle_perror(status, "Failed to destroy filter TLS");
241 }
my_tls_end_of_input()242 bool my_tls_end_of_input() {
243 return end_of_input_tls.get() != 0;
244 }
set_my_tls_end_of_input()245 void set_my_tls_end_of_input() {
246 end_of_input_tls.set(1);
247 }
248 };
249
grow(size_type minimum_size)250 void input_buffer::grow( size_type minimum_size ) {
251 size_type old_size = array_size;
252 size_type new_size = old_size ? 2*old_size : initial_buffer_size;
253 while( new_size<minimum_size )
254 new_size*=2;
255 task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
256 task_info* old_array = array;
257 for( size_type i=0; i<new_size; ++i )
258 new_array[i].is_valid = false;
259 Token t=low_token;
260 for( size_type i=0; i<old_size; ++i, ++t )
261 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
262 array = new_array;
263 array_size = new_size;
264 if( old_array )
265 cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
266 }
267
268 class stage_task : public d1::task, public task_info {
269 private:
270 friend class pipeline;
271 pipeline& my_pipeline;
272 d1::base_filter* my_filter;
273 d1::small_object_allocator m_allocator;
274 //! True if this task has not yet read the input.
275 bool my_at_start;
276
277 //! True if this can be executed again.
278 bool execute_filter(d1::execution_data& ed);
279
280 //! Spawn task if token is available.
try_spawn_stage_task(d1::execution_data & ed)281 void try_spawn_stage_task(d1::execution_data& ed) {
282 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
283 if( (my_pipeline.input_tokens.fetch_sub(1, std::memory_order_release)) > 1 ) {
284 d1::small_object_allocator alloc{};
285 r1::spawn( *alloc.new_object<stage_task>(ed, my_pipeline, alloc ), my_pipeline.my_context );
286 }
287 }
288
289 public:
290
291 //! Construct stage_task for first stage in a pipeline.
292 /** Such a stage has not read any input yet. */
stage_task(pipeline & pipeline,d1::small_object_allocator & alloc)293 stage_task(pipeline& pipeline, d1::small_object_allocator& alloc ) :
294 my_pipeline(pipeline),
295 my_filter(pipeline.first_filter),
296 m_allocator(alloc),
297 my_at_start(true)
298 {
299 task_info::reset();
300 my_pipeline.wait_ctx.reserve();
301 }
302 //! Construct stage_task for a subsequent stage in a pipeline.
stage_task(pipeline & pipeline,d1::base_filter * filter,const task_info & info,d1::small_object_allocator & alloc)303 stage_task(pipeline& pipeline, d1::base_filter* filter, const task_info& info, d1::small_object_allocator& alloc) :
304 task_info(info),
305 my_pipeline(pipeline),
306 my_filter(filter),
307 m_allocator(alloc),
308 my_at_start(false)
309 {
310 my_pipeline.wait_ctx.reserve();
311 }
312 //! Roughly equivalent to the constructor of input stage task
reset()313 void reset() {
314 task_info::reset();
315 my_filter = my_pipeline.first_filter;
316 my_at_start = true;
317 }
finalize(d1::execution_data & ed)318 void finalize(d1::execution_data& ed) {
319 m_allocator.delete_object(this, ed);
320 }
321 //! The virtual task execution method
execute(d1::execution_data & ed)322 task* execute(d1::execution_data& ed) override {
323 if(!execute_filter(ed)) {
324 finalize(ed);
325 return nullptr;
326 }
327 return this;
328 }
cancel(d1::execution_data & ed)329 task* cancel(d1::execution_data& ed) override {
330 finalize(ed);
331 return nullptr;
332 }
333
~stage_task()334 ~stage_task() {
335 if ( my_filter && my_object ) {
336 my_filter->finalize(my_object);
337 my_object = nullptr;
338 }
339 my_pipeline.wait_ctx.release();
340 }
341 //! Creates and spawns stage_task from task_info
spawn_stage_task(const task_info & info,d1::execution_data & ed)342 void spawn_stage_task(const task_info& info, d1::execution_data& ed) {
343 d1::small_object_allocator alloc{};
344 stage_task* clone = alloc.new_object<stage_task>(ed, my_pipeline, my_filter, info, alloc);
345 r1::spawn(*clone, my_pipeline.my_context);
346 }
347 };
348
execute_filter(d1::execution_data & ed)349 bool stage_task::execute_filter(d1::execution_data& ed) {
350 __TBB_ASSERT( !my_at_start || !my_object, "invalid state of task" );
351 if( my_at_start ) {
352 if( my_filter->is_serial() ) {
353 my_object = (*my_filter)(my_object);
354 if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input.load(std::memory_order_relaxed)) ) {
355 if( my_filter->is_ordered() ) {
356 my_token = my_filter->my_input_buffer->get_ordered_token();
357 my_token_ready = true;
358 }
359 if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
360 reset();
361 return true;
362 } else {
363 try_spawn_stage_task(ed);
364 }
365 } else {
366 my_pipeline.end_of_input.store(true, std::memory_order_relaxed);
367 return false;
368 }
369 } else /*not is_serial*/ {
370 if ( my_pipeline.end_of_input.load(std::memory_order_relaxed) ) {
371 return false;
372 }
373
374 try_spawn_stage_task(ed);
375
376 my_object = (*my_filter)(my_object);
377 if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) ){
378 my_pipeline.end_of_input.store(true, std::memory_order_relaxed);
379 return false;
380 }
381 }
382 my_at_start = false;
383 } else {
384 my_object = (*my_filter)(my_object);
385 if( my_filter->is_serial() )
386 my_filter->my_input_buffer->try_to_spawn_task_for_next_token(*this, ed);
387 }
388 my_filter = my_filter->next_filter_in_pipeline;
389 if( my_filter ) {
390 // There is another filter to execute.
391 if( my_filter->is_serial() ) {
392 // The next filter must execute tokens when they are available (in order for serial_in_order)
393 if( my_filter->my_input_buffer->try_put_token(*this) ){
394 my_filter = nullptr; // To prevent deleting my_object twice if exception occurs
395 return false;
396 }
397 }
398 } else {
399 // Reached end of the pipe.
400 std::size_t ntokens_avail = my_pipeline.input_tokens.fetch_add(1, std::memory_order_acquire);
401
402 if( ntokens_avail>0 // Only recycle if there is one available token
403 || my_pipeline.end_of_input.load(std::memory_order_relaxed) ) {
404 return false; // No need to recycle for new input
405 }
406 ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
407 // Recycle as an input stage task.
408 reset();
409 }
410 return true;
411 }
412
~pipeline()413 pipeline::~pipeline() {
414 while( first_filter ) {
415 d1::base_filter* f = first_filter;
416 if( input_buffer* b = f->my_input_buffer ) {
417 b->~input_buffer();
418 deallocate_memory(b);
419 }
420 first_filter = f->next_filter_in_pipeline;
421 f->~base_filter();
422 deallocate_memory(f);
423 }
424 }
425
add_filter(d1::base_filter & new_fitler)426 void pipeline::add_filter( d1::base_filter& new_fitler ) {
427 __TBB_ASSERT( new_fitler.next_filter_in_pipeline==d1::base_filter::not_in_pipeline(), "filter already part of pipeline?" );
428 new_fitler.my_pipeline = this;
429 if ( first_filter == nullptr )
430 first_filter = &new_fitler;
431 else
432 last_filter->next_filter_in_pipeline = &new_fitler;
433 new_fitler.next_filter_in_pipeline = nullptr;
434 last_filter = &new_fitler;
435 if( new_fitler.is_serial() ) {
436 new_fitler.my_input_buffer = new (allocate_memory(sizeof(input_buffer))) input_buffer( new_fitler.is_ordered() );
437 } else {
438 if( first_filter == &new_fitler && new_fitler.object_may_be_null() ) {
439 //TODO: buffer only needed to hold TLS; could improve
440 new_fitler.my_input_buffer = new (allocate_memory(sizeof(input_buffer))) input_buffer( /*is_ordered*/false );
441 new_fitler.my_input_buffer->create_my_tls();
442 }
443 }
444 }
445
parallel_pipeline(d1::task_group_context & cxt,std::size_t max_token,const d1::filter_node & fn)446 void __TBB_EXPORTED_FUNC parallel_pipeline(d1::task_group_context& cxt, std::size_t max_token, const d1::filter_node& fn) {
447 pipeline pipe(cxt, max_token);
448
449 pipe.fill_pipeline(fn);
450
451 d1::small_object_allocator alloc{};
452 stage_task& st = *alloc.new_object<stage_task>(pipe, alloc);
453
454 // Start execution of tasks
455 r1::execute_and_wait(st, cxt, pipe.wait_ctx, cxt);
456 }
457
set_end_of_input(d1::base_filter & bf)458 void __TBB_EXPORTED_FUNC set_end_of_input(d1::base_filter& bf) {
459 __TBB_ASSERT(bf.my_input_buffer, nullptr);
460 __TBB_ASSERT(bf.object_may_be_null(), nullptr);
461 if(bf.is_serial() ) {
462 bf.my_pipeline->end_of_input.store(true, std::memory_order_relaxed);
463 } else {
464 __TBB_ASSERT(bf.my_input_buffer->end_of_input_tls_allocated, nullptr);
465 bf.my_input_buffer->set_my_tls_end_of_input();
466 }
467 }
468
469 } // namespace r1
470 } // namespace detail
471 } // namespace tbb
472