/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/parallel_pipeline.h"
#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "tls.h"
#include "oneapi/tbb/detail/_exception.h"
#include "oneapi/tbb/detail/_small_object_pool.h"

namespace tbb {
namespace detail {
namespace r1 {

void handle_perror(int error_code, const char* aux_info);

using Token = unsigned long;

//! A processing pipeline that applies filters to items.
/** @ingroup algorithms */
class pipeline {
    friend void parallel_pipeline(d1::task_group_context&, std::size_t, const d1::filter_node&);
public:

    //! Construct empty pipeline.
    pipeline(d1::task_group_context& cxt, std::size_t max_token) :
        my_context(cxt),
        first_filter(nullptr),
        last_filter(nullptr),
        input_tokens(Token(max_token)),
        end_of_input(false),
        wait_ctx(0) {
            __TBB_ASSERT( max_token>0, "pipeline::run must have at least one token" );
        }

    ~pipeline();

    //! Add filter to end of pipeline.
    void add_filter( d1::base_filter& );

    //! Traverse the filter_node tree in order and add a filter for each leaf.
    void fill_pipeline(const d1::filter_node& root) {
        if( root.left && root.right ) {
            fill_pipeline(*root.left);
            fill_pipeline(*root.right);
        }
        else {
            __TBB_ASSERT(!root.left && !root.right, "tree should be full");
            add_filter(*root.create_filter());
        }
    }
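    // Illustrative sketch (commentary only, not part of the implementation):
    // composing public filters as `f1 & f2 & f3` builds a full binary tree of
    // filter_node,
    //
    //             (&)
    //            /   \
    //          (&)    f3
    //         /   \
    //        f1    f2
    //
    // so the left-then-right recursion in fill_pipeline() above visits the
    // leaves in pipeline order and calls add_filter() for f1, f2, f3 in turn.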

private:
    friend class stage_task;
    friend class base_filter;
    friend void set_end_of_input(d1::base_filter& bf);

    d1::task_group_context& my_context;

    //! Pointer to first filter in the pipeline.
    d1::base_filter* first_filter;

    //! Pointer to last filter in the pipeline.
    d1::base_filter* last_filter;

    //! Number of idle tokens waiting for input stage.
    std::atomic<Token> input_tokens;

    //! False until flow_control::stop() is called.
    std::atomic<bool> end_of_input;

    d1::wait_context wait_ctx;
};

//! This structure is used to store task information in an input buffer
struct task_info {
    void* my_object = nullptr;
    //! Invalid unless a task went through an ordered stage.
    Token my_token = 0;
    //! False until my_token is set.
    bool my_token_ready = false;
    //! True if my_object is valid.
    bool is_valid = false;
    //! Set to initial state (no object, no token)
    void reset() {
        my_object = nullptr;
        my_token = 0;
        my_token_ready = false;
        is_valid = false;
    }
};

//! A buffer of input items for a filter.
/** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
class input_buffer {
    friend class base_filter;
    friend class stage_task;
    friend class pipeline;
    friend void set_end_of_input(d1::base_filter& bf);

    using size_type = Token;

    //! Array of deferred tasks that cannot yet start executing.
    task_info* array;

    //! Size of array
    /** Always 0 or a power of 2 */
    size_type array_size;

    //! Lowest token that can start executing.
    /** All prior Tokens have already been seen. */
    Token low_token;

    //! Serializes updates.
    spin_mutex array_mutex;
    //! Resize "array".
    /** Caller is responsible for acquiring a lock on "array_mutex". */
    void grow( size_type minimum_size );

    //! Initial size for "array"
    /** Must be a power of 2 */
    static const size_type initial_buffer_size = 4;

    //! Used for the out-of-order buffer, and for assigning my_token if is_ordered and my_token is not already assigned
    Token high_token;

    //! True for ordered filter, false otherwise.
    const bool is_ordered;

    //! For parallel filters that accept null items, a thread-local flag for reaching end_of_input
    using end_of_input_tls_t = basic_tls<std::intptr_t>;
    end_of_input_tls_t end_of_input_tls;
    bool end_of_input_tls_allocated; // no way to test pthread creation of TLS

public:
    input_buffer(const input_buffer&) = delete;
    input_buffer& operator=(const input_buffer&) = delete;

    //! Construct empty buffer.
    input_buffer( bool ordered ) :
            array(nullptr),
            array_size(0),
            low_token(0),
            high_token(0),
            is_ordered(ordered),
            end_of_input_tls(),
            end_of_input_tls_allocated(false) {
        grow(initial_buffer_size);
        __TBB_ASSERT( array, nullptr );
    }

    //! Destroy the buffer.
    ~input_buffer() {
        __TBB_ASSERT( array, nullptr );
        cache_aligned_allocator<task_info>().deallocate(array,array_size);
        poison_pointer( array );
        if( end_of_input_tls_allocated ) {
            destroy_my_tls();
        }
    }

    //! Define order when the first filter is serial_in_order.
    Token get_ordered_token() {
        return high_token++;
    }

    //! Put a token into the buffer.
    /** If task information was placed into the buffer, returns true;
        otherwise returns false, informing the caller to create and spawn a task.
    */
    bool try_put_token( task_info& info ) {
        info.is_valid = true;
        spin_mutex::scoped_lock lock( array_mutex );
        Token token;
        if( is_ordered ) {
            if( !info.my_token_ready ) {
                info.my_token = high_token++;
                info.my_token_ready = true;
            }
            token = info.my_token;
        } else
            token = high_token++;
        __TBB_ASSERT( (long)(token-low_token)>=0, nullptr );
        if( token!=low_token ) {
            // Trying to put a token that is beyond low_token.
            // Need to wait until low_token catches up before dispatching.
            if( token-low_token>=array_size )
                grow( token-low_token+1 );
            ITT_NOTIFY( sync_releasing, this );
            array[token&(array_size-1)] = info;
            return true;
        }
        return false;
    }
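    // A minimal worked example of the ring indexing above (commentary only,
    // assuming array_size == 4): with low_token == 5, tokens 6 and 7 are
    // "beyond" and get parked at slots 6&3 == 2 and 7&3 == 3, while token 5
    // equals low_token, so try_put_token returns false and the caller runs it
    // immediately. Because array_size is a power of two,
    // `token & (array_size-1)` is equivalent to `token % array_size`, keeping
    // slot lookup a cheap bit-mask.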

    //! Note that processing of a token is finished.
    /** Fires up processing of the next token, if processing was deferred. */
    // Uses template to avoid explicit dependency on stage_task.
    template<typename StageTask>
    void try_to_spawn_task_for_next_token(StageTask& spawner, d1::execution_data& ed) {
        task_info wakee;
        {
            spin_mutex::scoped_lock lock( array_mutex );
            // Wake the next task
            task_info& item = array[++low_token & (array_size-1)];
            ITT_NOTIFY( sync_acquired, this );
            wakee = item;
            item.is_valid = false;
        }
        if( wakee.is_valid )
            spawner.spawn_stage_task(wakee, ed);
    }

    // end_of_input signal for parallel_pipeline; for parallel input filters with null items allowed.
    void create_my_tls() {
        int status = end_of_input_tls.create();
        if(status)
            handle_perror(status, "TLS not allocated for filter");
        end_of_input_tls_allocated = true;
    }
    void destroy_my_tls() {
        int status = end_of_input_tls.destroy();
        if(status)
            handle_perror(status, "Failed to destroy filter TLS");
    }
    bool my_tls_end_of_input() {
        return end_of_input_tls.get() != 0;
    }
    void set_my_tls_end_of_input() {
        end_of_input_tls.set(1);
    }
};

void input_buffer::grow( size_type minimum_size ) {
    size_type old_size = array_size;
    size_type new_size = old_size ? 2*old_size : initial_buffer_size;
    while( new_size<minimum_size )
        new_size*=2;
    task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
    task_info* old_array = array;
    for( size_type i=0; i<new_size; ++i )
        new_array[i].is_valid = false;
    Token t=low_token;
    for( size_type i=0; i<old_size; ++i, ++t )
        new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
    array = new_array;
    array_size = new_size;
    if( old_array )
        cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
}
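
// A minimal worked example of the copy loop above (commentary only): growing
// from old_size == 4 to new_size == 8 with low_token == 5 walks t = 5,6,7,8
// and copies old slot t&3 into new slot t&7, i.e. 1->5, 2->6, 3->7, 0->0.
// Each pending token therefore lands in exactly the slot that
// `token & (array_size-1)` will compute for it after the resize, so no
// caller-visible re-hashing is needed.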

class stage_task : public d1::task, public task_info {
private:
    friend class pipeline;
    pipeline& my_pipeline;
    d1::base_filter* my_filter;
    d1::small_object_allocator m_allocator;
    //! True if this task has not yet read the input.
    bool my_at_start;

    //! True if this can be executed again.
    bool execute_filter(d1::execution_data& ed);

    //! Spawn task if token is available.
    void try_spawn_stage_task(d1::execution_data& ed) {
        ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
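        // Note (added commentary): fetch_sub returns the value held *before*
        // the decrement, so "> 1" means at least one token is still available
        // for a sibling task after this one takes its own.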
        if( (my_pipeline.input_tokens.fetch_sub(1, std::memory_order_release)) > 1 ) {
            d1::small_object_allocator alloc{};
            r1::spawn( *alloc.new_object<stage_task>(ed, my_pipeline, alloc ), my_pipeline.my_context );
        }
    }

public:

    //! Construct stage_task for first stage in a pipeline.
    /** Such a stage has not read any input yet. */
    stage_task(pipeline& pipeline, d1::small_object_allocator& alloc ) :
        my_pipeline(pipeline),
        my_filter(pipeline.first_filter),
        m_allocator(alloc),
        my_at_start(true)
    {
        task_info::reset();
        my_pipeline.wait_ctx.reserve();
    }
    //! Construct stage_task for a subsequent stage in a pipeline.
    stage_task(pipeline& pipeline, d1::base_filter* filter, const task_info& info, d1::small_object_allocator& alloc) :
        task_info(info),
        my_pipeline(pipeline),
        my_filter(filter),
        m_allocator(alloc),
        my_at_start(false)
    {
        my_pipeline.wait_ctx.reserve();
    }
    //! Roughly equivalent to the constructor of input stage task
    void reset() {
        task_info::reset();
        my_filter = my_pipeline.first_filter;
        my_at_start = true;
    }
    void finalize(d1::execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    //! The virtual task execution method
    task* execute(d1::execution_data& ed) override {
        if(!execute_filter(ed)) {
            finalize(ed);
            return nullptr;
        }
        return this;
    }
    task* cancel(d1::execution_data& ed) override {
        finalize(ed);
        return nullptr;
    }

    ~stage_task() {
        if ( my_filter && my_object ) {
            my_filter->finalize(my_object);
            my_object = nullptr;
        }
        my_pipeline.wait_ctx.release();
    }
    //! Creates and spawns stage_task from task_info
    void spawn_stage_task(const task_info& info, d1::execution_data& ed) {
        d1::small_object_allocator alloc{};
        stage_task* clone = alloc.new_object<stage_task>(ed, my_pipeline, my_filter, info, alloc);
        r1::spawn(*clone, my_pipeline.my_context);
    }
};

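// Added commentary: execute_filter() below drives the pipeline's task
// recycling; one sketch of the control flow it implements:
//   my_at_start == true  -> run the input filter once; if more input is
//                           expected and a token is available, spawn a
//                           sibling task so the input stage keeps running
//                           (before running a parallel input filter, after
//                           consuming the input for a serial one);
//   my_at_start == false -> run the current interior/output filter.
// Afterwards the task either parks its item in the next serial filter's
// input_buffer (try_put_token), carries it straight to the next filter, or,
// at the end of the pipe, returns its token and may recycle itself as a new
// input task. Returning false tells execute() to finalize this task.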
bool stage_task::execute_filter(d1::execution_data& ed) {
    __TBB_ASSERT( !my_at_start || !my_object, "invalid state of task" );
    if( my_at_start ) {
        if( my_filter->is_serial() ) {
            my_object = (*my_filter)(my_object);
            if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input.load(std::memory_order_relaxed)) ) {
                if( my_filter->is_ordered() ) {
                    my_token = my_filter->my_input_buffer->get_ordered_token();
                    my_token_ready = true;
                }
                if( !my_filter->next_filter_in_pipeline ) { // we're the only filter in the pipeline
                    reset();
                    return true;
                } else {
                    try_spawn_stage_task(ed);
                }
            } else {
                my_pipeline.end_of_input.store(true, std::memory_order_relaxed);
                return false;
            }
        } else /*not is_serial*/ {
            if ( my_pipeline.end_of_input.load(std::memory_order_relaxed) ) {
                return false;
            }

            try_spawn_stage_task(ed);

            my_object = (*my_filter)(my_object);
            if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) ){
                my_pipeline.end_of_input.store(true, std::memory_order_relaxed);
                return false;
            }
        }
        my_at_start = false;
    } else {
        my_object = (*my_filter)(my_object);
        if( my_filter->is_serial() )
            my_filter->my_input_buffer->try_to_spawn_task_for_next_token(*this, ed);
    }
    my_filter = my_filter->next_filter_in_pipeline;
    if( my_filter ) {
        // There is another filter to execute.
        if( my_filter->is_serial() ) {
            // The next filter must execute tokens when they are available (in order for serial_in_order)
            if( my_filter->my_input_buffer->try_put_token(*this) ){
                my_filter = nullptr; // To prevent deleting my_object twice if an exception occurs
                return false;
            }
        }
    } else {
        // Reached the end of the pipe.
        std::size_t ntokens_avail = my_pipeline.input_tokens.fetch_add(1, std::memory_order_acquire);

        if( ntokens_avail>0  // Recycle only if this task returned the sole available token
                || my_pipeline.end_of_input.load(std::memory_order_relaxed) ) {
            return false; // No need to recycle for new input
        }
        ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
        // Recycle as an input stage task.
        reset();
    }
    return true;
}

pipeline::~pipeline() {
    while( first_filter ) {
        d1::base_filter* f = first_filter;
        if( input_buffer* b = f->my_input_buffer ) {
            b->~input_buffer();
            deallocate_memory(b);
        }
        first_filter = f->next_filter_in_pipeline;
        f->~base_filter();
        deallocate_memory(f);
    }
}

void pipeline::add_filter( d1::base_filter& new_filter ) {
    __TBB_ASSERT( new_filter.next_filter_in_pipeline==d1::base_filter::not_in_pipeline(), "filter already part of pipeline?" );
    new_filter.my_pipeline = this;
    if ( first_filter == nullptr )
        first_filter = &new_filter;
    else
        last_filter->next_filter_in_pipeline = &new_filter;
    new_filter.next_filter_in_pipeline = nullptr;
    last_filter = &new_filter;
    if( new_filter.is_serial() ) {
        new_filter.my_input_buffer = new (allocate_memory(sizeof(input_buffer))) input_buffer( new_filter.is_ordered() );
    } else {
        if( first_filter == &new_filter && new_filter.object_may_be_null() ) {
            //TODO: the buffer is only needed to hold the TLS; this could be improved
            new_filter.my_input_buffer = new (allocate_memory(sizeof(input_buffer))) input_buffer( /*is_ordered*/false );
            new_filter.my_input_buffer->create_my_tls();
        }
    }
}

void __TBB_EXPORTED_FUNC parallel_pipeline(d1::task_group_context& cxt, std::size_t max_token, const d1::filter_node& fn) {
    pipeline pipe(cxt, max_token);

    pipe.fill_pipeline(fn);

    d1::small_object_allocator alloc{};
    stage_task& st = *alloc.new_object<stage_task>(pipe, alloc);

    // Start execution of tasks
    r1::execute_and_wait(st, cxt, pipe.wait_ctx, cxt);
}
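
// Illustrative usage sketch (commentary only, not part of this translation
// unit): user code reaches the entry point above through the public
// oneapi::tbb::parallel_pipeline template, roughly as follows.
//
//   #include <oneapi/tbb/parallel_pipeline.h>
//
//   void square_stream(std::istream& in, std::ostream& out) {
//       oneapi::tbb::parallel_pipeline(/*max_number_of_live_tokens=*/8,
//           oneapi::tbb::make_filter<void, int>(
//               oneapi::tbb::filter_mode::serial_in_order,
//               [&](oneapi::tbb::flow_control& fc) -> int {
//                   int x;
//                   if (in >> x) return x;
//                   fc.stop();   // feeds the end_of_input machinery above
//                   return 0;
//               }) &
//           oneapi::tbb::make_filter<int, void>(
//               oneapi::tbb::filter_mode::serial_in_order,
//               [&](int x) { out << x * x << '\n'; }));
//   }
//
// The filter_node tree built by operator& is flattened by fill_pipeline(),
// and max_number_of_live_tokens becomes the max_token argument above.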

void __TBB_EXPORTED_FUNC set_end_of_input(d1::base_filter& bf) {
    __TBB_ASSERT(bf.my_input_buffer, nullptr);
    __TBB_ASSERT(bf.object_may_be_null(), nullptr);
    if(bf.is_serial() ) {
        bf.my_pipeline->end_of_input.store(true, std::memory_order_relaxed);
    } else {
        __TBB_ASSERT(bf.my_input_buffer->end_of_input_tls_allocated, nullptr);
        bf.my_input_buffer->set_my_tls_end_of_input();
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb