/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.  You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction.  Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

#include "scheduler.h"

#include "itt_notify.h"

namespace tbb {

#if __TBB_TASK_GROUP_CONTEXT

using namespace internal;

//------------------------------------------------------------------------
// captured_exception
//------------------------------------------------------------------------

inline char* duplicate_string ( const char* src ) {
    char* dst = NULL;
    if ( src ) {
        size_t len = strlen(src) + 1;
        dst = (char*)allocate_via_handler_v3(len);
        strncpy (dst, src, len);
    }
    return dst;
}
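
// The strings duplicated above are owned by the enclosing captured_exception and are
// released with deallocate_via_handler_v3() in captured_exception::clear() below.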

captured_exception::~captured_exception () throw() {
    clear();
}

void captured_exception::set ( const char* a_name, const char* info ) throw() {
    my_exception_name = duplicate_string( a_name );
    my_exception_info = duplicate_string( info );
}

void captured_exception::clear () throw() {
    deallocate_via_handler_v3 (const_cast<char*>(my_exception_name));
    deallocate_via_handler_v3 (const_cast<char*>(my_exception_info));
}

captured_exception* captured_exception::move () throw() {
    captured_exception *e = (captured_exception*)allocate_via_handler_v3(sizeof(captured_exception));
    if ( e ) {
        ::new (e) captured_exception();
        e->my_exception_name = my_exception_name;
        e->my_exception_info = my_exception_info;
        e->my_dynamic = true;
        my_exception_name = my_exception_info = NULL;
    }
    return e;
}

void captured_exception::destroy () throw() {
    __TBB_ASSERT ( my_dynamic, "Method destroy can be used only on objects created by clone or allocate" );
    if ( my_dynamic ) {
        this->captured_exception::~captured_exception();
        deallocate_via_handler_v3 (this);
    }
}

captured_exception* captured_exception::allocate ( const char* a_name, const char* info ) {
    captured_exception *e = (captured_exception*)allocate_via_handler_v3( sizeof(captured_exception) );
    if ( e ) {
        ::new (e) captured_exception(a_name, info);
        e->my_dynamic = true;
    }
    return e;
}

const char* captured_exception::name() const throw() {
    return my_exception_name;
}

const char* captured_exception::what() const throw() {
    return my_exception_info;
}
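
// captured_exception is what TBB transports across threads when exact exception propagation
// is not in use: only the exception's class name and its what() string survive (compare
// TBB_USE_CAPTURED_EXCEPTION and tbb_exception_ptr below). Illustrative sketch of what
// client code typically observes in that mode (assumption: exact propagation disabled):
//
//     try {
//         tbb::parallel_for( 0, n, body );            // some task throws MyError("boom")
//     } catch ( tbb::captured_exception& e ) {
//         printf( "%s: %s\n", e.name(), e.what() );   // e.g. "MyError: boom"
//     }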


//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------

#if !TBB_USE_CAPTURED_EXCEPTION

namespace internal {

template<typename T>
tbb_exception_ptr* AllocateExceptionContainer( const T& src ) {
    tbb_exception_ptr *eptr = (tbb_exception_ptr*)allocate_via_handler_v3( sizeof(tbb_exception_ptr) );
    if ( eptr )
        new (eptr) tbb_exception_ptr(src);
    return eptr;
}

tbb_exception_ptr* tbb_exception_ptr::allocate () {
    return AllocateExceptionContainer( std::current_exception() );
}

tbb_exception_ptr* tbb_exception_ptr::allocate ( const tbb_exception& ) {
    return AllocateExceptionContainer( std::current_exception() );
}

tbb_exception_ptr* tbb_exception_ptr::allocate ( captured_exception& src ) {
    tbb_exception_ptr *res = AllocateExceptionContainer( src );
    src.destroy();
    return res;
}
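
// Note: unlike the two overloads above, the captured_exception variant takes ownership of
// its argument and releases it with src.destroy() once the container has been constructed
// from it.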

void tbb_exception_ptr::destroy () throw() {
    this->tbb_exception_ptr::~tbb_exception_ptr();
    deallocate_via_handler_v3 (this);
}

} // namespace internal
#endif /* !TBB_USE_CAPTURED_EXCEPTION */


//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

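// The destructor below removes this context from its owner scheduler's thread-local
// context list. Two paths are distinguished:
//  - the owner thread itself destroys the context ("local update"): the removal is
//    announced via my_local_ctx_list_update and normally proceeds without taking the lock;
//  - another thread destroys it ("nonlocal update"): the removal synchronizes with the
//    owner through my_nonlocal_ctx_list_update and my_context_list_mutex.
// The epoch snapshot comparison detects a state propagation that ran concurrently with
// the removal, in which case the lock is taken so the propagator can finish safely.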
task_group_context::~task_group_context () {
    if ( __TBB_load_relaxed(my_kind) == binding_completed ) {
        if ( governor::is_set(my_owner) ) {
            // Local update of the context list
            uintptr_t local_count_snapshot = my_owner->my_context_state_propagation_epoch;
            my_owner->my_local_ctx_list_update.store<relaxed>(1);
            // Prevent load of nonlocal update flag from being hoisted before the
            // store to local update flag.
            atomic_fence();
            if ( my_owner->my_nonlocal_ctx_list_update.load<relaxed>() ) {
                spin_mutex::scoped_lock lock(my_owner->my_context_list_mutex);
                my_node.my_prev->my_next = my_node.my_next;
                my_node.my_next->my_prev = my_node.my_prev;
                my_owner->my_local_ctx_list_update.store<relaxed>(0);
            }
            else {
                my_node.my_prev->my_next = my_node.my_next;
                my_node.my_next->my_prev = my_node.my_prev;
                // A release fence is necessary so that the update of our neighbors in
                // the context list is committed before a possible concurrent destroyer
                // proceeds once the local update flag is reset by the following store.
                my_owner->my_local_ctx_list_update.store<release>(0);
                if ( local_count_snapshot != the_context_state_propagation_epoch ) {
                    // Another thread was propagating cancellation request when we removed
                    // ourselves from the list. We must ensure that it is not accessing us
                    // when this destructor finishes. We'll be able to acquire the lock
                    // below only after the other thread finishes with us.
                    spin_mutex::scoped_lock lock(my_owner->my_context_list_mutex);
                }
            }
        }
        else {
            // Nonlocal update of the context list
            // Synchronizes with generic_scheduler::cleanup_local_context_list()
            // TODO: evaluate and perhaps relax, or add some lock instead
            if ( internal::as_atomic(my_kind).fetch_and_store(dying) == detached ) {
                my_node.my_prev->my_next = my_node.my_next;
                my_node.my_next->my_prev = my_node.my_prev;
            }
            else {
                //TODO: evaluate and perhaps relax
                my_owner->my_nonlocal_ctx_list_update.fetch_and_increment<full_fence>();
                //TODO: evaluate and perhaps remove
                spin_wait_until_eq( my_owner->my_local_ctx_list_update, 0u );
                my_owner->my_context_list_mutex.lock();
                my_node.my_prev->my_next = my_node.my_next;
                my_node.my_next->my_prev = my_node.my_prev;
                my_owner->my_context_list_mutex.unlock();
                //TODO: evaluate and perhaps relax
                my_owner->my_nonlocal_ctx_list_update.fetch_and_decrement<full_fence>();
            }
        }
    }
#if __TBB_FP_CONTEXT
    internal::punned_cast<cpu_ctl_env*>(&my_cpu_ctl_env)->~cpu_ctl_env();
#endif
    poison_value(my_version_and_traits);
    if ( my_exception )
        my_exception->destroy();
    ITT_STACK(itt_caller != ITT_CALLER_NULL, caller_destroy, itt_caller);
}

void task_group_context::init () {
    __TBB_STATIC_ASSERT ( sizeof(my_version_and_traits) >= 4, "Layout of my_version_and_traits must be reconsidered on this platform" );
    __TBB_STATIC_ASSERT ( sizeof(task_group_context) == 2 * NFS_MaxLineSize, "Context class has wrong size - check padding and member alignment" );
    __TBB_ASSERT ( (uintptr_t(this) & (sizeof(my_cancellation_requested) - 1)) == 0, "Context is improperly aligned" );
    __TBB_ASSERT ( __TBB_load_relaxed(my_kind) == isolated || __TBB_load_relaxed(my_kind) == bound, "Context can be created only as isolated or bound" );
    my_parent = NULL;
    my_cancellation_requested = 0;
    my_exception = NULL;
    my_owner = NULL;
    my_state = 0;
    itt_caller = ITT_CALLER_NULL;
#if __TBB_TASK_PRIORITY
    my_priority = normalized_normal_priority;
#endif /* __TBB_TASK_PRIORITY */
#if __TBB_FP_CONTEXT
    __TBB_STATIC_ASSERT( sizeof(my_cpu_ctl_env) == sizeof(internal::uint64_t), "The space reserved for FPU settings does not equal sizeof(uint64_t)" );
    __TBB_STATIC_ASSERT( sizeof(cpu_ctl_env) <= sizeof(my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t" );
    suppress_unused_warning( my_cpu_ctl_env.space );

    cpu_ctl_env &ctl = *internal::punned_cast<cpu_ctl_env*>(&my_cpu_ctl_env);
    new ( &ctl ) cpu_ctl_env;
    if ( my_version_and_traits & fp_settings )
        ctl.get_env();
#endif
}
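
// Note: init() is called from the task_group_context constructors (inlined in tbb/task.h)
// and only fills in the fields. For a bound context the actual attachment to the parent
// is deferred until bind_to() below runs; my_kind stays binding_required until then.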

void task_group_context::register_with ( generic_scheduler *local_sched ) {
    __TBB_ASSERT( local_sched, NULL );
    my_owner = local_sched;
    // state propagation logic assumes new contexts are bound to head of the list
    my_node.my_prev = &local_sched->my_context_list_head;
    // Notify threads that may be concurrently destroying contexts registered
    // in this scheduler's list that local list update is underway.
    local_sched->my_local_ctx_list_update.store<relaxed>(1);
    // Prevent load of global propagation epoch counter from being hoisted before
    // speculative stores above, as well as load of nonlocal update flag from
    // being hoisted before the store to local update flag.
    atomic_fence();
    // Finalize local context list update
    if ( local_sched->my_nonlocal_ctx_list_update.load<relaxed>() ) {
        spin_mutex::scoped_lock lock(my_owner->my_context_list_mutex);
        local_sched->my_context_list_head.my_next->my_prev = &my_node;
        my_node.my_next = local_sched->my_context_list_head.my_next;
        my_owner->my_local_ctx_list_update.store<relaxed>(0);
        local_sched->my_context_list_head.my_next = &my_node;
    }
    else {
        local_sched->my_context_list_head.my_next->my_prev = &my_node;
        my_node.my_next = local_sched->my_context_list_head.my_next;
        my_owner->my_local_ctx_list_update.store<release>(0);
        // Thread-local list of contexts allows concurrent traversal by another thread
        // while propagating state change. To ensure visibility of my_node's members
        // to the concurrently traversing thread, the list's head is updated by means
        // of store-with-release.
        __TBB_store_with_release(local_sched->my_context_list_head.my_next, &my_node);
    }
}

void task_group_context::bind_to ( generic_scheduler *local_sched ) {
    __TBB_ASSERT ( __TBB_load_relaxed(my_kind) == binding_required, "Already bound or isolated?" );
    __TBB_ASSERT ( !my_parent, "Parent is set before initial binding" );
    my_parent = local_sched->my_innermost_running_task->prefix().context;
#if __TBB_FP_CONTEXT
    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if ( !(my_version_and_traits & fp_settings) )
        copy_fp_settings(*my_parent);
#endif

    // The condition below prevents unnecessary thrashing of the parent context's cache line
    if ( !(my_parent->my_state & may_have_children) )
        my_parent->my_state |= may_have_children; // full fence is below
    if ( my_parent->my_parent ) {
        // Even if this context were made accessible for state change propagation
        // (by placing __TBB_store_with_release(s->my_context_list_head.my_next, &my_node)
        // above), the change could still be missed if state propagation from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, together with epoch counters that
        // detect the possibility of such a race, makes it possible to avoid taking locks
        // when there is no contention.

        // Acquire fence is necessary to prevent reordering subsequent speculative
        // loads of parent state data out of the scope where epoch counters comparison
        // can reliably validate it.
        uintptr_t local_count_snapshot = __TBB_load_with_acquire( my_parent->my_owner->my_context_state_propagation_epoch );
        // Speculative propagation of parent's state. The speculation will be
        // validated by the epoch counters check further on.
        my_cancellation_requested = my_parent->my_cancellation_requested;
#if __TBB_TASK_PRIORITY
        my_priority = my_parent->my_priority;
#endif /* __TBB_TASK_PRIORITY */
        register_with( local_sched ); // Issues full fence

        // If no state propagation was detected by the following condition, the above
        // full fence guarantees that the parent had correct state during speculative
        // propagation before the fence. Otherwise the propagation from parent is
        // repeated under the lock.
        if ( local_count_snapshot != the_context_state_propagation_epoch ) {
            // Another thread may be propagating state change right now. So resort to lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            my_cancellation_requested = my_parent->my_cancellation_requested;
#if __TBB_TASK_PRIORITY
            my_priority = my_parent->my_priority;
#endif /* __TBB_TASK_PRIORITY */
        }
    }
    else {
        register_with( local_sched ); // Issues full fence
        // As we do not have grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, and thus it is safe to directly
        // copy the state from it.
        my_cancellation_requested = my_parent->my_cancellation_requested;
#if __TBB_TASK_PRIORITY
        my_priority = my_parent->my_priority;
#endif /* __TBB_TASK_PRIORITY */
    }
    __TBB_store_relaxed(my_kind, binding_completed);
}

#if __TBB_TASK_GROUP_CONTEXT
template <typename T>
void task_group_context::propagate_task_group_state ( T task_group_context::*mptr_state, task_group_context& src, T new_state ) {
    if (this->*mptr_state == new_state) {
        // Nothing to do, whether descending from "src" or not, so no need to scan.
        // Hopefully this happens often thanks to earlier invocations.
        // This optimization is enabled by LIFO order in the context lists:
        // - new contexts are bound to the beginning of lists;
        // - descendants are newer than ancestors;
        // - earlier invocations are therefore likely to "paint" long chains.
    }
    else if (this == &src) {
        // This clause is disjunct from the traversal below, which skips src entirely.
        // Note that src.*mptr_state is not necessarily still equal to new_state (another thread may have changed it again).
        // Such interference is probably not frequent enough to aim for optimisation by writing new_state again (to make the other thread back down).
        // Letting the other thread prevail may also be fairer.
    }
    else {
        for ( task_group_context *ancestor = my_parent; ancestor != NULL; ancestor = ancestor->my_parent ) {
            __TBB_ASSERT(internal::is_alive(ancestor->my_version_and_traits), "context tree was corrupted");
            if ( ancestor == &src ) {
                for ( task_group_context *ctx = this; ctx != ancestor; ctx = ctx->my_parent )
                    ctx->*mptr_state = new_state;
                break;
            }
        }
    }
}
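
// Illustration of the traversal above, using hypothetical contexts: given the chain
// root <- A <- B <- C (each arrow pointing from parent to child), the call
//     C.propagate_task_group_state( &task_group_context::my_cancellation_requested, A, 1 )
// walks C's ancestors until it meets A, then paints C and B with the new value (A itself
// was already updated by the caller), and does nothing at all if C already holds it.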

template <typename T>
void generic_scheduler::propagate_task_group_state ( T task_group_context::*mptr_state, task_group_context& src, T new_state ) {
    spin_mutex::scoped_lock lock(my_context_list_mutex);
    // Acquire fence is necessary to ensure that the subsequent node->my_next load
    // returned the correct value in case it was just inserted in another thread.
    // The fence also ensures visibility of the correct my_parent value.
    context_list_node_t *node = __TBB_load_with_acquire(my_context_list_head.my_next);
    while ( node != &my_context_list_head ) {
        task_group_context &ctx = __TBB_get_object_ref(task_group_context, my_node, node);
        if ( ctx.*mptr_state != new_state )
            ctx.propagate_task_group_state( mptr_state, src, new_state );
        node = node->my_next;
        __TBB_ASSERT( is_alive(ctx.my_version_and_traits), "Local context list contains destroyed object" );
    }
    // Sync up local propagation epoch with the global one. Release fence prevents
    // reordering of possible store to *mptr_state after the sync point.
    __TBB_store_with_release(my_context_state_propagation_epoch, the_context_state_propagation_epoch);
}

template <typename T>
bool market::propagate_task_group_state ( T task_group_context::*mptr_state, task_group_context& src, T new_state ) {
    if ( !(src.my_state & task_group_context::may_have_children) )
        return true;
    // The whole propagation algorithm is under the lock in order to ensure correctness
    // in case of concurrent state changes at the different levels of the context tree.
    // See comment at the bottom of scheduler.cpp
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    if ( src.*mptr_state != new_state )
        // Another thread has concurrently changed the state. Back down.
        return false;
    // Advance global state propagation epoch
    __TBB_FetchAndAddWrelease(&the_context_state_propagation_epoch, 1);
    // Propagate to all workers and masters and sync up their local epochs with the global one
    unsigned num_workers = my_num_workers;
    for ( unsigned i = 0; i < num_workers; ++i ) {
        generic_scheduler *s = my_workers[i];
        // If the worker is only about to be registered, skip it.
        if ( s )
            s->propagate_task_group_state( mptr_state, src, new_state );
    }
    // Propagate to all master threads (under my_arenas_list_mutex lock)
    ForEachArena(a) { // uses lock on my_arenas_list_mutex
        arena_slot &slot = a.my_slots[0];
        generic_scheduler *s = slot.my_scheduler;
        // If the master is under construction, skip it. Otherwise make sure that it does not
        // leave its arena and that its scheduler is not destroyed while we are accessing its data.
        if ( s && as_atomic(slot.my_scheduler).compare_and_swap(LockedMaster, s) == s ) { //TODO: remove need in lock
            __TBB_ASSERT( slot.my_scheduler == LockedMaster, NULL );
            // The whole propagation sequence is locked, thus no contention is expected
            __TBB_ASSERT( s != LockedMaster, NULL );
            s->propagate_task_group_state( mptr_state, src, new_state );
            __TBB_store_with_release( slot.my_scheduler, s );
        }
    } EndForEach();
    return true;
}
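
// The epoch counters tie the locked propagation above to the lock-free fast paths elsewhere
// in this file: the_context_state_propagation_epoch is advanced before the schedulers are
// walked, and each visited scheduler copies it into my_context_state_propagation_epoch.
// Comparing snapshots of these counters is what lets bind_to() and ~task_group_context()
// detect that a propagation may have run concurrently and fall back to the locked slow path.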

template <typename T>
bool arena::propagate_task_group_state ( T task_group_context::*mptr_state, task_group_context& src, T new_state ) {
    return my_market->propagate_task_group_state( mptr_state, src, new_state );
}
#endif /* __TBB_TASK_GROUP_CONTEXT */

bool task_group_context::cancel_group_execution () {
    __TBB_ASSERT ( my_cancellation_requested == 0 || my_cancellation_requested == 1, "Invalid cancellation state");
    if ( my_cancellation_requested || as_atomic(my_cancellation_requested).compare_and_swap(1, 0) ) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant would inherit its parent's my_cancellation_requested,
        // not missing out on any cancellation still being propagated, and a context cannot be uncanceled.)
        return false;
    }
    governor::local_scheduler()->my_arena->propagate_task_group_state( &task_group_context::my_cancellation_requested, *this, (uintptr_t)1 );
    return true;
}
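
// Illustrative client-side sketch (not part of this file; assumes the tbb::parallel_for
// overload that takes an explicit task_group_context):
//
//     tbb::task_group_context ctx;
//     // thread running the work:
//     tbb::parallel_for( range, body, tbb::auto_partitioner(), ctx );
//     // any other thread may request cancellation of the whole group:
//     ctx.cancel_group_execution();   // returns false if already canceled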

bool task_group_context::is_group_execution_cancelled () const {
    return my_cancellation_requested != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context::reset () {
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing happened (which means necessary fences were used).
    if ( my_exception ) {
        my_exception->destroy();
        my_exception = NULL;
    }
    my_cancellation_requested = 0;
}

#if __TBB_FP_CONTEXT
// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context::capture_fp_settings () {
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing happened (which means necessary fences were used).
    cpu_ctl_env &ctl = *internal::punned_cast<cpu_ctl_env*>(&my_cpu_ctl_env);
    if ( !(my_version_and_traits & fp_settings) ) {
        new ( &ctl ) cpu_ctl_env;
        my_version_and_traits |= fp_settings;
    }
    ctl.get_env();
}

void task_group_context::copy_fp_settings( const task_group_context &src ) {
    __TBB_ASSERT( !(my_version_and_traits & fp_settings), "The context already has FPU settings." );
    __TBB_ASSERT( src.my_version_and_traits & fp_settings, "The source context does not have FPU settings." );

    cpu_ctl_env &ctl = *internal::punned_cast<cpu_ctl_env*>(&my_cpu_ctl_env);
    cpu_ctl_env &src_ctl = *internal::punned_cast<cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctl) cpu_ctl_env( src_ctl );
    my_version_and_traits |= fp_settings;
}
#endif /* __TBB_FP_CONTEXT */

void task_group_context::register_pending_exception () {
    if ( my_cancellation_requested )
        return;
#if TBB_USE_EXCEPTIONS
    try {
        throw;
    } TbbCatchAll( this );
#endif /* TBB_USE_EXCEPTIONS */
}

#if __TBB_TASK_PRIORITY
void task_group_context::set_priority ( priority_t prio ) {
    __TBB_ASSERT( prio == priority_low || prio == priority_normal || prio == priority_high, "Invalid priority level value" );
    intptr_t p = normalize_priority(prio);
    if ( my_priority == p && !(my_state & task_group_context::may_have_children))
        return;
    my_priority = p;
    internal::generic_scheduler* s = governor::local_scheduler_if_initialized();
    if ( !s || !s->my_arena->propagate_task_group_state(&task_group_context::my_priority, *this, p) )
        return;
    // Updating the arena priority here does not remove the need to check each task's
    // priority and, if necessary, update the arena priority before executing the task.
    // Those checks remain necessary because:
    // a) set_priority() may be invoked before any tasks from this task group are spawned;
    // b) all spawned tasks from this task group are retrieved from the task pools.
    // These cases create a time window when the arena priority may be lowered.
    s->my_market->update_arena_priority( *s->my_arena, p );
}

priority_t task_group_context::priority () const {
    return static_cast<priority_t>(priority_from_normalized_rep[my_priority]);
}
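
// Illustrative sketch of the client-side priority API defined above (not part of this file):
//
//     tbb::task_group_context ctx;
//     ctx.set_priority( tbb::priority_high );   // propagated to descendant contexts via the market
//     tbb::priority_t p = ctx.priority();       // reads back the stored value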
#endif /* __TBB_TASK_PRIORITY */

#endif /* __TBB_TASK_GROUP_CONTEXT */

} // namespace tbb