/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See  the GNU General Public License for more details.   You should have received a copy of
    the  GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA

    As a special exception,  you may use this file  as part of a free software library without
    restriction.  Specifically,  if other files instantiate templates  or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable,  this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

#include "tbb/pipeline.h"
#include "tbb/spin_mutex.h"
#include "tbb/cache_aligned_allocator.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "tls.h"  // for parallel filters that do not use NULL as end_of_input


namespace tbb {

namespace internal {

//! This structure is used to store task information in an input buffer
struct task_info {
    void* my_object;
    //! Invalid unless a task went through an ordered stage.
    Token my_token;
    //! False until my_token is set.
    bool my_token_ready;
    //! True if my_object is valid.
    bool is_valid;
    //! Set to initial state (no object, no token)
    void reset() {
        my_object = NULL;
        my_token = 0;
        my_token_ready = false;
        is_valid = false;
    }
};
//! A buffer of input items for a filter.
/** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
class input_buffer : no_copy {
    friend class tbb::internal::pipeline_root_task;
    friend class tbb::filter;
    friend class tbb::thread_bound_filter;
    friend class tbb::internal::stage_task;
    friend class tbb::pipeline;

    typedef  Token  size_type;

    //! Array of deferred tasks that cannot yet start executing.
    task_info* array;

    //! For a thread-bound filter, the semaphore used for waiting; NULL otherwise.
    semaphore* my_sem;

    //! Size of array
    /** Always 0 or a power of 2 */
    size_type array_size;

    //! Lowest token that can start executing.
    /** All prior tokens have already been seen. */
    Token low_token;

    //! Serializes updates.
    spin_mutex array_mutex;

    //! Resize "array".
    /** Caller is responsible for acquiring a lock on "array_mutex". */
    void grow( size_type minimum_size );

    //! Initial size for "array"
    /** Must be a power of 2 */
    static const size_type initial_buffer_size = 4;

    //! Used for the out-of-order buffer, and for assigning my_token if is_ordered and my_token is not already assigned
    Token high_token;

    //! True for ordered filter, false otherwise.
    bool is_ordered;

    //! True for thread-bound filter, false otherwise.
    bool is_bound;

    //! For parallel filters that accept NULLs: thread-local flag recording that end_of_input was reached
    typedef basic_tls<intptr_t> end_of_input_tls_t;
    end_of_input_tls_t end_of_input_tls;
    bool end_of_input_tls_allocated; // no way to test pthread creation of TLS

    void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
    void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
    void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
    void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }

public:
    //! Construct empty buffer.
    input_buffer( bool is_ordered_, bool is_bound_ ) :
            array(NULL), my_sem(NULL), array_size(0),
            low_token(0), high_token(0),
            is_ordered(is_ordered_), is_bound(is_bound_),
            end_of_input_tls_allocated(false) {
        grow(initial_buffer_size);
        __TBB_ASSERT( array, NULL );
        if(is_bound) create_sema(0);
    }

    //! Destroy the buffer.
    ~input_buffer() {
        __TBB_ASSERT( array, NULL );
        cache_aligned_allocator<task_info>().deallocate(array,array_size);
        poison_pointer( array );
        if(my_sem) {
            free_sema();
        }
        if(end_of_input_tls_allocated) {
            destroy_my_tls();
        }
    }

    //! Put a token into the buffer.
    /** If task information was placed into the buffer, returns true;
        otherwise returns false, informing the caller to create and spawn a task.
        If the input buffer is owned by a thread-bound filter and the item at
        low_token was not valid, issues a V().
        If the input_buffer is owned by a successor to a thread-bound filter,
        the force_put parameter should be true to ensure the token is inserted
        into the buffer.
    */
    bool put_token( task_info& info_, bool force_put = false ) {
        {
            info_.is_valid = true;
            spin_mutex::scoped_lock lock( array_mutex );
            Token token;
            bool was_empty = !array[low_token&(array_size-1)].is_valid;
            if( is_ordered ) {
                if( !info_.my_token_ready ) {
                    info_.my_token = high_token++;
                    info_.my_token_ready = true;
                }
                token = info_.my_token;
            } else
                token = high_token++;
            __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
            if( token!=low_token || is_bound || force_put ) {
                // Trying to put a token that is beyond low_token.
                // Need to wait until low_token catches up before dispatching.
                if( token-low_token>=array_size )
                    grow( token-low_token+1 );
                ITT_NOTIFY( sync_releasing, this );
                array[token&(array_size-1)] = info_;
                if(was_empty && is_bound) {
                    sema_V();
                }
                return true;
            }
        }
        return false;
    }
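
    // Illustrative walk-through (not from the original sources): with
    // array_size==4, low_token==0, and an ordered upstream stage, suppose the
    // item for token 2 arrives first. Since token!=low_token, put_token parks
    // it at array[2&3] and returns true, so the caller does not process it.
    // When token 0 arrives, token==low_token, put_token returns false, and
    // the caller runs the item itself; note_done then advances low_token and
    // spawns tasks for parked tokens as they become the lowest outstanding.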

    //! Note that processing of a token is finished.
    /** Fires up processing of the next token, if processing was deferred. */
    // Using a template to avoid an explicit dependency on stage_task.
    // This is only called for serial filters, and is the reason for the
    // advance parameter in return_item (we're incrementing low_token here).
    // Non-TBF serial stages don't advance the token at the start because the presence
    // of the current token in the buffer keeps another stage from being spawned.
    template<typename StageTask>
    void note_done( Token token, StageTask& spawner ) {
        task_info wakee;
        wakee.reset();
        {
            spin_mutex::scoped_lock lock( array_mutex );
            if( !is_ordered || token==low_token ) {
                // Wake the next task
                task_info& item = array[++low_token & (array_size-1)];
                ITT_NOTIFY( sync_acquired, this );
                wakee = item;
                item.is_valid = false;
            }
        }
        if( wakee.is_valid )
            spawner.spawn_stage_task(wakee);
    }

#if __TBB_TASK_GROUP_CONTEXT
    //! Destroy all remaining items in the buffer to prevent memory leaks
    void clear( filter* my_filter ) {
        long t=low_token;
        for( size_type i=0; i<array_size; ++i, ++t ){
            task_info& temp = array[t&(array_size-1)];
            if (temp.is_valid ) {
                my_filter->finalize(temp.my_object);
                temp.is_valid = false;
            }
        }
    }
#endif

    //! Return an item, invalidating the queued item; advance low_token only if advance is true.
    // advance == true for parallel filters. If the filter is serial, leave the
    // item in the buffer to keep another stage from being spawned.
    bool return_item(task_info& info, bool advance) {
        spin_mutex::scoped_lock lock( array_mutex );
        task_info& item = array[low_token&(array_size-1)];
        ITT_NOTIFY( sync_acquired, this );
        if( item.is_valid ) {
            info = item;
            item.is_valid = false;
            if (advance) low_token++;
            return true;
        }
        return false;
    }

    //! True if the item at low_token is valid.
    bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size -1)].is_valid; }

    // end_of_input signal for parallel_pipeline: parallel input filters with 0 tokens allowed.
    void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
    void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
    bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
    void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
};
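
// A minimal sketch (not part of the original sources) of how a serial stage
// drives its input_buffer; `buffer`, `stage`, and `process` are hypothetical
// stand-ins for an input_buffer, a stage_task-like spawner with a
// spawn_stage_task(task_info&) member, and the filter body:
//
//     task_info info;
//     info.reset();
//     info.my_object = item;                      // item produced upstream
//     if( !buffer.put_token(info) ) {
//         process(info.my_object);                // token was next in line
//         buffer.note_done(info.my_token, stage); // may wake a parked token
//     }
//     // else: the token was parked; a later note_done will spawn it.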

void input_buffer::grow( size_type minimum_size ) {
    size_type old_size = array_size;
    size_type new_size = old_size ? 2*old_size : initial_buffer_size;
    while( new_size<minimum_size )
        new_size*=2;
    task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
    task_info* old_array = array;
    for( size_type i=0; i<new_size; ++i )
        new_array[i].is_valid = false;
    long t=low_token;
    for( size_type i=0; i<old_size; ++i, ++t )
        new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
    array = new_array;
    array_size = new_size;
    if( old_array )
        cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
}
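
// Illustrative note (not from the original sources): because array_size is a
// power of 2, token&(array_size-1) is a cheap modulo, making the buffer a
// ring indexed by token. grow() therefore re-files each live item at the slot
// its token selects under the *new* mask; e.g. growing from 4 to 8 slots
// moves the item for token 5 from slot 5&3 == 1 to slot 5&7 == 5.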

class stage_task: public task, public task_info {
private:
    friend class tbb::pipeline;
    pipeline& my_pipeline;
    filter* my_filter;
    //! True if this task has not yet read the input.
    bool my_at_start;

public:
    //! Construct stage_task for the first stage in a pipeline.
    /** Such a stage has not read any input yet. */
    stage_task( pipeline& pipeline ) :
        my_pipeline(pipeline),
        my_filter(pipeline.filter_list),
        my_at_start(true)
    {
        task_info::reset();
    }
    //! Construct stage_task for a subsequent stage in a pipeline.
    stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
        task_info(info),
        my_pipeline(pipeline),
        my_filter(filter_),
        my_at_start(false)
    {}
    //! Roughly equivalent to the constructor of the input stage task
    void reset() {
        task_info::reset();
        my_filter = my_pipeline.filter_list;
        my_at_start = true;
    }
    //! The virtual task execution method
    /*override*/ task* execute();
#if __TBB_TASK_GROUP_CONTEXT
    ~stage_task()
    {
        if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
            __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
            my_filter->finalize(my_object);
            my_object = NULL;
        }
    }
#endif // __TBB_TASK_GROUP_CONTEXT
    //! Creates and spawns a stage_task from task_info
    void spawn_stage_task(const task_info& info)
    {
        stage_task* clone = new (allocate_additional_child_of(*parent()))
                                stage_task( my_pipeline, my_filter, info );
        spawn(*clone);
    }
};

task* stage_task::execute() {
    __TBB_ASSERT( !my_at_start || !my_object, NULL );
    __TBB_ASSERT( !my_filter->is_bound(), NULL );
    if( my_at_start ) {
        if( my_filter->is_serial() ) {
            my_object = (*my_filter)(my_object);
            if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
            {
                if( my_filter->is_ordered() ) {
                    my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
                    my_token_ready = true;
                } else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
                    if( my_pipeline.has_thread_bound_filters )
                        my_pipeline.token_counter++; // ideally, with relaxed semantics
                }
                if( !my_filter->next_filter_in_pipeline ) { // we're the only filter in the pipeline
                    reset();
                    goto process_another_stage;
                } else {
                    ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
                    if( --my_pipeline.input_tokens>0 )
                        spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
                }
            } else {
                my_pipeline.end_of_input = true;
                return NULL;
            }
        } else /*not is_serial*/ {
            if( my_pipeline.end_of_input )
                return NULL;
            if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
                if( my_pipeline.has_thread_bound_filters )
                    my_pipeline.token_counter++;
            }
            ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
            if( --my_pipeline.input_tokens>0 )
                spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
            my_object = (*my_filter)(my_object);
            if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
            {
                my_pipeline.end_of_input = true;
                if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
                    if( my_pipeline.has_thread_bound_filters )
                        my_pipeline.token_counter--;  // fix token_counter
                }
                return NULL;
            }
        }
        my_at_start = false;
    } else {
        my_object = (*my_filter)(my_object);
        if( my_filter->is_serial() )
            my_filter->my_input_buffer->note_done(my_token, *this);
    }
    my_filter = my_filter->next_filter_in_pipeline;
    if( my_filter ) {
        // There is another filter to execute.
        if( my_filter->is_serial() ) {
            // The next filter must execute tokens in order
            if( my_filter->my_input_buffer->put_token(*this) ){
                // Can't proceed with the same item
                if( my_filter->is_bound() ) {
                    // Find the next non-thread-bound filter
                    do {
                        my_filter = my_filter->next_filter_in_pipeline;
                    } while( my_filter && my_filter->is_bound() );
                    // Check if there is an item ready to process
                    if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
                        goto process_another_stage;
                }
                my_filter = NULL; // To prevent deleting my_object twice if an exception occurs
                return NULL;
            }
        }
    } else {
        // Reached the end of the pipe.
        size_t ntokens_avail = ++my_pipeline.input_tokens;
        if(my_pipeline.filter_list->is_bound() ) {
            if(ntokens_avail == 1) {
                my_pipeline.filter_list->my_input_buffer->sema_V();
            }
            return NULL;
        }
        if( ntokens_avail>1  // Recycle only if this was the sole available token
                || my_pipeline.end_of_input ) {
            return NULL; // No need to recycle for new input
        }
        ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
        // Recycle as an input stage task.
        reset();
    }
process_another_stage:
    /* A semi-hackish way to reexecute the same task object immediately without spawning.
       recycle_as_continuation marks the task for future execution,
       and then the 'this' pointer is returned to bypass spawning. */
    recycle_as_continuation();
    return this;
}

class pipeline_root_task: public task {
    pipeline& my_pipeline;
    bool do_segment_scanning;

    /*override*/ task* execute() {
        if( !my_pipeline.end_of_input )
            if( !my_pipeline.filter_list->is_bound() )
                if( my_pipeline.input_tokens > 0 ) {
                    recycle_as_continuation();
                    set_ref_count(1);
                    return new( allocate_child() ) stage_task( my_pipeline );
                }
        if( do_segment_scanning ) {
            filter* current_filter = my_pipeline.filter_list->next_segment;
            /* first non-thread-bound filter that follows a thread-bound one
            and may have valid items to process */
            filter* first_suitable_filter = current_filter;
            while( current_filter ) {
                __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
                __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
                if( !my_pipeline.end_of_input || current_filter->has_more_work())
                {
                    task_info info;
                    info.reset();
                    if( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
                        set_ref_count(1);
                        recycle_as_continuation();
                        return new( allocate_child() ) stage_task( my_pipeline, current_filter, info);
                    }
                    current_filter = current_filter->next_segment;
                    if( !current_filter ) {
                        if( !my_pipeline.end_of_input ) {
                            recycle_as_continuation();
                            return this;
                        }
                        current_filter = first_suitable_filter;
                        __TBB_Yield();
                    }
                } else {
                    /* The preceding pipeline segment is empty.
                    Fast-forward to the next post-TBF segment. */
                    first_suitable_filter = first_suitable_filter->next_segment;
                    current_filter = first_suitable_filter;
                }
            } /* while( current_filter ) */
            return NULL;
        } else {
            if( !my_pipeline.end_of_input ) {
                recycle_as_continuation();
                return this;
            }
            return NULL;
        }
    }
public:
    pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
    {
        __TBB_ASSERT( my_pipeline.filter_list, NULL );
        filter* first = my_pipeline.filter_list;
        if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
            // Scan the pipeline for segments
            filter* head_of_previous_segment = first;
            for(  filter* subfilter=first->next_filter_in_pipeline;
                  subfilter!=NULL;
                  subfilter=subfilter->next_filter_in_pipeline )
            {
                if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
                    do_segment_scanning = true;
                    head_of_previous_segment->next_segment = subfilter;
                    head_of_previous_segment = subfilter;
                }
            }
        }
    }
};
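
// Illustrative example (not from the original sources): for a pipeline
// S1 -> TBF1 -> S2 -> S3 -> TBF2 -> S4, where the TBFx are thread-bound, the
// constructor above links S1->next_segment to S2 and S2->next_segment to S4,
// so execute() visits only the head filter of each post-thread-bound segment
// when scanning for items left behind by thread-bound stages.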

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Workaround for overzealous compiler warnings
    // Suppress compiler warning about constant conditional expression
    #pragma warning (disable: 4127)
#endif

// The class destroys end_counter and clears all input buffers if the pipeline was cancelled.
class pipeline_cleaner: internal::no_copy {
    pipeline& my_pipeline;
public:
    pipeline_cleaner(pipeline& _pipeline) :
        my_pipeline(_pipeline)
    {}
    ~pipeline_cleaner(){
#if __TBB_TASK_GROUP_CONTEXT
        if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
            my_pipeline.clear_filters();
#endif
        my_pipeline.end_counter = NULL;
    }
};

} // namespace internal

void pipeline::inject_token( task& ) {
    __TBB_ASSERT(false,"illegal call to inject_token");
}

#if __TBB_TASK_GROUP_CONTEXT
void pipeline::clear_filters() {
    for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
        if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
            if( internal::input_buffer* b = f->my_input_buffer )
                b->clear(f);
    }
}
#endif

pipeline::pipeline() :
    filter_list(NULL),
    filter_end(NULL),
    end_counter(NULL),
    end_of_input(false),
    has_thread_bound_filters(false)
{
    token_counter = 0;
    input_tokens = 0;
}

pipeline::~pipeline() {
    clear();
}

void pipeline::clear() {
    filter* next;
    for( filter* f = filter_list; f; f=next ) {
        if( internal::input_buffer* b = f->my_input_buffer ) {
            delete b;
            f->my_input_buffer = NULL;
        }
        next=f->next_filter_in_pipeline;
        f->next_filter_in_pipeline = filter::not_in_pipeline();
        if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
            f->prev_filter_in_pipeline = filter::not_in_pipeline();
            f->my_pipeline = NULL;
        }
        if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
            f->next_segment = NULL;
    }
    filter_list = filter_end = NULL;
}

void pipeline::add_filter( filter& filter_ ) {
#if TBB_USE_ASSERT
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
        __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
    __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
    __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
#endif
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
        filter_.my_pipeline = this;
        filter_.prev_filter_in_pipeline = filter_end;
        if ( filter_list == NULL)
            filter_list = &filter_;
        else
            filter_end->next_filter_in_pipeline = &filter_;
        filter_.next_filter_in_pipeline = NULL;
        filter_end = &filter_;
    }
    else
    {
        if( !filter_end )
            filter_end = reinterpret_cast<filter*>(&filter_list);

        *reinterpret_cast<filter**>(filter_end) = &filter_;
        filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
        *reinterpret_cast<filter**>(filter_end) = NULL;
    }
    if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
        if( filter_.is_serial() ) {
            if( filter_.is_bound() )
                has_thread_bound_filters = true;
            filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
        }
        else {
            if(filter_.prev_filter_in_pipeline) {
                if(filter_.prev_filter_in_pipeline->is_bound()) {
                    // successors to bound filters must have an input_buffer
                    filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
                }
            }
            else {  // input filter
                if(filter_.object_may_be_null() ) {
                    //TODO: buffer only needed to hold TLS; could improve
                    filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
                    filter_.my_input_buffer->create_my_tls();
                }
            }
        }
    } else {
        if( filter_.is_serial() ) {
            filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
        }
    }
}
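
// A minimal usage sketch (not from the original sources; MyInput, MySquare,
// and MyOutput are hypothetical user classes derived from tbb::filter that
// override void* operator()(void*)):
//
//     tbb::pipeline p;
//     MyInput  in ( tbb::filter::serial_in_order );  // gets an ordered input_buffer
//     MySquare sq ( tbb::filter::parallel );         // no buffer unless it follows a TBF
//     MyOutput out( tbb::filter::serial_in_order );  // gets an ordered input_buffer
//     p.add_filter(in);
//     p.add_filter(sq);
//     p.add_filter(out);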

void pipeline::remove_filter( filter& filter_ ) {
    __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
    __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
    __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
    if (&filter_ == filter_list)
        filter_list = filter_.next_filter_in_pipeline;
    else {
        __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
        filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
    }
    if (&filter_ == filter_end)
        filter_end = filter_.prev_filter_in_pipeline;
    else {
        __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
        filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
    }
    if( internal::input_buffer* b = filter_.my_input_buffer ) {
        delete b;
        filter_.my_input_buffer = NULL;
    }
    filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
    if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
        filter_.next_segment = NULL;
    filter_.my_pipeline = NULL;
}

void pipeline::run( size_t max_number_of_live_tokens
#if __TBB_TASK_GROUP_CONTEXT
    , tbb::task_group_context& context
#endif
    ) {
    __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
    __TBB_ASSERT( !end_counter, "pipeline already running?" );
    if( filter_list ) {
        internal::pipeline_cleaner my_pipeline_cleaner(*this);
        end_of_input = false;
        input_tokens = internal::Token(max_number_of_live_tokens);
        if(has_thread_bound_filters) {
            // release the input filter if it is thread-bound
            if(filter_list->is_bound()) {
                filter_list->my_input_buffer->sema_V();
            }
        }
#if __TBB_TASK_GROUP_CONTEXT
        end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
#else
        end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
#endif
        // Start execution of tasks
        task::spawn_root_and_wait( *end_counter );

        if(has_thread_bound_filters) {
            for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
                if(f->is_bound()) {
                    f->my_input_buffer->sema_V(); // wake to end
                }
            }
        }
    }
}
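
// Continuing the sketch above (hypothetical filters), a run bounds the number
// of items in flight, and clear() detaches the filters afterwards:
//
//     p.run( 8 );   // at most 8 tokens live at once
//     p.clear();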

#if __TBB_TASK_GROUP_CONTEXT
void pipeline::run( size_t max_number_of_live_tokens ) {
    if( filter_list ) {
        // Construct a task group context with the exception propagation mode expected
        // by the pipeline caller.
        uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ?
                task_group_context::default_traits :
                task_group_context::default_traits & ~task_group_context::exact_exception;
        task_group_context context(task_group_context::bound, ctx_traits);
        run(max_number_of_live_tokens, context);
    }
}
#endif // __TBB_TASK_GROUP_CONTEXT

bool filter::has_more_work() {
    __TBB_ASSERT(my_pipeline, NULL);
    __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
    return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
}

filter::~filter() {
    if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
        if ( next_filter_in_pipeline != filter::not_in_pipeline() )
            my_pipeline->remove_filter(*this);
        else
            __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably the filter list is broken" );
    } else {
        __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
    }
}

void
filter::set_end_of_input() {
    __TBB_ASSERT(my_input_buffer, NULL);
    __TBB_ASSERT(object_may_be_null(), NULL);
    if(is_serial()) {
        my_pipeline->end_of_input = true;
    }
    else {
        __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
        my_input_buffer->set_my_tls_end_of_input();
    }
}

thread_bound_filter::result_type thread_bound_filter::process_item() {
    return internal_process_item(true);
}

thread_bound_filter::result_type thread_bound_filter::try_process_item() {
    return internal_process_item(false);
}

thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) {
    __TBB_ASSERT(my_pipeline != NULL, "process_item must not be called for a filter that is not in a pipeline");
    internal::task_info info;
    info.reset();

    if( my_pipeline->end_of_input && !has_more_work() )
        return end_of_stream;

    if( !prev_filter_in_pipeline ) {
        if( my_pipeline->end_of_input )
            return end_of_stream;
        while( my_pipeline->input_tokens == 0 ) {
            if( !is_blocking )
                return item_not_available;
            my_input_buffer->sema_P();
        }
        info.my_object = (*this)(info.my_object);
        if( info.my_object ) {
            __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
            my_pipeline->input_tokens--;
            if( is_ordered() ) {
                info.my_token = my_pipeline->token_counter;
                info.my_token_ready = true;
            }
            my_pipeline->token_counter++; // ideally, with relaxed semantics
        } else {
            my_pipeline->end_of_input = true;
            return end_of_stream;
        }
    } else { /* this is not an input filter */
        while( !my_input_buffer->has_item() ) {
            if( !is_blocking ) {
                return item_not_available;
            }
            my_input_buffer->sema_P();
            if( my_pipeline->end_of_input && !has_more_work() ) {
                return end_of_stream;
            }
        }
        if( !my_input_buffer->return_item(info, /*advance*/true) ) {
            __TBB_ASSERT(false,"return_item failed");
        }
        info.my_object = (*this)(info.my_object);
    }
    if( next_filter_in_pipeline ) {
        if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
            __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
        }
    } else {
        size_t ntokens_avail = ++(my_pipeline->input_tokens);
        if( my_pipeline->filter_list->is_bound() ) {
            if( ntokens_avail == 1 ) {
                my_pipeline->filter_list->my_input_buffer->sema_V();
            }
        }
    }

    return success;
}
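
// A minimal sketch (not from the original sources) of the service loop run by
// the thread that owns a thread-bound filter; `tbf` is a hypothetical
// thread_bound_filter instance already added to a pipeline that another
// thread is running:
//
//     while( tbf.process_item() != tbb::thread_bound_filter::end_of_stream )
//         continue;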

} // tbb