/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/pipeline.h"
#include "tbb/spin_mutex.h"
#include "tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "tls.h"  // for parallel filters that do not use NULL as end_of_input


namespace tbb {

namespace internal {

//! This structure is used to store task information in an input buffer
struct task_info {
    void* my_object;
    //! Invalid unless a task went through an ordered stage.
    Token my_token;
    //! False until my_token is set.
    bool my_token_ready;
    //! True if my_object is valid.
    bool is_valid;
    //! Set to initial state (no object, no token)
    void reset() {
        my_object = NULL;
        my_token = 0;
        my_token_ready = false;
        is_valid = false;
    }
};
//! A buffer of input items for a filter.
/** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
class input_buffer : no_copy {
    friend class tbb::internal::pipeline_root_task;
    friend class tbb::filter;
    friend class tbb::thread_bound_filter;
    friend class tbb::internal::stage_task;
    friend class tbb::pipeline;

    typedef  Token  size_type;

    //! Array of deferred tasks that cannot yet start executing.
    task_info* array;

    //! For a thread-bound filter, the semaphore used for waiting; NULL otherwise.
    semaphore* my_sem;

    //! Size of array
    /** Always 0 or a power of 2 */
    size_type array_size;

    //! Lowest token that can start executing.
    /** All prior Tokens have already been seen. */
    Token low_token;

    //! Serializes updates.
    spin_mutex array_mutex;

    //! Resize "array".
    /** Caller is responsible for acquiring a lock on "array_mutex". */
    void grow( size_type minimum_size );

    //! Initial size for "array"
    /** Must be a power of 2 */
    static const size_type initial_buffer_size = 4;
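    // Note: because array_size is always a power of two, the slot for a
    // token is computed as array[token & (array_size-1)], which is a cheap
    // equivalent of token % array_size.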

    //! Used for the out-of-order buffer, and for assigning my_token if is_ordered and my_token is not already assigned
    Token high_token;

    //! True for ordered filter, false otherwise.
    bool is_ordered;

    //! True for thread-bound filter, false otherwise.
    bool is_bound;

    //! For parallel filters that accept NULLs: thread-local flag for reaching end_of_input
    typedef basic_tls<intptr_t> end_of_input_tls_t;
    end_of_input_tls_t end_of_input_tls;
    bool end_of_input_tls_allocated; // no way to test pthread creation of TLS

    void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
    void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
    void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
    void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }

public:
    //! Construct empty buffer.
    input_buffer( bool is_ordered_, bool is_bound_ ) :
            array(NULL), my_sem(NULL), array_size(0),
            low_token(0), high_token(0),
            is_ordered(is_ordered_), is_bound(is_bound_),
            end_of_input_tls_allocated(false) {
        grow(initial_buffer_size);
        __TBB_ASSERT( array, NULL );
        if(is_bound) create_sema(0);
    }

    //! Destroy the buffer.
    ~input_buffer() {
        __TBB_ASSERT( array, NULL );
        cache_aligned_allocator<task_info>().deallocate(array,array_size);
        poison_pointer( array );
        if(my_sem) {
            free_sema();
        }
        if(end_of_input_tls_allocated) {
            destroy_my_tls();
        }
    }

    //! Put a token into the buffer.
    /** If task information was placed into the buffer, returns true;
        otherwise returns false, informing the caller to create and spawn a task.
        If the input buffer is owned by a thread-bound filter and the item at
        low_token was not valid, issue a V().
        If the input_buffer is owned by a successor to a thread-bound filter,
        the force_put parameter should be true to ensure the token is inserted
        into the buffer.
    */
    bool put_token( task_info& info_, bool force_put = false ) {
        {
            info_.is_valid = true;
            spin_mutex::scoped_lock lock( array_mutex );
            Token token;
            bool was_empty = !array[low_token&(array_size-1)].is_valid;
            if( is_ordered ) {
                if( !info_.my_token_ready ) {
                    info_.my_token = high_token++;
                    info_.my_token_ready = true;
                }
                token = info_.my_token;
            } else
                token = high_token++;
            __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
            if( token!=low_token || is_bound || force_put ) {
                // Trying to put a token that is beyond low_token.
                // Need to wait until low_token catches up before dispatching.
                if( token-low_token>=array_size )
                    grow( token-low_token+1 );
                ITT_NOTIFY( sync_releasing, this );
                array[token&(array_size-1)] = info_;
                if(was_empty && is_bound) {
                    sema_V();
                }
                return true;
            }
        }
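        // token == low_token for a serial, non-thread-bound filter with no
        // forced put: the caller may process this item immediately instead
        // of buffering it.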
        return false;
    }

    //! Note that processing of a token is finished.
    /** Fires up processing of the next token, if processing was deferred. */
    // Uses a template to avoid an explicit dependency on stage_task.
    // This is only called for serial filters, and is the reason for the
    // advance parameter in return_item (we're incrementing low_token here).
    // Non-TBF serial stages don't advance the token at the start because the presence
    // of the current token in the buffer keeps another stage from being spawned.
    template<typename StageTask>
    void note_done( Token token, StageTask& spawner ) {
        task_info wakee;
        wakee.reset();
        {
            spin_mutex::scoped_lock lock( array_mutex );
            if( !is_ordered || token==low_token ) {
                // Wake the next task
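                // ++low_token retires the current token; the incremented
                // value indexes the slot of the next candidate item.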
                task_info& item = array[++low_token & (array_size-1)];
                ITT_NOTIFY( sync_acquired, this );
                wakee = item;
                item.is_valid = false;
            }
        }
        if( wakee.is_valid )
            spawner.spawn_stage_task(wakee);
    }

#if __TBB_TASK_GROUP_CONTEXT
    //! The method destroys all data in filters to prevent memory leaks
    void clear( filter* my_filter ) {
        long t=low_token;
        for( size_type i=0; i<array_size; ++i, ++t ){
            task_info& temp = array[t&(array_size-1)];
            if( temp.is_valid ) {
                my_filter->finalize(temp.my_object);
                temp.is_valid = false;
            }
        }
    }
#endif

    //! Return an item and invalidate the queued item, but only advance if the filter
    // is parallel (as indicated by advance == true). If the filter is serial, leave the
    // item in the buffer to keep another stage from being spawned.
    bool return_item(task_info& info, bool advance) {
        spin_mutex::scoped_lock lock( array_mutex );
        task_info& item = array[low_token&(array_size-1)];
        ITT_NOTIFY( sync_acquired, this );
        if( item.is_valid ) {
            info = item;
            item.is_valid = false;
            if( advance ) low_token++;
            return true;
        }
        return false;
    }

    //! True if the current low_token is valid.
    bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size-1)].is_valid; }

    // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
    void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
    void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
    bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
    void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
};

void input_buffer::grow( size_type minimum_size ) {
    size_type old_size = array_size;
    size_type new_size = old_size ? 2*old_size : initial_buffer_size;
    while( new_size<minimum_size )
        new_size*=2;
    task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
    task_info* old_array = array;
    for( size_type i=0; i<new_size; ++i )
        new_array[i].is_valid = false;
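    // Re-place the surviving items using the new mask: because slots are
    // token-based, each item moves to (token & (new_size-1)), keeping the
    // buffer consistent after the resize.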
    long t=low_token;
    for( size_type i=0; i<old_size; ++i, ++t )
        new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
    array = new_array;
    array_size = new_size;
    if( old_array )
        cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
}

class stage_task: public task, public task_info {
private:
    friend class tbb::pipeline;
    pipeline& my_pipeline;
    filter* my_filter;
    //! True if this task has not yet read the input.
    bool my_at_start;

public:
    //! Construct stage_task for the first stage in a pipeline.
    /** Such a stage has not read any input yet. */
    stage_task( pipeline& pipeline ) :
        my_pipeline(pipeline),
        my_filter(pipeline.filter_list),
        my_at_start(true)
    {
        task_info::reset();
    }
    //! Construct stage_task for a subsequent stage in a pipeline.
    stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
        task_info(info),
        my_pipeline(pipeline),
        my_filter(filter_),
        my_at_start(false)
    {}
    //! Roughly equivalent to the constructor of an input stage task
    void reset() {
        task_info::reset();
        my_filter = my_pipeline.filter_list;
        my_at_start = true;
    }
    //! The virtual task execution method
    task* execute() __TBB_override;
#if __TBB_TASK_GROUP_CONTEXT
    ~stage_task()
    {
        if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
            __TBB_ASSERT(is_cancelled(), "Trying to finalize a task that wasn't cancelled");
            my_filter->finalize(my_object);
            my_object = NULL;
        }
    }
#endif // __TBB_TASK_GROUP_CONTEXT
    //! Creates and spawns a stage_task from task_info
    void spawn_stage_task(const task_info& info)
    {
        stage_task* clone = new (allocate_additional_child_of(*parent()))
                                stage_task( my_pipeline, my_filter, info );
        spawn(*clone);
    }
};

task* stage_task::execute() {
    __TBB_ASSERT( !my_at_start || !my_object, NULL );
    __TBB_ASSERT( !my_filter->is_bound(), NULL );
    if( my_at_start ) {
        if( my_filter->is_serial() ) {
            my_object = (*my_filter)(my_object);
            if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
            {
                if( my_filter->is_ordered() ) {
                    my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
                    my_token_ready = true;
                } else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
                    if( my_pipeline.has_thread_bound_filters )
                        my_pipeline.token_counter++; // ideally, with relaxed semantics
                }
                if( !my_filter->next_filter_in_pipeline ) { // we're the only filter in the pipeline
                    reset();
                    goto process_another_stage;
                } else {
                    ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
                    if( --my_pipeline.input_tokens>0 )
                        spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
                }
            } else {
                my_pipeline.end_of_input = true;
                return NULL;
            }
        } else /*not is_serial*/ {
            if( my_pipeline.end_of_input )
                return NULL;
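            // When thread-bound filters are present, token_counter is advanced
            // even for unordered tokens: has_more_work() compares token_counter
            // against each buffer's low_token to detect in-flight items.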
            if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
                if( my_pipeline.has_thread_bound_filters )
                    my_pipeline.token_counter++;
            }
            ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
            if( --my_pipeline.input_tokens>0 )
                spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
            my_object = (*my_filter)(my_object);
            if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
            {
                my_pipeline.end_of_input = true;
                if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
                    if( my_pipeline.has_thread_bound_filters )
                        my_pipeline.token_counter--;  // fix token_counter
                }
                return NULL;
            }
        }
        my_at_start = false;
    } else {
        my_object = (*my_filter)(my_object);
        if( my_filter->is_serial() )
            my_filter->my_input_buffer->note_done(my_token, *this);
    }
    my_filter = my_filter->next_filter_in_pipeline;
    if( my_filter ) {
        // There is another filter to execute.
        if( my_filter->is_serial() ) {
            // The next filter must execute tokens in order
            if( my_filter->my_input_buffer->put_token(*this) ){
                // Can't proceed with the same item
                if( my_filter->is_bound() ) {
                    // Find the next non-thread-bound filter
                    do {
                        my_filter = my_filter->next_filter_in_pipeline;
                    } while( my_filter && my_filter->is_bound() );
                    // Check if there is an item ready to process
                    if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
                        goto process_another_stage;
                }
                my_filter = NULL; // To prevent deleting my_object twice if an exception occurs
                return NULL;
            }
        }
    } else {
        // Reached the end of the pipe.
        size_t ntokens_avail = ++my_pipeline.input_tokens;
        if( my_pipeline.filter_list->is_bound() ) {
            if( ntokens_avail == 1 ) {
                my_pipeline.filter_list->my_input_buffer->sema_V();
            }
            return NULL;
        }
        if( ntokens_avail>1  // Only recycle if no other token is available
                || my_pipeline.end_of_input ) {
            return NULL; // No need to recycle for new input
        }
        ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
        // Recycle as an input stage task.
        reset();
    }
process_another_stage:
    /* A semi-hackish way to re-execute the same task object immediately without spawning.
       recycle_as_continuation marks the task for future execution,
       and then the 'this' pointer is returned to bypass spawning. */
    recycle_as_continuation();
    return this;
}

class pipeline_root_task: public task {
    pipeline& my_pipeline;
    bool do_segment_scanning;

    task* execute() __TBB_override {
        if( !my_pipeline.end_of_input )
            if( !my_pipeline.filter_list->is_bound() )
                if( my_pipeline.input_tokens > 0 ) {
                    recycle_as_continuation();
                    set_ref_count(1);
                    return new( allocate_child() ) stage_task( my_pipeline );
                }
        if( do_segment_scanning ) {
            filter* current_filter = my_pipeline.filter_list->next_segment;
            /* first non-thread-bound filter that follows a thread-bound one
            and may have valid items to process */
            filter* first_suitable_filter = current_filter;
            while( current_filter ) {
                __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
                __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
                if( !my_pipeline.end_of_input || current_filter->has_more_work())
                {
                    task_info info;
                    info.reset();
                    task* bypass = NULL;
                    int refcnt = 0;
                    task_list list;
                    // No new tokens are created; it's OK to process all waiting tokens.
                    // If the filter is serial, the second call to return_item will return false.
                    while( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
                        task* t = new( allocate_child() ) stage_task( my_pipeline, current_filter, info );
                        if( ++refcnt == 1 )
                            bypass = t;
                        else // there's more than one task
                            list.push_back(*t);
                        // TODO: limit the list size (to arena size?) to spawn tasks sooner
                        __TBB_ASSERT( refcnt <= int(my_pipeline.token_counter), "token counting error" );
                        info.reset();
                    }
                    if( refcnt ) {
                        set_ref_count( refcnt );
                        if( refcnt > 1 )
                            spawn(list);
                        recycle_as_continuation();
                        return bypass;
                    }
                    current_filter = current_filter->next_segment;
                    if( !current_filter ) {
                        if( !my_pipeline.end_of_input ) {
                            recycle_as_continuation();
                            return this;
                        }
                        current_filter = first_suitable_filter;
                        __TBB_Yield();
                    }
                } else {
                    /* The preceding pipeline segment is empty.
                    Fast-forward to the next post-TBF segment. */
                    first_suitable_filter = first_suitable_filter->next_segment;
                    current_filter = first_suitable_filter;
                }
            } /* while( current_filter ) */
            return NULL;
        } else {
            if( !my_pipeline.end_of_input ) {
                recycle_as_continuation();
                return this;
            }
            return NULL;
        }
    }
public:
    pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
    {
        __TBB_ASSERT( my_pipeline.filter_list, NULL );
        filter* first = my_pipeline.filter_list;
        if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
            // Scanning the pipeline for segments
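            // A segment boundary is the first non-thread-bound filter that
            // follows a thread-bound one; the next_segment links built here
            // let the root task hop directly between such filters.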
            filter* head_of_previous_segment = first;
            for(  filter* subfilter=first->next_filter_in_pipeline;
                  subfilter!=NULL;
                  subfilter=subfilter->next_filter_in_pipeline )
            {
                if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
                    do_segment_scanning = true;
                    head_of_previous_segment->next_segment = subfilter;
                    head_of_previous_segment = subfilter;
                }
            }
        }
    }
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Workaround for overzealous compiler warnings
    // Suppress compiler warning about constant conditional expression
    #pragma warning (disable: 4127)
#endif

// The class destroys end_counter and clears all input buffers if the pipeline was cancelled.
class pipeline_cleaner: internal::no_copy {
    pipeline& my_pipeline;
public:
    pipeline_cleaner(pipeline& _pipeline) :
        my_pipeline(_pipeline)
    {}
    ~pipeline_cleaner(){
#if __TBB_TASK_GROUP_CONTEXT
        if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
            my_pipeline.clear_filters();
#endif
        my_pipeline.end_counter = NULL;
    }
};

} // namespace internal

void pipeline::inject_token( task& ) {
    __TBB_ASSERT(false,"illegal call to inject_token");
}

#if __TBB_TASK_GROUP_CONTEXT
void pipeline::clear_filters() {
    for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
        if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
            if( internal::input_buffer* b = f->my_input_buffer )
                b->clear(f);
    }
}
#endif

pipeline::pipeline() :
    filter_list(NULL),
    filter_end(NULL),
    end_counter(NULL),
    end_of_input(false),
    has_thread_bound_filters(false)
{
    token_counter = 0;
    input_tokens = 0;
}

pipeline::~pipeline() {
    clear();
}

void pipeline::clear() {
    filter* next;
    for( filter* f = filter_list; f; f=next ) {
        if( internal::input_buffer* b = f->my_input_buffer ) {
            delete b;
            f->my_input_buffer = NULL;
        }
        next=f->next_filter_in_pipeline;
        f->next_filter_in_pipeline = filter::not_in_pipeline();
        if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
            f->prev_filter_in_pipeline = filter::not_in_pipeline();
            f->my_pipeline = NULL;
        }
        if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
            f->next_segment = NULL;
    }
    filter_list = filter_end = NULL;
}

void pipeline::add_filter( filter& filter_ ) {
#if TBB_USE_ASSERT
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
        __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
    __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
    __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
#endif
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
        filter_.my_pipeline = this;
        filter_.prev_filter_in_pipeline = filter_end;
        if ( filter_list == NULL)
            filter_list = &filter_;
        else
            filter_end->next_filter_in_pipeline = &filter_;
        filter_.next_filter_in_pipeline = NULL;
        filter_end = &filter_;
    } else {
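        // Legacy (pre-version-3) path: filter_end is used as a disguised
        // pointer to the link field that should receive the next filter,
        // so the singly linked list can be appended to without a prev pointer.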
        if( !filter_end )
            filter_end = reinterpret_cast<filter*>(&filter_list);

        *reinterpret_cast<filter**>(filter_end) = &filter_;
        filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
        *reinterpret_cast<filter**>(filter_end) = NULL;
    }
    if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
        if( filter_.is_serial() ) {
            if( filter_.is_bound() )
                has_thread_bound_filters = true;
            filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
        } else {
            if(filter_.prev_filter_in_pipeline) {
                if(filter_.prev_filter_in_pipeline->is_bound()) {
                    // successors to bound filters must have an input_buffer
                    filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
                }
            } else {  // input filter
                if(filter_.object_may_be_null() ) {
                    //TODO: the buffer is only needed to hold the TLS; this could be improved
                    filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
                    filter_.my_input_buffer->create_my_tls();
                }
            }
        }
    } else {
        if( filter_.is_serial() ) {
            filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
        }
    }
}

void pipeline::remove_filter( filter& filter_ ) {
    __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
    __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
    __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
    if (&filter_ == filter_list)
        filter_list = filter_.next_filter_in_pipeline;
    else {
        __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
        filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
    }
    if (&filter_ == filter_end)
        filter_end = filter_.prev_filter_in_pipeline;
    else {
        __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
        filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
    }
    if( internal::input_buffer* b = filter_.my_input_buffer ) {
        delete b;
        filter_.my_input_buffer = NULL;
    }
    filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
        filter_.next_segment = NULL;
    filter_.my_pipeline = NULL;
}

void pipeline::run( size_t max_number_of_live_tokens
#if __TBB_TASK_GROUP_CONTEXT
    , tbb::task_group_context& context
#endif
    ) {
    __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
    __TBB_ASSERT( !end_counter, "pipeline already running?" );
    if( filter_list ) {
        internal::pipeline_cleaner my_pipeline_cleaner(*this);
        end_of_input = false;
        input_tokens = internal::Token(max_number_of_live_tokens);
        if(has_thread_bound_filters) {
            // release the input filter if it is thread-bound
            if(filter_list->is_bound()) {
                filter_list->my_input_buffer->sema_V();
            }
        }
#if __TBB_TASK_GROUP_CONTEXT
        end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
#else
        end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
#endif
        // Start execution of tasks
        task::spawn_root_and_wait( *end_counter );

        if(has_thread_bound_filters) {
            for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
                if(f->is_bound()) {
                    f->my_input_buffer->sema_V(); // wake to end
                }
            }
        }
    }
}

#if __TBB_TASK_GROUP_CONTEXT
void pipeline::run( size_t max_number_of_live_tokens ) {
    if( filter_list ) {
        // Construct a task group context with the exception propagation mode expected
        // by the pipeline caller.
        uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ?
                task_group_context::default_traits :
                task_group_context::default_traits & ~task_group_context::exact_exception;
        task_group_context context(task_group_context::bound, ctx_traits);
        run(max_number_of_live_tokens, context);
    }
}
#endif // __TBB_TASK_GROUP_CONTEXT

bool filter::has_more_work() {
    __TBB_ASSERT(my_pipeline, NULL);
    __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
    return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
}

filter::~filter() {
    if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
        if ( next_filter_in_pipeline != filter::not_in_pipeline() )
            my_pipeline->remove_filter(*this);
        else
            __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "filter list is probably broken" );
    } else {
        __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
    }
}

void filter::set_end_of_input() {
    __TBB_ASSERT(my_input_buffer, NULL);
    __TBB_ASSERT(object_may_be_null(), NULL);
    if(is_serial()) {
        my_pipeline->end_of_input = true;
    } else {
        __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
        my_input_buffer->set_my_tls_end_of_input();
    }
}

thread_bound_filter::result_type thread_bound_filter::process_item() {
    return internal_process_item(true);
}

thread_bound_filter::result_type thread_bound_filter::try_process_item() {
    return internal_process_item(false);
}

thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) {
    __TBB_ASSERT(my_pipeline != NULL, "process_item must not be called for a filter that is not in a pipeline");
    internal::task_info info;
    info.reset();

    if( my_pipeline->end_of_input && !has_more_work() )
        return end_of_stream;

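    // An input thread-bound filter blocks on its semaphore until an input
    // token is free; a non-input one blocks until an item appears in its
    // input buffer.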
    if( !prev_filter_in_pipeline ) {
        if( my_pipeline->end_of_input )
            return end_of_stream;
        while( my_pipeline->input_tokens == 0 ) {
            if( !is_blocking )
                return item_not_available;
            my_input_buffer->sema_P();
        }
        info.my_object = (*this)(info.my_object);
        if( info.my_object ) {
            __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
            my_pipeline->input_tokens--;
            if( is_ordered() ) {
                info.my_token = my_pipeline->token_counter;
                info.my_token_ready = true;
            }
            my_pipeline->token_counter++; // ideally, with relaxed semantics
        } else {
            my_pipeline->end_of_input = true;
            return end_of_stream;
        }
    } else { /* this is not an input filter */
        while( !my_input_buffer->has_item() ) {
            if( !is_blocking ) {
                return item_not_available;
            }
            my_input_buffer->sema_P();
            if( my_pipeline->end_of_input && !has_more_work() ) {
                return end_of_stream;
            }
        }
        if( !my_input_buffer->return_item(info, /*advance*/true) ) {
            __TBB_ASSERT(false,"return_item failed");
        }
        info.my_object = (*this)(info.my_object);
    }
    if( next_filter_in_pipeline ) {
        if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
            __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
        }
    } else {
        size_t ntokens_avail = ++(my_pipeline->input_tokens);
        if( my_pipeline->filter_list->is_bound() ) {
            if( ntokens_avail == 1 ) {
                my_pipeline->filter_list->my_input_buffer->sema_V();
            }
        }
    }

    return success;
}

} // tbb