1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "tbb/pipeline.h"
18 #include "tbb/spin_mutex.h"
19 #include "tbb/cache_aligned_allocator.h"
20 #include "itt_notify.h"
21 #include "semaphore.h"
22 #include "tls.h" // for parallel filters that do not use NULL as end_of_input
23
24
25 namespace tbb {
26
27 namespace internal {
28
29 //! This structure is used to store task information in a input buffer
30 struct task_info {
31 void* my_object;
32 //! Invalid unless a task went through an ordered stage.
33 Token my_token;
34 //! False until my_token is set.
35 bool my_token_ready;
36 //! True if my_object is valid.
37 bool is_valid;
38 //! Set to initial state (no object, no token)
resettbb::internal::task_info39 void reset() {
40 my_object = NULL;
41 my_token = 0;
42 my_token_ready = false;
43 is_valid = false;
44 }
45 };
46 //! A buffer of input items for a filter.
47 /** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */
48 class input_buffer : no_copy {
49 friend class tbb::internal::pipeline_root_task;
50 friend class tbb::filter;
51 friend class tbb::thread_bound_filter;
52 friend class tbb::internal::stage_task;
53 friend class tbb::pipeline;
54
55 typedef Token size_type;
56
57 //! Array of deferred tasks that cannot yet start executing.
58 task_info* array;
59
60 //! for thread-bound filter, semaphore for waiting, NULL otherwise.
61 semaphore* my_sem;
62
63 //! Size of array
64 /** Always 0 or a power of 2 */
65 size_type array_size;
66
67 //! Lowest token that can start executing.
68 /** All prior Token have already been seen. */
69 Token low_token;
70
71 //! Serializes updates.
72 spin_mutex array_mutex;
73
74 //! Resize "array".
75 /** Caller is responsible to acquiring a lock on "array_mutex". */
76 void grow( size_type minimum_size );
77
78 //! Initial size for "array"
79 /** Must be a power of 2 */
80 static const size_type initial_buffer_size = 4;
81
82 //! Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned
83 Token high_token;
84
85 //! True for ordered filter, false otherwise.
86 bool is_ordered;
87
88 //! True for thread-bound filter, false otherwise.
89 bool is_bound;
90
91 //! for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
92 typedef basic_tls<intptr_t> end_of_input_tls_t;
93 end_of_input_tls_t end_of_input_tls;
94 bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
95
create_sema(size_t initial_tokens)96 void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
free_sema()97 void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
sema_P()98 void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
sema_V()99 void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
100
101 public:
102 //! Construct empty buffer.
input_buffer(bool is_ordered_,bool is_bound_)103 input_buffer( bool is_ordered_, bool is_bound_ ) :
104 array(NULL), my_sem(NULL), array_size(0),
105 low_token(0), high_token(0),
106 is_ordered(is_ordered_), is_bound(is_bound_),
107 end_of_input_tls_allocated(false) {
108 grow(initial_buffer_size);
109 __TBB_ASSERT( array, NULL );
110 if(is_bound) create_sema(0);
111 }
112
113 //! Destroy the buffer.
~input_buffer()114 ~input_buffer() {
115 __TBB_ASSERT( array, NULL );
116 cache_aligned_allocator<task_info>().deallocate(array,array_size);
117 poison_pointer( array );
118 if(my_sem) {
119 free_sema();
120 }
121 if(end_of_input_tls_allocated) {
122 destroy_my_tls();
123 }
124 }
125
126 //! Put a token into the buffer.
127 /** If task information was placed into buffer, returns true;
128 otherwise returns false, informing the caller to create and spawn a task.
129 If input buffer owned by thread-bound filter and the item at
130 low_token was not valid, issue a V()
131 If the input_buffer is owned by a successor to a thread-bound filter,
132 the force_put parameter should be true to ensure the token is inserted
133 in the buffer.
134 */
put_token(task_info & info_,bool force_put=false)135 bool put_token( task_info& info_, bool force_put = false ) {
136 {
137 info_.is_valid = true;
138 spin_mutex::scoped_lock lock( array_mutex );
139 Token token;
140 bool was_empty = !array[low_token&(array_size-1)].is_valid;
141 if( is_ordered ) {
142 if( !info_.my_token_ready ) {
143 info_.my_token = high_token++;
144 info_.my_token_ready = true;
145 }
146 token = info_.my_token;
147 } else
148 token = high_token++;
149 __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
150 if( token!=low_token || is_bound || force_put ) {
151 // Trying to put token that is beyond low_token.
152 // Need to wait until low_token catches up before dispatching.
153 if( token-low_token>=array_size )
154 grow( token-low_token+1 );
155 ITT_NOTIFY( sync_releasing, this );
156 array[token&(array_size-1)] = info_;
157 if(was_empty && is_bound) {
158 sema_V();
159 }
160 return true;
161 }
162 }
163 return false;
164 }
165
166 //! Note that processing of a token is finished.
167 /** Fires up processing of the next token, if processing was deferred. */
168 // Uses template to avoid explicit dependency on stage_task.
169 // This is only called for serial filters, and is the reason for the
170 // advance parameter in return_item (we're incrementing low_token here.)
171 // Non-TBF serial stages don't advance the token at the start because the presence
172 // of the current token in the buffer keeps another stage from being spawned.
173 template<typename StageTask>
note_done(Token token,StageTask & spawner)174 void note_done( Token token, StageTask& spawner ) {
175 task_info wakee;
176 wakee.reset();
177 {
178 spin_mutex::scoped_lock lock( array_mutex );
179 if( !is_ordered || token==low_token ) {
180 // Wake the next task
181 task_info& item = array[++low_token & (array_size-1)];
182 ITT_NOTIFY( sync_acquired, this );
183 wakee = item;
184 item.is_valid = false;
185 }
186 }
187 if( wakee.is_valid )
188 spawner.spawn_stage_task(wakee);
189 }
190
191 #if __TBB_TASK_GROUP_CONTEXT
192 //! The method destroys all data in filters to prevent memory leaks
clear(filter * my_filter)193 void clear( filter* my_filter ) {
194 long t=low_token;
195 for( size_type i=0; i<array_size; ++i, ++t ){
196 task_info& temp = array[t&(array_size-1)];
197 if (temp.is_valid ) {
198 my_filter->finalize(temp.my_object);
199 temp.is_valid = false;
200 }
201 }
202 }
203 #endif
204
205 //! return an item, invalidate the queued item, but only advance if the filter
206 // is parallel (as indicated by advance == true). If the filter is serial, leave the
207 // item in the buffer to keep another stage from being spawned.
return_item(task_info & info,bool advance)208 bool return_item(task_info& info, bool advance) {
209 spin_mutex::scoped_lock lock( array_mutex );
210 task_info& item = array[low_token&(array_size-1)];
211 ITT_NOTIFY( sync_acquired, this );
212 if( item.is_valid ) {
213 info = item;
214 item.is_valid = false;
215 if (advance) low_token++;
216 return true;
217 }
218 return false;
219 }
220
221 //! true if the current low_token is valid.
has_item()222 bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size -1)].is_valid; }
223
224 // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
create_my_tls()225 void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
destroy_my_tls()226 void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
my_tls_end_of_input()227 bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
set_my_tls_end_of_input()228 void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
229 };
230
grow(size_type minimum_size)231 void input_buffer::grow( size_type minimum_size ) {
232 size_type old_size = array_size;
233 size_type new_size = old_size ? 2*old_size : initial_buffer_size;
234 while( new_size<minimum_size )
235 new_size*=2;
236 task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
237 task_info* old_array = array;
238 for( size_type i=0; i<new_size; ++i )
239 new_array[i].is_valid = false;
240 long t=low_token;
241 for( size_type i=0; i<old_size; ++i, ++t )
242 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
243 array = new_array;
244 array_size = new_size;
245 if( old_array )
246 cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
247 }
248
249 class stage_task: public task, public task_info {
250 private:
251 friend class tbb::pipeline;
252 pipeline& my_pipeline;
253 filter* my_filter;
254 //! True if this task has not yet read the input.
255 bool my_at_start;
256
257 public:
258 //! Construct stage_task for first stage in a pipeline.
259 /** Such a stage has not read any input yet. */
stage_task(pipeline & pipeline)260 stage_task( pipeline& pipeline ) :
261 my_pipeline(pipeline),
262 my_filter(pipeline.filter_list),
263 my_at_start(true)
264 {
265 task_info::reset();
266 }
267 //! Construct stage_task for a subsequent stage in a pipeline.
stage_task(pipeline & pipeline,filter * filter_,const task_info & info)268 stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
269 task_info(info),
270 my_pipeline(pipeline),
271 my_filter(filter_),
272 my_at_start(false)
273 {}
274 //! Roughly equivalent to the constructor of input stage task
reset()275 void reset() {
276 task_info::reset();
277 my_filter = my_pipeline.filter_list;
278 my_at_start = true;
279 }
280 //! The virtual task execution method
281 task* execute() __TBB_override;
282 #if __TBB_TASK_GROUP_CONTEXT
~stage_task()283 ~stage_task()
284 {
285 if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
286 __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
287 my_filter->finalize(my_object);
288 my_object = NULL;
289 }
290 }
291 #endif // __TBB_TASK_GROUP_CONTEXT
292 //! Creates and spawns stage_task from task_info
spawn_stage_task(const task_info & info)293 void spawn_stage_task(const task_info& info)
294 {
295 stage_task* clone = new (allocate_additional_child_of(*parent()))
296 stage_task( my_pipeline, my_filter, info );
297 spawn(*clone);
298 }
299 };
300
execute()301 task* stage_task::execute() {
302 __TBB_ASSERT( !my_at_start || !my_object, NULL );
303 __TBB_ASSERT( !my_filter->is_bound(), NULL );
304 if( my_at_start ) {
305 if( my_filter->is_serial() ) {
306 my_object = (*my_filter)(my_object);
307 if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
308 {
309 if( my_filter->is_ordered() ) {
310 my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
311 my_token_ready = true;
312 } else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
313 if( my_pipeline.has_thread_bound_filters )
314 my_pipeline.token_counter++; // ideally, with relaxed semantics
315 }
316 if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
317 reset();
318 goto process_another_stage;
319 } else {
320 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
321 if( --my_pipeline.input_tokens>0 )
322 spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
323 }
324 } else {
325 my_pipeline.end_of_input = true;
326 return NULL;
327 }
328 } else /*not is_serial*/ {
329 if( my_pipeline.end_of_input )
330 return NULL;
331 if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
332 if( my_pipeline.has_thread_bound_filters )
333 my_pipeline.token_counter++;
334 }
335 ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
336 if( --my_pipeline.input_tokens>0 )
337 spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
338 my_object = (*my_filter)(my_object);
339 if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
340 {
341 my_pipeline.end_of_input = true;
342 if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
343 if( my_pipeline.has_thread_bound_filters )
344 my_pipeline.token_counter--; // fix token_counter
345 }
346 return NULL;
347 }
348 }
349 my_at_start = false;
350 } else {
351 my_object = (*my_filter)(my_object);
352 if( my_filter->is_serial() )
353 my_filter->my_input_buffer->note_done(my_token, *this);
354 }
355 my_filter = my_filter->next_filter_in_pipeline;
356 if( my_filter ) {
357 // There is another filter to execute.
358 if( my_filter->is_serial() ) {
359 // The next filter must execute tokens in order
360 if( my_filter->my_input_buffer->put_token(*this) ){
361 // Can't proceed with the same item
362 if( my_filter->is_bound() ) {
363 // Find the next non-thread-bound filter
364 do {
365 my_filter = my_filter->next_filter_in_pipeline;
366 } while( my_filter && my_filter->is_bound() );
367 // Check if there is an item ready to process
368 if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
369 goto process_another_stage;
370 }
371 my_filter = NULL; // To prevent deleting my_object twice if exception occurs
372 return NULL;
373 }
374 }
375 } else {
376 // Reached end of the pipe.
377 size_t ntokens_avail = ++my_pipeline.input_tokens;
378 if(my_pipeline.filter_list->is_bound() ) {
379 if(ntokens_avail == 1) {
380 my_pipeline.filter_list->my_input_buffer->sema_V();
381 }
382 return NULL;
383 }
384 if( ntokens_avail>1 // Only recycle if there is one available token
385 || my_pipeline.end_of_input ) {
386 return NULL; // No need to recycle for new input
387 }
388 ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
389 // Recycle as an input stage task.
390 reset();
391 }
392 process_another_stage:
393 /* A semi-hackish way to reexecute the same task object immediately without spawning.
394 recycle_as_continuation marks the task for future execution,
395 and then 'this' pointer is returned to bypass spawning. */
396 recycle_as_continuation();
397 return this;
398 }
399
400 class pipeline_root_task: public task {
401 pipeline& my_pipeline;
402 bool do_segment_scanning;
403
execute()404 task* execute() __TBB_override {
405 if( !my_pipeline.end_of_input )
406 if( !my_pipeline.filter_list->is_bound() )
407 if( my_pipeline.input_tokens > 0 ) {
408 recycle_as_continuation();
409 set_ref_count(1);
410 return new( allocate_child() ) stage_task( my_pipeline );
411 }
412 if( do_segment_scanning ) {
413 filter* current_filter = my_pipeline.filter_list->next_segment;
414 /* first non-thread-bound filter that follows thread-bound one
415 and may have valid items to process */
416 filter* first_suitable_filter = current_filter;
417 while( current_filter ) {
418 __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
419 __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
420 if( !my_pipeline.end_of_input || current_filter->has_more_work())
421 {
422 task_info info;
423 info.reset();
424 task* bypass = NULL;
425 int refcnt = 0;
426 task_list list;
427 // No new tokens are created; it's OK to process all waiting tokens.
428 // If the filter is serial, the second call to return_item will return false.
429 while( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
430 task* t = new( allocate_child() ) stage_task( my_pipeline, current_filter, info );
431 if( ++refcnt == 1 )
432 bypass = t;
433 else // there's more than one task
434 list.push_back(*t);
435 // TODO: limit the list size (to arena size?) to spawn tasks sooner
436 __TBB_ASSERT( refcnt <= int(my_pipeline.token_counter), "token counting error" );
437 info.reset();
438 }
439 if( refcnt ) {
440 set_ref_count( refcnt );
441 if( refcnt > 1 )
442 spawn(list);
443 recycle_as_continuation();
444 return bypass;
445 }
446 current_filter = current_filter->next_segment;
447 if( !current_filter ) {
448 if( !my_pipeline.end_of_input ) {
449 recycle_as_continuation();
450 return this;
451 }
452 current_filter = first_suitable_filter;
453 __TBB_Yield();
454 }
455 } else {
456 /* The preceding pipeline segment is empty.
457 Fast-forward to the next post-TBF segment. */
458 first_suitable_filter = first_suitable_filter->next_segment;
459 current_filter = first_suitable_filter;
460 }
461 } /* while( current_filter ) */
462 return NULL;
463 } else {
464 if( !my_pipeline.end_of_input ) {
465 recycle_as_continuation();
466 return this;
467 }
468 return NULL;
469 }
470 }
471 public:
pipeline_root_task(pipeline & pipeline)472 pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
473 {
474 __TBB_ASSERT( my_pipeline.filter_list, NULL );
475 filter* first = my_pipeline.filter_list;
476 if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
477 // Scanning the pipeline for segments
478 filter* head_of_previous_segment = first;
479 for( filter* subfilter=first->next_filter_in_pipeline;
480 subfilter!=NULL;
481 subfilter=subfilter->next_filter_in_pipeline )
482 {
483 if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
484 do_segment_scanning = true;
485 head_of_previous_segment->next_segment = subfilter;
486 head_of_previous_segment = subfilter;
487 }
488 }
489 }
490 }
491 };
492
493 #if _MSC_VER && !defined(__INTEL_COMPILER)
494 // Workaround for overzealous compiler warnings
495 // Suppress compiler warning about constant conditional expression
496 #pragma warning (disable: 4127)
497 #endif
498
499 // The class destroys end_counter and clears all input buffers if pipeline was cancelled.
500 class pipeline_cleaner: internal::no_copy {
501 pipeline& my_pipeline;
502 public:
pipeline_cleaner(pipeline & _pipeline)503 pipeline_cleaner(pipeline& _pipeline) :
504 my_pipeline(_pipeline)
505 {}
~pipeline_cleaner()506 ~pipeline_cleaner(){
507 #if __TBB_TASK_GROUP_CONTEXT
508 if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
509 my_pipeline.clear_filters();
510 #endif
511 my_pipeline.end_counter = NULL;
512 }
513 };
514
515 } // namespace internal
516
//! Must not be called.
/** The task argument is ignored; the body consists solely of an assertion that always fails. */
void pipeline::inject_token( task& ) {
    __TBB_ASSERT(false,"illegal call to inject_token");
}
520
#if __TBB_TASK_GROUP_CONTEXT
//! Clear the input buffer of every filter of version 4 or later that has one.
/** Each such buffer is emptied with input_buffer::clear(), passing the owning filter. */
void pipeline::clear_filters() {
    filter* current = filter_list;
    while( current ) {
        const bool can_finalize =
            (current->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4);
        if( can_finalize ) {
            internal::input_buffer* buffer = current->my_input_buffer;
            if( buffer )
                buffer->clear(current);
        }
        current = current->next_filter_in_pipeline;
    }
}
#endif
530
pipeline()531 pipeline::pipeline() :
532 filter_list(NULL),
533 filter_end(NULL),
534 end_counter(NULL),
535 end_of_input(false),
536 has_thread_bound_filters(false)
537 {
538 token_counter = 0;
539 input_tokens = 0;
540 }
541
//! Destroy the pipeline.
/** All the work is done by clear(). */
pipeline::~pipeline() {
    clear();
}
545
clear()546 void pipeline::clear() {
547 filter* next;
548 for( filter* f = filter_list; f; f=next ) {
549 if( internal::input_buffer* b = f->my_input_buffer ) {
550 delete b;
551 f->my_input_buffer = NULL;
552 }
553 next=f->next_filter_in_pipeline;
554 f->next_filter_in_pipeline = filter::not_in_pipeline();
555 if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
556 f->prev_filter_in_pipeline = filter::not_in_pipeline();
557 f->my_pipeline = NULL;
558 }
559 if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
560 f->next_segment = NULL;
561 }
562 filter_list = filter_end = NULL;
563 }
564
add_filter(filter & filter_)565 void pipeline::add_filter( filter& filter_ ) {
566 #if TBB_USE_ASSERT
567 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
568 __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
569 __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
570 __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
571 #endif
572 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
573 filter_.my_pipeline = this;
574 filter_.prev_filter_in_pipeline = filter_end;
575 if ( filter_list == NULL)
576 filter_list = &filter_;
577 else
578 filter_end->next_filter_in_pipeline = &filter_;
579 filter_.next_filter_in_pipeline = NULL;
580 filter_end = &filter_;
581 } else {
582 if( !filter_end )
583 filter_end = reinterpret_cast<filter*>(&filter_list);
584
585 *reinterpret_cast<filter**>(filter_end) = &filter_;
586 filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
587 *reinterpret_cast<filter**>(filter_end) = NULL;
588 }
589 if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
590 if( filter_.is_serial() ) {
591 if( filter_.is_bound() )
592 has_thread_bound_filters = true;
593 filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
594 } else {
595 if(filter_.prev_filter_in_pipeline) {
596 if(filter_.prev_filter_in_pipeline->is_bound()) {
597 // successors to bound filters must have an input_buffer
598 filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
599 }
600 } else { // input filter
601 if(filter_.object_may_be_null() ) {
602 //TODO: buffer only needed to hold TLS; could improve
603 filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
604 filter_.my_input_buffer->create_my_tls();
605 }
606 }
607 }
608 } else {
609 if( filter_.is_serial() ) {
610 filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
611 }
612 }
613
614 }
615
remove_filter(filter & filter_)616 void pipeline::remove_filter( filter& filter_ ) {
617 __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
618 __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
619 __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
620 if (&filter_ == filter_list)
621 filter_list = filter_.next_filter_in_pipeline;
622 else {
623 __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
624 filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
625 }
626 if (&filter_ == filter_end)
627 filter_end = filter_.prev_filter_in_pipeline;
628 else {
629 __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
630 filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
631 }
632 if( internal::input_buffer* b = filter_.my_input_buffer ) {
633 delete b;
634 filter_.my_input_buffer = NULL;
635 }
636 filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
637 if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
638 filter_.next_segment = NULL;
639 filter_.my_pipeline = NULL;
640 }
641
run(size_t max_number_of_live_tokens,tbb::task_group_context & context)642 void pipeline::run( size_t max_number_of_live_tokens
643 #if __TBB_TASK_GROUP_CONTEXT
644 , tbb::task_group_context& context
645 #endif
646 ) {
647 __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
648 __TBB_ASSERT( !end_counter, "pipeline already running?" );
649 if( filter_list ) {
650 internal::pipeline_cleaner my_pipeline_cleaner(*this);
651 end_of_input = false;
652 input_tokens = internal::Token(max_number_of_live_tokens);
653 if(has_thread_bound_filters) {
654 // release input filter if thread-bound
655 if(filter_list->is_bound()) {
656 filter_list->my_input_buffer->sema_V();
657 }
658 }
659 #if __TBB_TASK_GROUP_CONTEXT
660 end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
661 #else
662 end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
663 #endif
664 // Start execution of tasks
665 task::spawn_root_and_wait( *end_counter );
666
667 if(has_thread_bound_filters) {
668 for(filter* f = filter_list->next_filter_in_pipeline; f; f=f->next_filter_in_pipeline) {
669 if(f->is_bound()) {
670 f->my_input_buffer->sema_V(); // wake to end
671 }
672 }
673 }
674 }
675 }
676
#if __TBB_TASK_GROUP_CONTEXT
//! Run the pipeline in a bound task group context created for this call.
/** Does nothing if the pipeline has no filters. */
void pipeline::run( size_t max_number_of_live_tokens ) {
    if( !filter_list )
        return;
    // Construct task group context with the exception propagation mode expected
    // by the pipeline caller.
    uintptr_t ctx_traits;
    if( filter_list->my_filter_mode & filter::exact_exception_propagation )
        ctx_traits = task_group_context::default_traits;
    else
        ctx_traits = task_group_context::default_traits & ~task_group_context::exact_exception;
    task_group_context context(task_group_context::bound, ctx_traits);
    run(max_number_of_live_tokens, context);
}
#endif // __TBB_TASK_GROUP_CONTEXT
690
has_more_work()691 bool filter::has_more_work() {
692 __TBB_ASSERT(my_pipeline, NULL);
693 __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
694 return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
695 }
696
~filter()697 filter::~filter() {
698 if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
699 if ( next_filter_in_pipeline != filter::not_in_pipeline() )
700 my_pipeline->remove_filter(*this);
701 else
702 __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
703 } else {
704 __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
705 }
706 }
707
set_end_of_input()708 void filter::set_end_of_input() {
709 __TBB_ASSERT(my_input_buffer, NULL);
710 __TBB_ASSERT(object_may_be_null(), NULL);
711 if(is_serial()) {
712 my_pipeline->end_of_input = true;
713 } else {
714 __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
715 my_input_buffer->set_my_tls_end_of_input();
716 }
717 }
718
process_item()719 thread_bound_filter::result_type thread_bound_filter::process_item() {
720 return internal_process_item(true);
721 }
722
try_process_item()723 thread_bound_filter::result_type thread_bound_filter::try_process_item() {
724 return internal_process_item(false);
725 }
726
internal_process_item(bool is_blocking)727 thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) {
728 __TBB_ASSERT(my_pipeline != NULL,"It's not supposed that process_item is called for a filter that is not in a pipeline.");
729 internal::task_info info;
730 info.reset();
731
732 if( my_pipeline->end_of_input && !has_more_work() )
733 return end_of_stream;
734
735 if( !prev_filter_in_pipeline ) {
736 if( my_pipeline->end_of_input )
737 return end_of_stream;
738 while( my_pipeline->input_tokens == 0 ) {
739 if( !is_blocking )
740 return item_not_available;
741 my_input_buffer->sema_P();
742 }
743 info.my_object = (*this)(info.my_object);
744 if( info.my_object ) {
745 __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
746 my_pipeline->input_tokens--;
747 if( is_ordered() ) {
748 info.my_token = my_pipeline->token_counter;
749 info.my_token_ready = true;
750 }
751 my_pipeline->token_counter++; // ideally, with relaxed semantics
752 } else {
753 my_pipeline->end_of_input = true;
754 return end_of_stream;
755 }
756 } else { /* this is not an input filter */
757 while( !my_input_buffer->has_item() ) {
758 if( !is_blocking ) {
759 return item_not_available;
760 }
761 my_input_buffer->sema_P();
762 if( my_pipeline->end_of_input && !has_more_work() ) {
763 return end_of_stream;
764 }
765 }
766 if( !my_input_buffer->return_item(info, /*advance*/true) ) {
767 __TBB_ASSERT(false,"return_item failed");
768 }
769 info.my_object = (*this)(info.my_object);
770 }
771 if( next_filter_in_pipeline ) {
772 if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
773 __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
774 }
775 } else {
776 size_t ntokens_avail = ++(my_pipeline->input_tokens);
777 if( my_pipeline->filter_list->is_bound() ) {
778 if( ntokens_avail == 1 ) {
779 my_pipeline->filter_list->my_input_buffer->sema_V();
780 }
781 }
782 }
783
784 return success;
785 }
786
787 } // tbb
788
789