1 /*
2 Copyright 2005-2014 Intel Corporation. All Rights Reserved.
3
4 This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5 you can redistribute it and/or modify it under the terms of the GNU General Public License
6 version 2 as published by the Free Software Foundation. Threading Building Blocks is
7 distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9 See the GNU General Public License for more details. You should have received a copy of
10 the GNU General Public License along with Threading Building Blocks; if not, write to the
11 Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12
13 As a special exception, you may use this file as part of a free software library without
14 restriction. Specifically, if other files instantiate templates or use macros or inline
15 functions from this file, or you compile this file and link it with other files to produce
16 an executable, this file does not by itself cause the resulting executable to be covered
17 by the GNU General Public License. This exception does not however invalidate any other
18 reasons why the executable file might be covered by the GNU General Public License.
19 */
20
21 #include "tbb/pipeline.h"
22 #include "tbb/spin_mutex.h"
23 #include "tbb/cache_aligned_allocator.h"
24 #include "itt_notify.h"
25 #include "semaphore.h"
26 #include "tls.h" // for parallel filters that do not use NULL as end_of_input
27
28
29 namespace tbb {
30
31 namespace internal {
32
33 //! This structure is used to store task information in a input buffer
34 struct task_info {
35 void* my_object;
36 //! Invalid unless a task went through an ordered stage.
37 Token my_token;
38 //! False until my_token is set.
39 bool my_token_ready;
40 //! True if my_object is valid.
41 bool is_valid;
42 //! Set to initial state (no object, no token)
resettbb::internal::task_info43 void reset() {
44 my_object = NULL;
45 my_token = 0;
46 my_token_ready = false;
47 is_valid = false;
48 }
49 };
50 //! A buffer of input items for a filter.
51 /** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
52 class input_buffer : no_copy {
53 friend class tbb::internal::pipeline_root_task;
54 friend class tbb::filter;
55 friend class tbb::thread_bound_filter;
56 friend class tbb::internal::stage_task;
57 friend class tbb::pipeline;
58
59 typedef Token size_type;
60
61 //! Array of deferred tasks that cannot yet start executing.
62 task_info* array;
63
64 //! for thread-bound filter, semaphore for waiting, NULL otherwise.
65 semaphore* my_sem;
66
67 //! Size of array
68 /** Always 0 or a power of 2 */
69 size_type array_size;
70
71 //! Lowest token that can start executing.
72 /** All prior Token have already been seen. */
73 Token low_token;
74
75 //! Serializes updates.
76 spin_mutex array_mutex;
77
78 //! Resize "array".
79 /** Caller is responsible to acquiring a lock on "array_mutex". */
80 void grow( size_type minimum_size );
81
82 //! Initial size for "array"
83 /** Must be a power of 2 */
84 static const size_type initial_buffer_size = 4;
85
86 //! Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned
87 Token high_token;
88
89 //! True for ordered filter, false otherwise.
90 bool is_ordered;
91
92 //! True for thread-bound filter, false otherwise.
93 bool is_bound;
94
95 //! for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
96 typedef basic_tls<intptr_t> end_of_input_tls_t;
97 end_of_input_tls_t end_of_input_tls;
98 bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
99
create_sema(size_t initial_tokens)100 void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
free_sema()101 void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
sema_P()102 void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
sema_V()103 void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
104
105 public:
106 //! Construct empty buffer.
input_buffer(bool is_ordered_,bool is_bound_)107 input_buffer( bool is_ordered_, bool is_bound_ ) :
108 array(NULL), my_sem(NULL), array_size(0),
109 low_token(0), high_token(0),
110 is_ordered(is_ordered_), is_bound(is_bound_),
111 end_of_input_tls_allocated(false) {
112 grow(initial_buffer_size);
113 __TBB_ASSERT( array, NULL );
114 if(is_bound) create_sema(0);
115 }
116
117 //! Destroy the buffer.
~input_buffer()118 ~input_buffer() {
119 __TBB_ASSERT( array, NULL );
120 cache_aligned_allocator<task_info>().deallocate(array,array_size);
121 poison_pointer( array );
122 if(my_sem) {
123 free_sema();
124 }
125 if(end_of_input_tls_allocated) {
126 destroy_my_tls();
127 }
128 }
129
130 //! Put a token into the buffer.
131 /** If task information was placed into buffer, returns true;
132 otherwise returns false, informing the caller to create and spawn a task.
133 If input buffer owned by thread-bound filter and the item at
134 low_token was not valid, issue a V()
135 If the input_buffer is owned by a successor to a thread-bound filter,
136 the force_put parameter should be true to ensure the token is inserted
137 in the buffer.
138 */
put_token(task_info & info_,bool force_put=false)139 bool put_token( task_info& info_, bool force_put = false ) {
140 {
141 info_.is_valid = true;
142 spin_mutex::scoped_lock lock( array_mutex );
143 Token token;
144 bool was_empty = !array[low_token&(array_size-1)].is_valid;
145 if( is_ordered ) {
146 if( !info_.my_token_ready ) {
147 info_.my_token = high_token++;
148 info_.my_token_ready = true;
149 }
150 token = info_.my_token;
151 } else
152 token = high_token++;
153 __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
154 if( token!=low_token || is_bound || force_put ) {
155 // Trying to put token that is beyond low_token.
156 // Need to wait until low_token catches up before dispatching.
157 if( token-low_token>=array_size )
158 grow( token-low_token+1 );
159 ITT_NOTIFY( sync_releasing, this );
160 array[token&(array_size-1)] = info_;
161 if(was_empty && is_bound) {
162 sema_V();
163 }
164 return true;
165 }
166 }
167 return false;
168 }
169
170 //! Note that processing of a token is finished.
171 /** Fires up processing of the next token, if processing was deferred. */
172 // Using template to avoid explicit dependency on stage_task
173 // this is only called for serial filters, and is the reason for the
174 // advance parameter in return_item (we're incrementing low_token here.)
175 // Non-TBF serial stages don't advance the token at the start because the presence
176 // of the current token in the buffer keeps another stage from being spawned.
177 template<typename StageTask>
note_done(Token token,StageTask & spawner)178 void note_done( Token token, StageTask& spawner ) {
179 task_info wakee;
180 wakee.reset();
181 {
182 spin_mutex::scoped_lock lock( array_mutex );
183 if( !is_ordered || token==low_token ) {
184 // Wake the next task
185 task_info& item = array[++low_token & (array_size-1)];
186 ITT_NOTIFY( sync_acquired, this );
187 wakee = item;
188 item.is_valid = false;
189 }
190 }
191 if( wakee.is_valid )
192 spawner.spawn_stage_task(wakee);
193 }
194
195 #if __TBB_TASK_GROUP_CONTEXT
196 //! The method destroys all data in filters to prevent memory leaks
clear(filter * my_filter)197 void clear( filter* my_filter ) {
198 long t=low_token;
199 for( size_type i=0; i<array_size; ++i, ++t ){
200 task_info& temp = array[t&(array_size-1)];
201 if (temp.is_valid ) {
202 my_filter->finalize(temp.my_object);
203 temp.is_valid = false;
204 }
205 }
206 }
207 #endif
208
209 //! return an item, invalidate the queued item, but only advance if advance
210 // advance == true for parallel filters. If the filter is serial, leave the
211 // item in the buffer to keep another stage from being spawned.
return_item(task_info & info,bool advance)212 bool return_item(task_info& info, bool advance) {
213 spin_mutex::scoped_lock lock( array_mutex );
214 task_info& item = array[low_token&(array_size-1)];
215 ITT_NOTIFY( sync_acquired, this );
216 if( item.is_valid ) {
217 info = item;
218 item.is_valid = false;
219 if (advance) low_token++;
220 return true;
221 }
222 return false;
223 }
224
225 //! true if the current low_token is valid.
has_item()226 bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size -1)].is_valid; }
227
228 // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
create_my_tls()229 void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
destroy_my_tls()230 void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
my_tls_end_of_input()231 bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
set_my_tls_end_of_input()232 void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
233 };
234
grow(size_type minimum_size)235 void input_buffer::grow( size_type minimum_size ) {
236 size_type old_size = array_size;
237 size_type new_size = old_size ? 2*old_size : initial_buffer_size;
238 while( new_size<minimum_size )
239 new_size*=2;
240 task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
241 task_info* old_array = array;
242 for( size_type i=0; i<new_size; ++i )
243 new_array[i].is_valid = false;
244 long t=low_token;
245 for( size_type i=0; i<old_size; ++i, ++t )
246 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
247 array = new_array;
248 array_size = new_size;
249 if( old_array )
250 cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
251 }
252
253 class stage_task: public task, public task_info {
254 private:
255 friend class tbb::pipeline;
256 pipeline& my_pipeline;
257 filter* my_filter;
258 //! True if this task has not yet read the input.
259 bool my_at_start;
260
261 public:
262 //! Construct stage_task for first stage in a pipeline.
263 /** Such a stage has not read any input yet. */
stage_task(pipeline & pipeline)264 stage_task( pipeline& pipeline ) :
265 my_pipeline(pipeline),
266 my_filter(pipeline.filter_list),
267 my_at_start(true)
268 {
269 task_info::reset();
270 }
271 //! Construct stage_task for a subsequent stage in a pipeline.
stage_task(pipeline & pipeline,filter * filter_,const task_info & info)272 stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
273 task_info(info),
274 my_pipeline(pipeline),
275 my_filter(filter_),
276 my_at_start(false)
277 {}
278 //! Roughly equivalent to the constructor of input stage task
reset()279 void reset() {
280 task_info::reset();
281 my_filter = my_pipeline.filter_list;
282 my_at_start = true;
283 }
284 //! The virtual task execution method
285 /*override*/ task* execute();
286 #if __TBB_TASK_GROUP_CONTEXT
~stage_task()287 ~stage_task()
288 {
289 if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
290 __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
291 my_filter->finalize(my_object);
292 my_object = NULL;
293 }
294 }
295 #endif // __TBB_TASK_GROUP_CONTEXT
296 //! Creates and spawns stage_task from task_info
spawn_stage_task(const task_info & info)297 void spawn_stage_task(const task_info& info)
298 {
299 stage_task* clone = new (allocate_additional_child_of(*parent()))
300 stage_task( my_pipeline, my_filter, info );
301 spawn(*clone);
302 }
303 };
304
execute()305 task* stage_task::execute() {
306 __TBB_ASSERT( !my_at_start || !my_object, NULL );
307 __TBB_ASSERT( !my_filter->is_bound(), NULL );
308 if( my_at_start ) {
309 if( my_filter->is_serial() ) {
310 my_object = (*my_filter)(my_object);
311 if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
312 {
313 if( my_filter->is_ordered() ) {
314 my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
315 my_token_ready = true;
316 } else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
317 if( my_pipeline.has_thread_bound_filters )
318 my_pipeline.token_counter++; // ideally, with relaxed semantics
319 }
320 if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
321 reset();
322 goto process_another_stage;
323 } else {
324 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
325 if( --my_pipeline.input_tokens>0 )
326 spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
327 }
328 } else {
329 my_pipeline.end_of_input = true;
330 return NULL;
331 }
332 } else /*not is_serial*/ {
333 if( my_pipeline.end_of_input )
334 return NULL;
335 if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
336 if( my_pipeline.has_thread_bound_filters )
337 my_pipeline.token_counter++;
338 }
339 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
340 if( --my_pipeline.input_tokens>0 )
341 spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
342 my_object = (*my_filter)(my_object);
343 if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
344 {
345 my_pipeline.end_of_input = true;
346 if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
347 if( my_pipeline.has_thread_bound_filters )
348 my_pipeline.token_counter--; // fix token_counter
349 }
350 return NULL;
351 }
352 }
353 my_at_start = false;
354 } else {
355 my_object = (*my_filter)(my_object);
356 if( my_filter->is_serial() )
357 my_filter->my_input_buffer->note_done(my_token, *this);
358 }
359 my_filter = my_filter->next_filter_in_pipeline;
360 if( my_filter ) {
361 // There is another filter to execute.
362 if( my_filter->is_serial() ) {
363 // The next filter must execute tokens in order
364 if( my_filter->my_input_buffer->put_token(*this) ){
365 // Can't proceed with the same item
366 if( my_filter->is_bound() ) {
367 // Find the next non-thread-bound filter
368 do {
369 my_filter = my_filter->next_filter_in_pipeline;
370 } while( my_filter && my_filter->is_bound() );
371 // Check if there is an item ready to process
372 if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
373 goto process_another_stage;
374 }
375 my_filter = NULL; // To prevent deleting my_object twice if exception occurs
376 return NULL;
377 }
378 }
379 } else {
380 // Reached end of the pipe.
381 size_t ntokens_avail = ++my_pipeline.input_tokens;
382 if(my_pipeline.filter_list->is_bound() ) {
383 if(ntokens_avail == 1) {
384 my_pipeline.filter_list->my_input_buffer->sema_V();
385 }
386 return NULL;
387 }
388 if( ntokens_avail>1 // Only recycle if there is one available token
389 || my_pipeline.end_of_input ) {
390 return NULL; // No need to recycle for new input
391 }
392 ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
393 // Recycle as an input stage task.
394 reset();
395 }
396 process_another_stage:
397 /* A semi-hackish way to reexecute the same task object immediately without spawning.
398 recycle_as_continuation marks the task for future execution,
399 and then 'this' pointer is returned to bypass spawning. */
400 recycle_as_continuation();
401 return this;
402 }
403
404 class pipeline_root_task: public task {
405 pipeline& my_pipeline;
406 bool do_segment_scanning;
407
execute()408 /*override*/ task* execute() {
409 if( !my_pipeline.end_of_input )
410 if( !my_pipeline.filter_list->is_bound() )
411 if( my_pipeline.input_tokens > 0 ) {
412 recycle_as_continuation();
413 set_ref_count(1);
414 return new( allocate_child() ) stage_task( my_pipeline );
415 }
416 if( do_segment_scanning ) {
417 filter* current_filter = my_pipeline.filter_list->next_segment;
418 /* first non-thread-bound filter that follows thread-bound one
419 and may have valid items to process */
420 filter* first_suitable_filter = current_filter;
421 while( current_filter ) {
422 __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
423 __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
424 if( !my_pipeline.end_of_input || current_filter->has_more_work())
425 {
426 task_info info;
427 info.reset();
428 if( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
429 set_ref_count(1);
430 recycle_as_continuation();
431 return new( allocate_child() ) stage_task( my_pipeline, current_filter, info);
432 }
433 current_filter = current_filter->next_segment;
434 if( !current_filter ) {
435 if( !my_pipeline.end_of_input ) {
436 recycle_as_continuation();
437 return this;
438 }
439 current_filter = first_suitable_filter;
440 __TBB_Yield();
441 }
442 } else {
443 /* The preceding pipeline segment is empty.
444 Fast-forward to the next post-TBF segment. */
445 first_suitable_filter = first_suitable_filter->next_segment;
446 current_filter = first_suitable_filter;
447 }
448 } /* while( current_filter ) */
449 return NULL;
450 } else {
451 if( !my_pipeline.end_of_input ) {
452 recycle_as_continuation();
453 return this;
454 }
455 return NULL;
456 }
457 }
458 public:
pipeline_root_task(pipeline & pipeline)459 pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
460 {
461 __TBB_ASSERT( my_pipeline.filter_list, NULL );
462 filter* first = my_pipeline.filter_list;
463 if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
464 // Scanning the pipeline for segments
465 filter* head_of_previous_segment = first;
466 for( filter* subfilter=first->next_filter_in_pipeline;
467 subfilter!=NULL;
468 subfilter=subfilter->next_filter_in_pipeline )
469 {
470 if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
471 do_segment_scanning = true;
472 head_of_previous_segment->next_segment = subfilter;
473 head_of_previous_segment = subfilter;
474 }
475 }
476 }
477 }
478 };
479
480 #if _MSC_VER && !defined(__INTEL_COMPILER)
481 // Workaround for overzealous compiler warnings
482 // Suppress compiler warning about constant conditional expression
483 #pragma warning (disable: 4127)
484 #endif
485
486 // The class destroys end_counter and clears all input buffers if pipeline was cancelled.
487 class pipeline_cleaner: internal::no_copy {
488 pipeline& my_pipeline;
489 public:
pipeline_cleaner(pipeline & _pipeline)490 pipeline_cleaner(pipeline& _pipeline) :
491 my_pipeline(_pipeline)
492 {}
~pipeline_cleaner()493 ~pipeline_cleaner(){
494 #if __TBB_TASK_GROUP_CONTEXT
495 if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
496 my_pipeline.clear_filters();
497 #endif
498 my_pipeline.end_counter = NULL;
499 }
500 };
501
502 } // namespace internal
503
inject_token(task &)504 void pipeline::inject_token( task& ) {
505 __TBB_ASSERT(false,"illegal call to inject_token");
506 }
507
#if __TBB_TASK_GROUP_CONTEXT
//! Finalize and drop every item still parked in the filters' input buffers.
/** Filters older than pipeline version 4, and filters without a buffer, are skipped. */
void pipeline::clear_filters() {
    filter* f = filter_list;
    while( f ) {
        if( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4) ) {
            internal::input_buffer* const b = f->my_input_buffer;
            if( b )
                b->clear(f);
        }
        f = f->next_filter_in_pipeline;
    }
}
#endif
517
pipeline()518 pipeline::pipeline() :
519 filter_list(NULL),
520 filter_end(NULL),
521 end_counter(NULL),
522 end_of_input(false),
523 has_thread_bound_filters(false)
524 {
525 token_counter = 0;
526 input_tokens = 0;
527 }
528
~pipeline()529 pipeline::~pipeline() {
530 clear();
531 }
532
clear()533 void pipeline::clear() {
534 filter* next;
535 for( filter* f = filter_list; f; f=next ) {
536 if( internal::input_buffer* b = f->my_input_buffer ) {
537 delete b;
538 f->my_input_buffer = NULL;
539 }
540 next=f->next_filter_in_pipeline;
541 f->next_filter_in_pipeline = filter::not_in_pipeline();
542 if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
543 f->prev_filter_in_pipeline = filter::not_in_pipeline();
544 f->my_pipeline = NULL;
545 }
546 if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
547 f->next_segment = NULL;
548 }
549 filter_list = filter_end = NULL;
550 }
551
add_filter(filter & filter_)552 void pipeline::add_filter( filter& filter_ ) {
553 #if TBB_USE_ASSERT
554 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
555 __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
556 __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
557 __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
558 #endif
559 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
560 filter_.my_pipeline = this;
561 filter_.prev_filter_in_pipeline = filter_end;
562 if ( filter_list == NULL)
563 filter_list = &filter_;
564 else
565 filter_end->next_filter_in_pipeline = &filter_;
566 filter_.next_filter_in_pipeline = NULL;
567 filter_end = &filter_;
568 }
569 else
570 {
571 if( !filter_end )
572 filter_end = reinterpret_cast<filter*>(&filter_list);
573
574 *reinterpret_cast<filter**>(filter_end) = &filter_;
575 filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
576 *reinterpret_cast<filter**>(filter_end) = NULL;
577 }
578 if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
579 if( filter_.is_serial() ) {
580 if( filter_.is_bound() )
581 has_thread_bound_filters = true;
582 filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
583 }
584 else {
585 if(filter_.prev_filter_in_pipeline) {
586 if(filter_.prev_filter_in_pipeline->is_bound()) {
587 // successors to bound filters must have an input_buffer
588 filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
589 }
590 }
591 else { // input filter
592 if(filter_.object_may_be_null() ) {
593 //TODO: buffer only needed to hold TLS; could improve
594 filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
595 filter_.my_input_buffer->create_my_tls();
596 }
597 }
598 }
599 } else {
600 if( filter_.is_serial() ) {
601 filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
602 }
603 }
604
605 }
606
remove_filter(filter & filter_)607 void pipeline::remove_filter( filter& filter_ ) {
608 __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
609 __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
610 __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
611 if (&filter_ == filter_list)
612 filter_list = filter_.next_filter_in_pipeline;
613 else {
614 __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
615 filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
616 }
617 if (&filter_ == filter_end)
618 filter_end = filter_.prev_filter_in_pipeline;
619 else {
620 __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
621 filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
622 }
623 if( internal::input_buffer* b = filter_.my_input_buffer ) {
624 delete b;
625 filter_.my_input_buffer = NULL;
626 }
627 filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
628 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
629 filter_.next_segment = NULL;
630 filter_.my_pipeline = NULL;
631 }
632
run(size_t max_number_of_live_tokens,tbb::task_group_context & context)633 void pipeline::run( size_t max_number_of_live_tokens
634 #if __TBB_TASK_GROUP_CONTEXT
635 , tbb::task_group_context& context
636 #endif
637 ) {
638 __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
639 __TBB_ASSERT( !end_counter, "pipeline already running?" );
640 if( filter_list ) {
641 internal::pipeline_cleaner my_pipeline_cleaner(*this);
642 end_of_input = false;
643 input_tokens = internal::Token(max_number_of_live_tokens);
644 if(has_thread_bound_filters) {
645 // release input filter if thread-bound
646 if(filter_list->is_bound()) {
647 filter_list->my_input_buffer->sema_V();
648 }
649 }
650 #if __TBB_TASK_GROUP_CONTEXT
651 end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
652 #else
653 end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
654 #endif
655 // Start execution of tasks
656 task::spawn_root_and_wait( *end_counter );
657
658 if(has_thread_bound_filters) {
659 for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
660 if(f->is_bound()) {
661 f->my_input_buffer->sema_V(); // wake to end
662 }
663 }
664 }
665 }
666 }
667
#if __TBB_TASK_GROUP_CONTEXT
//! Run the pipeline inside a freshly created bound task group context.
/** Does nothing if the pipeline has no filters. */
void pipeline::run( size_t max_number_of_live_tokens ) {
    if( !filter_list )
        return;
    // Construct task group context with the exception propagation mode expected
    // by the pipeline caller (taken from the mode bits of the first filter).
    const bool wants_exact_exceptions =
        (filter_list->my_filter_mode & filter::exact_exception_propagation) != 0;
    uintptr_t ctx_traits = wants_exact_exceptions ?
            task_group_context::default_traits :
            task_group_context::default_traits & ~task_group_context::exact_exception;
    task_group_context context(task_group_context::bound, ctx_traits);
    run(max_number_of_live_tokens, context);
}
#endif // __TBB_TASK_GROUP_CONTEXT
681
has_more_work()682 bool filter::has_more_work() {
683 __TBB_ASSERT(my_pipeline, NULL);
684 __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
685 return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
686 }
687
~filter()688 filter::~filter() {
689 if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
690 if ( next_filter_in_pipeline != filter::not_in_pipeline() )
691 my_pipeline->remove_filter(*this);
692 else
693 __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
694 } else {
695 __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
696 }
697 }
698
699 void
set_end_of_input()700 filter::set_end_of_input() {
701 __TBB_ASSERT(my_input_buffer, NULL);
702 __TBB_ASSERT(object_may_be_null(), NULL);
703 if(is_serial()) {
704 my_pipeline->end_of_input = true;
705 }
706 else {
707 __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
708 my_input_buffer->set_my_tls_end_of_input();
709 }
710 }
711
process_item()712 thread_bound_filter::result_type thread_bound_filter::process_item() {
713 return internal_process_item(true);
714 }
715
try_process_item()716 thread_bound_filter::result_type thread_bound_filter::try_process_item() {
717 return internal_process_item(false);
718 }
719
internal_process_item(bool is_blocking)720 thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) {
721 __TBB_ASSERT(my_pipeline != NULL,"It's not supposed that process_item is called for a filter that is not in a pipeline.");
722 internal::task_info info;
723 info.reset();
724
725 if( my_pipeline->end_of_input && !has_more_work() )
726 return end_of_stream;
727
728 if( !prev_filter_in_pipeline ) {
729 if( my_pipeline->end_of_input )
730 return end_of_stream;
731 while( my_pipeline->input_tokens == 0 ) {
732 if( !is_blocking )
733 return item_not_available;
734 my_input_buffer->sema_P();
735 }
736 info.my_object = (*this)(info.my_object);
737 if( info.my_object ) {
738 __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
739 my_pipeline->input_tokens--;
740 if( is_ordered() ) {
741 info.my_token = my_pipeline->token_counter;
742 info.my_token_ready = true;
743 }
744 my_pipeline->token_counter++; // ideally, with relaxed semantics
745 } else {
746 my_pipeline->end_of_input = true;
747 return end_of_stream;
748 }
749 } else { /* this is not an input filter */
750 while( !my_input_buffer->has_item() ) {
751 if( !is_blocking ) {
752 return item_not_available;
753 }
754 my_input_buffer->sema_P();
755 if( my_pipeline->end_of_input && !has_more_work() ) {
756 return end_of_stream;
757 }
758 }
759 if( !my_input_buffer->return_item(info, /*advance*/true) ) {
760 __TBB_ASSERT(false,"return_item failed");
761 }
762 info.my_object = (*this)(info.my_object);
763 }
764 if( next_filter_in_pipeline ) {
765 if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
766 __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
767 }
768 } else {
769 size_t ntokens_avail = ++(my_pipeline->input_tokens);
770 if( my_pipeline->filter_list->is_bound() ) {
771 if( ntokens_avail == 1 ) {
772 my_pipeline->filter_list->my_input_buffer->sema_V();
773 }
774 }
775 }
776
777 return success;
778 }
779
780 } // tbb
781
782