1 /*
2 Copyright (c) 2005-2017 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15
16
17
18
19 */
20
21 #include "rml_tbb.h"
22 #define private public /* Sleazy trick to avoid publishing internal names in public header. */
23 #include "rml_omp.h"
24 #undef private
25
26 #include "tbb/tbb_allocator.h"
27 #include "tbb/cache_aligned_allocator.h"
28 #include "tbb/aligned_space.h"
29 #include "tbb/atomic.h"
30 #include "tbb/spin_mutex.h"
31 #include "tbb/tbb_misc.h" // Get AvailableHwConcurrency() from here.
32 #if _MSC_VER==1500 && !defined(__INTEL_COMPILER)
// VS2008/VC9 emits spurious warning C4985 ("attributes not present on previous
// declaration") when compiling concurrent_vector; suppress it around the include.
34 #pragma warning( push )
35 #pragma warning( disable: 4985 )
36 #endif
37 #include "tbb/concurrent_vector.h"
38 #if _MSC_VER==1500 && !defined(__INTEL_COMPILER)
39 #pragma warning( pop )
40 #endif
41 #if _MSC_VER && defined(_Wp64)
42 // Workaround for overzealous compiler warnings
43 #pragma warning (push)
44 #pragma warning (disable: 4244)
45 #endif
46
47 #include "job_automaton.h"
48 #include "wait_counter.h"
49 #include "thread_monitor.h"
50
51 #if RML_USE_WCRM
52 #include <concrt.h>
53 #include <concrtrm.h>
54 using namespace Concurrency;
55 #include <vector>
56 #include <hash_map>
57 #define __RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED 0
58 #endif /* RML_USE_WCRM */
59
60 #define STRINGIFY(x) #x
61 #define TOSTRING(x) STRINGIFY(x)
62
63 namespace rml {
64 namespace internal {
65
66 using tbb::internal::rml::tbb_client;
67 using tbb::internal::rml::tbb_server;
68
69 using __kmp::rml::omp_client;
70 using __kmp::rml::omp_server;
71
72 typedef versioned_object::version_type version_type;
73
74 #define SERVER_VERSION 2
75 #define EARLIEST_COMPATIBLE_CLIENT_VERSION 2
76
77 static const size_t cache_line_size = tbb::internal::NFS_MaxLineSize;
78
79 template<typename Server, typename Client> class generic_connection;
80 class tbb_connection_v2;
81 class omp_connection_v2;
82
83 #if RML_USE_WCRM
//! State of a server_thread
/** Below are diagrams of legal state transitions.

          ts_busy
          ^      ^
         /        \
        /          V
    ts_done <----- ts_asleep <------> ts_idle
*/

enum thread_state_t {
    ts_idle,   //!< running and looking for work, but not doing anything useful
    ts_asleep, //!< blocked on its monitor, waiting to be woken
    ts_busy,   //!< executing client work
    ts_done    //!< terminal state; the thread is shutting down
};
100
//! Extra state of an omp server thread
/** Orthogonal to thread_state_t; tracks the thread's relationship to the
    ConcRT resource manager (see set_removed()/set_lent() in server_thread_rep). */
enum thread_extra_state_t {
    ts_none,    //!< neither removed nor lent
    ts_removed, //!< the underlying virtual processor was removed (see set_removed())
    ts_lent     //!< the thread is currently lent out (see set_lent()/set_returned())
};
107
//! Results from try_grab_for()
enum thread_grab_t {
    wk_failed,      //!< the thread could not be grabbed
    wk_from_asleep, //!< grabbed; the thread was previously asleep
    wk_from_idle    //!< grabbed; the thread was previously idle
};
114
115 #else /* !RML_USE_WCRM */
116
117 //! State of a server_thread
118 /** Below are diagrams of legal state transitions.
119
120 OMP
121 ts_omp_busy
122 ^ ^
123 / \
124 / V
125 ts_asleep <-----------> ts_idle
126
127
128 ts_deactivated
129 ^ ^
130 / \
131 V \
132 ts_none <--------------> ts_reactivated
133
134 TBB
135 ts_tbb_busy
136 ^ ^
137 / \
138 / V
139 ts_asleep <-----------> ts_idle --> ts_done
140
141 For TBB only. Extra state transition.
142
143 ts_created -> ts_started -> ts_visited
144 */
enum thread_state_t {
    //! Thread not doing anything useful, but running and looking for work.
    ts_idle,
    //! Thread not doing anything useful and is asleep
    ts_asleep,
    //! Thread is enlisted into OpenMP team
    ts_omp_busy,
    //! Thread is busy doing TBB work.
    ts_tbb_busy,
    //! For tbb threads only: terminal state.
    ts_done,
    // Extra states for the TBB-specific lifecycle: ts_created -> ts_started -> ts_visited
    ts_created,
    ts_started,
    ts_visited,
    //! For omp threads only (see deactivate()/reactivate() on server_thread)
    ts_none,
    ts_deactivated,
    ts_reactivated
};
164 #endif /* RML_USE_WCRM */
165
166 #if TBB_USE_ASSERT
167 #define PRODUCE_ARG(x) ,x
168 #else
169 #define PRODUCE_ARG(x)
170 #endif /* TBB_USE_ASSERT */
171
//! Synchronizes dispatch of OpenMP work.
/** The producer fills in client/cookie/index and then publishes the request by
    storing 'job' last; 'job' is an atomic pointer, so a consumer that observes a
    non-NULL job is guaranteed to see the other fields already set. */
class omp_dispatch_type {
    typedef ::rml::job job_type;
    omp_client* client;             // client to dispatch to; valid once 'job' is non-NULL
    void* cookie;                   // opaque client-supplied value passed through with the work
    omp_client::size_type index;    // slot index supplied by the producer
    tbb::atomic<job_type*> job;     // non-NULL <=> a request is pending; acts as the publish flag
#if TBB_USE_ASSERT
    omp_connection_v2* server;      // producing connection, for assertion checking only
#endif /* TBB_USE_ASSERT */
public:
    omp_dispatch_type() {job=NULL;}
    //! Consume a pending request (defined elsewhere in this file).
    void consume();
    //! Publish a work request for the consumer.
    void produce( omp_client& c, job_type* j, void* cookie_, omp_client::size_type index_ PRODUCE_ARG( omp_connection_v2& s )) {
        __TBB_ASSERT( j, NULL );
        __TBB_ASSERT( !job, "job already set" );
        client = &c;
#if TBB_USE_ASSERT
        server = &s;
#endif /* TBB_USE_ASSERT */
        cookie = cookie_;
        index = index_;
        // Must be last: the atomic store of 'job' publishes the fields above.
        job = j;
    }
};
198
199 //! A reference count.
200 /** No default constructor, because users of ref_count must be very careful about whether the
201 initial reference count is 0 or 1. */
202 class ref_count: no_copy {
203 friend class thread_map;
204 tbb::atomic<int> my_ref_count;
205 public:
ref_count(int k)206 ref_count(int k ) {my_ref_count=k;}
~ref_count()207 ~ref_count() {__TBB_ASSERT( !my_ref_count, "premature destruction of refcounted object" );}
208 //! Add one and return new value.
add_ref()209 int add_ref() {
210 int k = ++my_ref_count;
211 __TBB_ASSERT(k>=1,"reference count underflowed before add_ref");
212 return k;
213 }
214 //! Subtract one and return new value.
remove_ref()215 int remove_ref() {
216 int k = --my_ref_count;
217 __TBB_ASSERT(k>=0,"reference count underflow");
218 return k;
219 }
220 };
221
222 #if RML_USE_WCRM
223
224 #if USE_UMS_THREAD
225 #define RML_THREAD_KIND UmsThreadDefault
226 #define RML_THREAD_KIND_STRING "UmsThread"
227 #else
228 #define RML_THREAD_KIND ThreadScheduler
229 #define RML_THREAD_KIND_STRING "WinThread"
230 #endif
231
232 // Forward declaration
233 class thread_map;
234
235 static const IExecutionResource* c_remove_prepare = (IExecutionResource*)0;
236 static const IExecutionResource* c_remove_returned = (IExecutionResource*)1;
237
//! Server thread representation
/** Holds the state machine, job handle, and ConcRT execution resource for one
    server thread; server_thread layers the IExecutionContext interface on top. */
class server_thread_rep : no_copy {
    friend class thread_map;
    friend class omp_connection_v2;
    friend class server_thread;
    friend class tbb_server_thread;
    friend class omp_server_thread;
    template<typename Connection> friend void make_job( Connection& c, typename Connection::server_thread_type& t );
    //! Integral type that can hold a thread_state_t (kept as int so it fits tbb::atomic).
    typedef int thread_state_rep_t;
public:
    //! Ctor
    server_thread_rep( bool assigned, IScheduler* s, IExecutionResource* r, thread_map& map, rml::client& cl ) :
        uid( GetExecutionContextId() ), my_scheduler(s), my_proxy(NULL),
        my_thread_map(map), my_client(cl), my_job(NULL)
    {
        // A thread created already-assigned to a processor starts busy, else idle.
        my_state = assigned ? ts_busy : ts_idle;
        my_extra_state = ts_none;
        terminate = false;
        my_execution_resource = r;
    }
    //! Dtor
    ~server_thread_rep() {}

    //! Block until the job_automaton yields a job; cache it in my_job.
    inline rml::job* wait_for_job() {
        if( !my_job ) my_job = my_job_automaton.wait_for_job();
        return my_job;
    }

    // Getters and setters
    inline thread_state_t read_state() const { thread_state_rep_t s = my_state; return static_cast<thread_state_t>(s); }
    inline void set_state( thread_state_t to ) {my_state = to;}
    inline void set_removed() { __TBB_ASSERT( my_extra_state==ts_none, NULL ); my_extra_state = ts_removed; }
    inline bool is_removed() const { return my_extra_state==ts_removed; }
    inline bool is_lent() const {return my_extra_state==ts_lent;}
    inline void set_lent() { my_extra_state=ts_lent; }
    inline void set_returned() { my_extra_state=ts_none; }
    inline IExecutionResource* get_execution_resource() { return my_execution_resource; }
    // NOTE(review): downcast assumes the resource stored for this thread is a
    // virtual-processor root (the member comment below says non-masters store one) — confirm at call sites.
    inline IVirtualProcessorRoot* get_virtual_processor() { return (IVirtualProcessorRoot*)get_execution_resource(); }

    //! Enlist the thread for work: CAS the state from ts_asleep to 'to'.
    /** Returns true iff this caller performed the transition (so exactly one waker wins). */
    inline bool wakeup( thread_state_t to, thread_state_t from ) {
        __TBB_ASSERT( from==ts_asleep && (to==ts_idle||to==ts_busy||to==ts_done), NULL );
        return my_state.compare_and_swap( to, from )==from;
    }

    //! Try to grab the thread for work; result tells which state it was taken from.
    thread_grab_t try_grab_for();

    //! Destroy the client job associated with the thread
    /** Returns whether the thread must quit (see uses via initiate_termination()). */
    template<typename Connection> bool destroy_job( Connection* c );

    //! Try to re-use the thread
    void revive( IScheduler* s, IExecutionResource* r, rml::client& c ) {
        // the variables may not have been set before a thread was told to quit
        __TBB_ASSERT( my_scheduler==s, "my_scheduler has been altered?\n" );
        my_scheduler = s;
        __TBB_ASSERT( &my_client==&c, "my_client has been altered?\n" );
        if( r ) my_execution_resource = r;
        // NOTE(review): my_client is a reference, so this assigns through it; the assert
        // above guarantees &my_client==&c, making this effectively a self-assignment — confirm intent.
        my_client = c;
        my_state = ts_idle;
        __TBB_ASSERT( my_extra_state==ts_removed, NULL );
        my_extra_state = ts_none;
    }

protected:
    const int uid;                     // ConcRT execution-context id, fixed at construction
    IScheduler* my_scheduler;
    IThreadProxy* my_proxy;
    tbb::atomic<IExecutionResource*> my_execution_resource; /* for non-masters, it is IVirtualProcessorRoot */
    thread_map& my_thread_map;
    rml::client& my_client;
    job* my_job;                       // cached result of my_job_automaton.wait_for_job()
    job_automaton my_job_automaton;
    tbb::atomic<bool> terminate;       // set to request shutdown of this thread
    tbb::atomic<thread_state_rep_t> my_state;
    tbb::atomic<thread_extra_state_t> my_extra_state;
};
316
//! Class that implements IExecutionContext
/** Couples server_thread_rep's state machine with ConcRT's execution-context
    interface; concrete flavors are tbb_server_thread and omp_server_thread. */
class server_thread : public IExecutionContext, public server_thread_rep {
    friend class tbb_connection_v2;
    friend class omp_connection_v2;
    friend class tbb_server_thread;
    friend class omp_server_thread;
    friend class thread_map;
    template<typename Connection> friend void make_job( Connection& c, typename Connection::server_thread_type& t );
protected:
    //! Ctor; is_tbb selects which derived flavor this context represents.
    server_thread( bool is_tbb, bool assigned, IScheduler* s, IExecutionResource* r, thread_map& map, rml::client& cl ) : server_thread_rep(assigned,s,r,map,cl), tbb_thread(is_tbb) {}
    ~server_thread() {}
    // IExecutionContext interface
    unsigned int GetId() const __TBB_override { return uid; }
    IScheduler* GetScheduler() __TBB_override { return my_scheduler; }
    IThreadProxy* GetProxy() __TBB_override { return my_proxy; }
    void SetProxy( IThreadProxy* thr_proxy ) __TBB_override { my_proxy = thr_proxy; }

private:
    //! True if this is a TBB thread, false for an OMP thread (set once at construction).
    bool tbb_thread;
};
336
337 // Forward declaration
338 class tbb_connection_v2;
339 class omp_connection_v2;
340
//! TBB server thread
class tbb_server_thread : public server_thread {
    friend class tbb_connection_v2;
public:
    //! Ctor; remembers the owning TBB connection.
    tbb_server_thread( bool assigned, IScheduler* s, IExecutionResource* r, tbb_connection_v2* con, thread_map& map, rml::client& cl ) : server_thread(true,assigned,s,r,map,cl), my_conn(con) {
        activation_count = 0;
    }
    ~tbb_server_thread() {}
    //! Thread body invoked by ConcRT (defined elsewhere in this file).
    void Dispatch( DispatchState* ) __TBB_override;
    //! Begin shutdown of this thread; returns whether the thread must quit.
    inline bool initiate_termination();
    //! Volunteer to sleep; returns whether the thread actually slept.
    bool sleep_perhaps();
    //! Switch out this thread
    bool switch_out();
private:
    tbb_connection_v2* my_conn;
public:
    //! Number of pending activations; thread_map's dtor spins on this before deallocation.
    tbb::atomic<int> activation_count;
};
359
360 //! OMP server thread
361 class omp_server_thread : public server_thread {
362 friend class omp_connection_v2;
363 public:
omp_server_thread(bool assigned,IScheduler * s,IExecutionResource * r,omp_connection_v2 * con,thread_map & map,rml::client & cl)364 omp_server_thread( bool assigned, IScheduler* s, IExecutionResource* r, omp_connection_v2* con, thread_map& map, rml::client& cl ) :
365 server_thread(false,assigned,s,r,map,cl), my_conn(con), my_cookie(NULL), my_index(UINT_MAX) {}
~omp_server_thread()366 ~omp_server_thread() {}
367 void Dispatch( DispatchState* ) __TBB_override;
get_cookie()368 inline void* get_cookie() {return my_cookie;}
get_index()369 inline ::__kmp::rml::omp_client::size_type get_index() {return my_index;}
370
get_execution_resource()371 inline IExecutionResource* get_execution_resource() { return get_execution_resource(); }
initiate_termination()372 inline bool initiate_termination() { return destroy_job( (omp_connection_v2*) my_conn ); }
373 void sleep_perhaps();
374 private:
375 omp_connection_v2* my_conn;
376 void* my_cookie;
377 ::__kmp::rml::omp_client::size_type my_index;
378 omp_dispatch_type omp_data;
379 };
380
//! Class that implements IScheduler
/** Thin adapter that forwards ConcRT resource-management callbacks to the owning
    Connection (tbb_connection_v2 or omp_connection_v2). */
template<typename Connection>
class scheduler : no_copy, public IScheduler {
public:
    unsigned int GetId() const __TBB_override {return uid;}
    //! Statistics reporting is not used; all out-parameters are ignored.
    void Statistics( unsigned int* /*pTaskCompletionRate*/, unsigned int* /*pTaskArrivalRate*/, unsigned int* /*pNumberOfTaskEnqueued*/) __TBB_override {}
    SchedulerPolicy GetPolicy() const __TBB_override { __TBB_ASSERT(my_policy,NULL); return *my_policy; }
    //! Forward new virtual processors to the connection, unless it is shutting down.
    void AddVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override { if( !my_conn.is_closing() ) my_conn.add_virtual_processors( vproots, count); }
    void RemoveVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override;
    // These notifications are rejected outright (asserted against in debug builds).
    void NotifyResourcesExternallyIdle( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override { __TBB_ASSERT( false, "This call is not allowed for TBB" ); }
    void NotifyResourcesExternallyBusy( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override { __TBB_ASSERT( false, "This call is not allowed for TBB" ); }
protected:
    scheduler( Connection& conn );
    //! my_policy is allocated by the ctor (defined elsewhere) and owned by *this.
    virtual ~scheduler() { __TBB_ASSERT( my_policy, NULL ); delete my_policy; }

public:
    //! Factory; the returned object is heap-allocated and owned by the caller.
    static scheduler* create( Connection& conn ) {return new scheduler( conn );}

private:
    const int uid;
    Connection& my_conn;
    SchedulerPolicy* my_policy;
};
404
405
/* State diagram for this context:
 *    --> ts_busy --> ts_done
 */
//! Execution context used to reclaim threads; created at most once per thread_map.
class thread_scavenger_thread : public IExecutionContext, no_copy {
public:
    thread_scavenger_thread( IScheduler* s, IVirtualProcessorRoot* r, thread_map& map ) :
        uid( GetExecutionContextId() ), my_scheduler(s), my_virtual_processor_root(r), my_proxy(NULL), my_thread_map(map)
    {
        // Born busy; transitions to ts_done when its work is finished (see diagram above).
        my_state = ts_busy;
#if TBB_USE_ASSERT
        activation_count = 0;
#endif
    }
    ~thread_scavenger_thread() {}
    // IExecutionContext interface
    unsigned int GetId() const __TBB_override { return uid; }
    IScheduler* GetScheduler() __TBB_override { return my_scheduler; }
    IThreadProxy* GetProxy() __TBB_override { return my_proxy; }
    void SetProxy( IThreadProxy* thr_proxy ) __TBB_override { my_proxy = thr_proxy; }
    //! Thread body (defined elsewhere in this file).
    void Dispatch( DispatchState* ) __TBB_override;
    inline thread_state_t read_state() { return my_state; }
    inline void set_state( thread_state_t s ) { my_state = s; }
    inline IVirtualProcessorRoot* get_virtual_processor() { return my_virtual_processor_root; }
private:
    const int uid;
    IScheduler* my_scheduler;
    IVirtualProcessorRoot* my_virtual_processor_root;
    IThreadProxy* my_proxy;
    thread_map& my_thread_map;
    tbb::atomic<thread_state_t> my_state;
#if TBB_USE_ASSERT
public:
    tbb::atomic<int> activation_count;
#endif
};
440
441 static const thread_scavenger_thread* c_claimed = reinterpret_cast<thread_scavenger_thread*>(1);
442
//! Head/tail of the list of closed connections awaiting cleanup by the connection scavenger.
/** Real entries are (tagged) connection pointers; the named constants below are
    sentinel values stored in head/tail. */
struct garbage_connection_queue {
    tbb::atomic<uintptr_t> head;
    tbb::atomic<uintptr_t> tail;
    static const uintptr_t empty = 0; // connection scavenger thread empty list
    static const uintptr_t plugged = 1; // end of use of the list
    static const uintptr_t plugged_acked = 2; // connection scavenger saw the plugged flag, and it freed all connections
};
450
//! Connection scavenger
/** It collects closed connection objects, waits for worker threads belonging to the
 * connection to return to the ConcRT RM, then returns the object to the memory manager.
 */
class connection_scavenger_thread {
    friend void assist_cleanup_connections();
    /*
     * connection_scavenger_thread's state
     * ts_busy <----> ts_asleep <--
     */
    tbb::atomic<thread_state_t> state;

    /* We steal two bits from a connection pointer to encode
     * whether the connection is for TBB or for OMP.
     *
     * ----------------------------------
     * |                          |  |  |
     * ----------------------------------
     *                              ^  ^
     *                             /   |
     *             1 : tbb, 0 : omp    |
     *                             if set, terminate
     */
    // FIXME: pad these?
    thread_monitor monitor;
    HANDLE thr_handle;
#if TBB_USE_ASSERT
    tbb::atomic<int> n_scavenger_threads;
#endif

public:
    //! Starts asleep; no OS thread exists until launch() is called.
    connection_scavenger_thread() : thr_handle(NULL) {
        state = ts_asleep;
#if TBB_USE_ASSERT
        n_scavenger_threads = 0;
#endif
    }

    ~connection_scavenger_thread() {}

    //! Wake the scavenger; the CAS guarantees only one waker performs the notify.
    void wakeup() {
        if( state.compare_and_swap( ts_busy, ts_asleep )==ts_asleep )
            monitor.notify();
    }

    //! Volunteer to sleep (defined elsewhere in this file).
    void sleep_perhaps();

    //! Drain the request list; conn_ex is a tagged connection pointer (see diagram above).
    void process_requests( uintptr_t conn_ex );

    //! OS thread entry point.
    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    //! Spawn the scavenger's OS thread.
    void launch() {
        thread_monitor::launch( connection_scavenger_thread::thread_routine, this, NULL );
    }

    //! Queue a closed connection for cleanup.
    template<typename Server, typename Client>
    void add_request( generic_connection<Server,Client>* conn_to_close );

    //! Atomically take the current list, prepending last_conn_to_close.
    template<typename Server, typename Client>
    uintptr_t grab_and_prepend( generic_connection<Server,Client>* last_conn_to_close );
};
512
513 void free_all_connections( uintptr_t );
514
515 #endif /* RML_USE_WCRM */
516
517 #if !RML_USE_WCRM
518 class server_thread;
519
//! thread_map_base; we need to make the iterator type available to server_thread
struct thread_map_base {
    //! A value in the map
    /** Pairs a server_thread with the job (and job_automaton) it runs for one server. */
    class value_type {
    public:
        //! The thread; asserts it has already been set (see wait_for_thread() otherwise).
        server_thread& thread() {
            __TBB_ASSERT( my_thread, "thread_map::value_type::thread() called when !my_thread" );
            return *my_thread;
        }
        //! The job; asserts it has already been set (see wait_for_job() otherwise).
        rml::job& job() {
            __TBB_ASSERT( my_job, "thread_map::value_type::job() called when !my_job" );
            return *my_job;
        }
        value_type() : my_thread(NULL), my_job(NULL) {}
        //! Spin (with yield) until another thread publishes my_thread.
        /** The volatile re-read defeats caching of my_thread across iterations. */
        server_thread& wait_for_thread() const {
            for(;;) {
                server_thread* ptr=const_cast<server_thread*volatile&>(my_thread);
                if( ptr )
                    return *ptr;
                __TBB_Yield();
            }
        }
        /** Shortly after when a connection is established, it is possible for the server
            to grab a server_thread that has not yet created a job object for that server. */
        rml::job* wait_for_job() const {
            if( !my_job ) {
                my_job = my_automaton.wait_for_job();
            }
            return my_job;
        }
    private:
        server_thread* my_thread;
        /** Marked mutable because though it is physically modified, conceptually it is a duplicate of
            the job held by job_automaton. */
        mutable rml::job* my_job;
        job_automaton my_automaton;
        // FIXME - pad out to cache line, because my_automaton is hit hard by thread()
        friend class thread_map;
    };
    //! Zero-initialized cache-aligned growable array of slots.
    typedef tbb::concurrent_vector<value_type,tbb::zero_allocator<value_type,tbb::cache_aligned_allocator> > array_type;
};
561 #endif /* !RML_USE_WCRM */
562
563 #if _MSC_VER && !defined(__INTEL_COMPILER)
564 // Suppress overzealous compiler warnings about uninstantiable class
565 #pragma warning(push)
566 #pragma warning(disable:4510 4610)
567 #endif
568
//! Pads T out to an integral number of cache lines.
/** The pad is cache_line_size - sizeof(T)%cache_line_size bytes, which is never zero:
    if sizeof(T) is already a multiple of the line size, a full extra line is added
    (avoiding an illegal zero-length array). */
template<typename T>
class padded: public T {
    char pad[cache_line_size - sizeof(T)%cache_line_size];
};
573
574 #if _MSC_VER && !defined(__INTEL_COMPILER)
575 #pragma warning(pop)
576 #endif
577
578 // FIXME - should we pad out memory to avoid false sharing of our global variables?
579 static unsigned the_default_concurrency;
580 static tbb::atomic<int> the_balance;
581 static tbb::atomic<tbb::internal::do_once_state> rml_module_state;
582
583 #if !RML_USE_WCRM
//! Per thread information
/** ref_count holds number of clients that are using this,
    plus 1 if a host thread owns this instance. */
class server_thread: public ref_count {
    friend class thread_map;
    template<typename Server, typename Client> friend class generic_connection;
    friend class tbb_connection_v2;
    friend class omp_connection_v2;
    //! Integral type that can hold a thread_state_t
    typedef int thread_state_rep_t;
    tbb::atomic<thread_state_rep_t> state;
public:
    //! Monitor the thread sleeps on / is notified through.
    thread_monitor monitor;
private:
    //! True if this thread serves an OMP client, false for TBB (set by thread_map::add_one_thread).
    bool is_omp_thread;
    tbb::atomic<thread_state_rep_t> my_extra_state;
    // NOTE(review): presumably the intrusive link used by private_thread_bag's list — confirm at uses.
    server_thread* link;
    //! This thread's slot in the owning thread_map's array.
    thread_map_base::array_type::iterator my_map_pos;
    rml::server *my_conn;
    rml::job* my_job;
    job_automaton* my_ja;
    //! Index assigned by thread_map::add_one_thread.
    size_t my_index;
    //! Set to request shutdown of this thread.
    tbb::atomic<bool> terminate;
    omp_dispatch_type omp_dispatch;

#if TBB_USE_ASSERT
    //! Flag used to check if thread is still using *this.
    bool has_active_thread;
#endif /* TBB_USE_ASSERT */

    //! Volunteer to sleep.
    void sleep_perhaps( thread_state_t asleep );

    //! Destroy job corresponding to given client
    /** Return true if thread must quit. */
    template<typename Connection>
    bool destroy_job( Connection& c );

    //! Do terminate the thread
    /** Return true if thread must quit. */
    bool do_termination();

    //! Main loop run by thread_routine.
    void loop();
    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

public:
    server_thread();

    ~server_thread();

    //! Read the thread state
    thread_state_t read_state() const {
        thread_state_rep_t s = state;
        __TBB_ASSERT( unsigned(s)<=unsigned(ts_done), "corrupted server thread?" );
        return thread_state_t(s);
    }

    //! Read the tbb-specific extra thread state
    thread_state_t read_extra_state() const {
        thread_state_rep_t s = my_extra_state;
        return thread_state_t(s);
    }

    //! Launch a thread that is bound to *this.
    void launch( size_t stack_size );

    //! Attempt to wakeup a thread
    /** The value "to" is the new state for the thread, if it was woken up.
        Returns true if thread was woken up, false otherwise. */
    bool wakeup( thread_state_t to, thread_state_t from );

    //! Attempt to enslave a thread for OpenMP/TBB.
    /** Returns true if state is successfully changed. 's' takes either ts_omp_busy or ts_tbb_busy */
    bool try_grab_for( thread_state_t s );

#if _WIN32||_WIN64
    //! Send the worker thread to sleep temporarily
    void deactivate();

    //! Wake the worker thread up
    void reactivate();
#endif /* _WIN32||_WIN64 */
};
667
//! Bag of threads that are private to a client.
class private_thread_bag {
    struct list_thread: server_thread {
        list_thread* next;
    };
    //! Root of atomic linked list of list_thread
    /** ABA problem is avoided because items are only atomically pushed, never popped. */
    tbb::atomic<list_thread*> my_root;
    tbb::cache_aligned_allocator<padded<list_thread> > my_allocator;
public:
    //! Construct empty bag
    private_thread_bag() {my_root=NULL;}

    //! Create a fresh server_thread object.
    /** Thread-safe: the new node is pushed onto the list head with a CAS loop. */
    server_thread& add_one_thread() {
        list_thread* t = my_allocator.allocate(1);
        new( t ) list_thread;
        // Atomically add to list
        list_thread* old_root;
        do {
            old_root = my_root;
            t->next = old_root;
        } while( my_root.compare_and_swap( t, old_root )!=old_root );
        return *t;
    }

    //! Destroy the bag and threads in it.
    /** Not thread-safe; may only run when no concurrent add_one_thread is possible. */
    ~private_thread_bag() {
        while( my_root ) {
            // Unlink thread from list.
            list_thread* t = my_root;
            my_root = t->next;
            // Destroy and deallocate the thread.
            t->~list_thread();
            my_allocator.deallocate(static_cast<padded<list_thread>*>(t),1);
        }
    }
};
706
707 //! Forward declaration
708 void wakeup_some_tbb_threads();
709
//! Type-independent part of class generic_connection.
/** One to one map from server threads to jobs, and associated reference counting. */
class thread_map : public thread_map_base {
public:
    typedef rml::client::size_type size_type;
    //! ctor
    /** Both ref counts start at 1: the extra reference represents the owning
        server/connection and is dropped during shutdown. */
    thread_map( wait_counter& fc, ::rml::client& client ) :
        all_visited_at_least_once(false), my_min_stack_size(0), my_server_ref_count(1),
        my_client_ref_count(1), my_client(client), my_factory_counter(fc)
    { my_unrealized_threads = 0; }
    //! dtor
    ~thread_map() {}
    typedef array_type::iterator iterator;
    iterator begin() {return my_array.begin();}
    iterator end() {return my_array.end();}
    //! Acquire resources from the client (see definition below).
    void bind();
    //! Ask every thread to clean up its job and drop the extra client reference.
    void unbind();
    void assist_cleanup( bool assist_null_only );

    /** Returns number of unrealized threads to create. */
    size_type wakeup_tbb_threads( size_type n );
    bool wakeup_next_thread( iterator i, tbb_connection_v2& conn );
    void release_tbb_threads( server_thread* t );
    void adjust_balance( int delta );

    //! Add a server_thread object to the map, but do not bind it.
    /** Return NULL if out of unrealized threads. */
    value_type* add_one_thread( bool is_omp_thread_ );

    void bind_one_thread( rml::server& server, value_type& x );

    void remove_client_ref();
    int add_server_ref() {return my_server_ref_count.add_ref();}
    int remove_server_ref() {return my_server_ref_count.remove_ref();}

    ::rml::client& client() const {return my_client;}

    size_type get_unrealized_threads() { return my_unrealized_threads; }

private:
    private_thread_bag my_private_threads;
    bool all_visited_at_least_once;
    array_type my_array;
    size_t my_min_stack_size;
    //! Budget of threads not yet created; decremented by add_one_thread.
    tbb::atomic<size_type> my_unrealized_threads;

    //! Number of threads referencing *this, plus one extra.
    /** When it becomes zero, the containing server object can be safely deleted. */
    ref_count my_server_ref_count;

    //! Number of jobs that need cleanup, plus one extra.
    /** When it becomes zero, acknowledge_close_connection is called. */
    ref_count my_client_ref_count;

    ::rml::client& my_client;
    //! Counter owned by factory that produced this thread_map.
    wait_counter& my_factory_counter;
};
768
bind_one_thread(rml::server & server,value_type & x)769 void thread_map::bind_one_thread( rml::server& server, value_type& x ) {
770 // Add one to account for the thread referencing this map hereforth.
771 server_thread& t = x.thread();
772 my_server_ref_count.add_ref();
773 my_client_ref_count.add_ref();
774 #if TBB_USE_ASSERT
775 __TBB_ASSERT( t.add_ref()==1, NULL );
776 #else
777 t.add_ref();
778 #endif
779 // Have responsibility to start the thread.
780 t.my_conn = &server;
781 t.my_ja = &x.my_automaton;
782 t.launch( my_min_stack_size );
783 /* Must wake thread up so it can fill in its "my_job" field in *this.
784 Otherwise deadlock can occur where wait_for_job spins on thread that is sleeping. */
785 __TBB_ASSERT( t.state!=ts_tbb_busy, NULL );
786 t.wakeup( ts_idle, ts_asleep );
787 }
788
add_one_thread(bool is_omp_thread_)789 thread_map::value_type* thread_map::add_one_thread( bool is_omp_thread_ ) {
790 size_type u;
791 do {
792 u = my_unrealized_threads;
793 if( !u ) return NULL;
794 } while( my_unrealized_threads.compare_and_swap(u-1,u)!=u );
795 server_thread& t = my_private_threads.add_one_thread();
796 t.is_omp_thread = is_omp_thread_;
797 __TBB_ASSERT( u>=1, NULL );
798 t.my_index = u - 1;
799 __TBB_ASSERT( t.state!=ts_tbb_busy, NULL );
800 t.my_extra_state = t.is_omp_thread ? ts_none : ts_created;
801
802 iterator i = t.my_map_pos = my_array.grow_by(1);
803 value_type& v = *i;
804 v.my_thread = &t;
805 return &v;
806 }
807
bind()808 void thread_map::bind() {
809 ++my_factory_counter;
810 my_min_stack_size = my_client.min_stack_size();
811 __TBB_ASSERT( my_unrealized_threads==0, "already called bind?" );
812 my_unrealized_threads = my_client.max_job_count();
813 }
814
unbind()815 void thread_map::unbind() {
816 // Ask each server_thread to cleanup its job for this server.
817 for( iterator i=begin(); i!=end(); ++i ) {
818 server_thread& t = i->thread();
819 t.terminate = true;
820 t.wakeup( ts_idle, ts_asleep );
821 }
822 // Remove extra ref to client.
823 remove_client_ref();
824 }
825
assist_cleanup(bool assist_null_only)826 void thread_map::assist_cleanup( bool assist_null_only ) {
827 // To avoid deadlock, the current thread *must* help out with cleanups that have not started,
828 // because the thread that created the job may be busy for a long time.
829 for( iterator i = begin(); i!=end(); ++i ) {
830 rml::job* j=0;
831 job_automaton& ja = i->my_automaton;
832 if( assist_null_only ? ja.try_plug_null() : ja.try_plug(j) ) {
833 if( j ) {
834 my_client.cleanup(*j);
835 } else {
836 // server thread did not get a chance to create a job.
837 }
838 remove_client_ref();
839 }
840 }
841 }
842
wakeup_tbb_threads(size_type n)843 thread_map::size_type thread_map::wakeup_tbb_threads( size_type n ) {
844 __TBB_ASSERT(n>0,"must specify positive number of threads to wake up");
845 iterator e = end();
846 for( iterator k=begin(); k!=e; ++k ) {
847 // If another thread added *k, there is a tiny timing window where thread() is invalid.
848 server_thread& t = k->wait_for_thread();
849 thread_state_t thr_s = t.read_state();
850 if( t.read_extra_state()==ts_created || thr_s==ts_tbb_busy || thr_s==ts_done )
851 continue;
852 if( --the_balance>=0 ) { // try to withdraw a coin from the deposit
853 while( !t.try_grab_for( ts_tbb_busy ) ) {
854 thr_s = t.read_state();
855 if( thr_s==ts_tbb_busy || thr_s==ts_done ) {
856 // we lost; move on to the next.
857 ++the_balance;
858 goto skip;
859 }
860 }
861 if( --n==0 )
862 return 0;
863 } else {
864 // overdraft.
865 ++the_balance;
866 break;
867 }
868 skip:
869 ;
870 }
871 return n<my_unrealized_threads ? n : size_type(my_unrealized_threads);
872 }
873 #else /* RML_USE_WCRM */
874
875 class thread_map : no_copy {
876 friend class omp_connection_v2;
877 typedef ::std::hash_map<uintptr_t,server_thread*> hash_map_type;
878 size_t my_min_stack_size;
879 size_t my_unrealized_threads;
880 ::rml::client& my_client;
881 //! Counter owned by factory that produced this thread_map.
882 wait_counter& my_factory_counter;
883 //! Ref counters
884 ref_count my_server_ref_count;
885 ref_count my_client_ref_count;
886 // FIXME: pad this?
887 hash_map_type my_map;
888 bool shutdown_in_progress;
889 std::vector<IExecutionResource*> original_exec_resources;
890 tbb::cache_aligned_allocator<padded<tbb_server_thread> > my_tbb_allocator;
891 tbb::cache_aligned_allocator<padded<omp_server_thread> > my_omp_allocator;
892 tbb::cache_aligned_allocator<padded<thread_scavenger_thread> > my_scavenger_allocator;
893 IResourceManager* my_concrt_resource_manager;
894 IScheduler* my_scheduler;
895 ISchedulerProxy* my_scheduler_proxy;
896 tbb::atomic<thread_scavenger_thread*> my_thread_scavenger_thread;
897 #if TBB_USE_ASSERT
898 tbb::atomic<int> n_add_vp_requests;
899 tbb::atomic<int> n_thread_scavengers_created;
900 #endif
901 public:
thread_map(wait_counter & fc,::rml::client & client)902 thread_map( wait_counter& fc, ::rml::client& client ) :
903 my_min_stack_size(0), my_client(client), my_factory_counter(fc),
904 my_server_ref_count(1), my_client_ref_count(1), shutdown_in_progress(false),
905 my_concrt_resource_manager(NULL), my_scheduler(NULL), my_scheduler_proxy(NULL)
906 {
907 my_thread_scavenger_thread = NULL;
908 #if TBB_USE_ASSERT
909 n_add_vp_requests = 0;
910 n_thread_scavengers_created;
911 #endif
912 }
913
~thread_map()914 ~thread_map() {
915 __TBB_ASSERT( n_thread_scavengers_created<=1, "too many scavenger thread created" );
916 // if thread_scavenger_thread is launched, wait for it to complete
917 if( my_thread_scavenger_thread ) {
918 __TBB_ASSERT( my_thread_scavenger_thread!=c_claimed, NULL );
919 while( my_thread_scavenger_thread->read_state()==ts_busy )
920 __TBB_Yield();
921 thread_scavenger_thread* tst = my_thread_scavenger_thread;
922 my_scavenger_allocator.deallocate(static_cast<padded<thread_scavenger_thread>*>(tst),1);
923 }
924 // deallocate thread contexts
925 for( hash_map_type::const_iterator hi=my_map.begin(); hi!=my_map.end(); ++hi ) {
926 server_thread* thr = hi->second;
927 if( thr->tbb_thread ) {
928 while( ((tbb_server_thread*)thr)->activation_count>1 )
929 __TBB_Yield();
930 ((tbb_server_thread*)thr)->~tbb_server_thread();
931 my_tbb_allocator.deallocate(static_cast<padded<tbb_server_thread>*>(thr),1);
932 } else {
933 ((omp_server_thread*)thr)->~omp_server_thread();
934 my_omp_allocator.deallocate(static_cast<padded<omp_server_thread>*>(thr),1);
935 }
936 }
937 if( my_scheduler_proxy ) {
938 my_scheduler_proxy->Shutdown();
939 my_concrt_resource_manager->Release();
940 __TBB_ASSERT( my_scheduler, NULL );
941 delete my_scheduler;
942 } else {
943 __TBB_ASSERT( !my_scheduler, NULL );
944 }
945 }
946 typedef hash_map_type::key_type key_type;
947 typedef hash_map_type::value_type value_type;
948 typedef hash_map_type::iterator iterator;
begin()949 iterator begin() {return my_map.begin();}
end()950 iterator end() {return my_map.end();}
find(key_type k)951 iterator find( key_type k ) {return my_map.find( k );}
insert(key_type k,server_thread * v)952 iterator insert( key_type k, server_thread* v ) {
953 std::pair<iterator,bool> res = my_map.insert( value_type(k,v) );
954 return res.first;
955 }
bind(IScheduler * s)956 void bind( IScheduler* s ) {
957 ++my_factory_counter;
958 if( s ) {
959 my_unrealized_threads = s->GetPolicy().GetPolicyValue( MaxConcurrency );
960 __TBB_ASSERT( my_unrealized_threads>0, NULL );
961 my_scheduler = s;
962 my_concrt_resource_manager = CreateResourceManager(); // reference count==3 when first created.
963 my_scheduler_proxy = my_concrt_resource_manager->RegisterScheduler( s, CONCRT_RM_VERSION_1 );
964 my_scheduler_proxy->RequestInitialVirtualProcessors( false );
965 }
966 }
is_closing()967 bool is_closing() { return shutdown_in_progress; }
968 void unbind( rml::server& server, ::tbb::spin_mutex& mtx );
add_client_ref()969 void add_client_ref() { my_server_ref_count.add_ref(); }
970 void remove_client_ref();
add_server_ref()971 void add_server_ref() {my_server_ref_count.add_ref();}
remove_server_ref()972 int remove_server_ref() {return my_server_ref_count.remove_ref();}
get_server_ref_count()973 int get_server_ref_count() { int k = my_server_ref_count.my_ref_count; return k; }
974 void assist_cleanup( bool assist_null_only );
975 void adjust_balance( int delta );
current_balance() const976 int current_balance() const {int k = the_balance; return k;}
client() const977 ::rml::client& client() const {return my_client;}
register_as_master(server::execution_resource_t & v) const978 void register_as_master( server::execution_resource_t& v ) const { (IExecutionResource*&)v = my_scheduler_proxy ? my_scheduler_proxy->SubscribeCurrentThread() : NULL; }
979 // Remove() should be called from the same thread that subscribed the current h/w thread (i.e., the one that
980 // called register_as_master() ).
unregister(server::execution_resource_t v) const981 void unregister( server::execution_resource_t v ) const {if( v ) ((IExecutionResource*)v)->Remove( my_scheduler );}
982 void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count, tbb_connection_v2& conn, ::tbb::spin_mutex& mtx );
983 void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count, omp_connection_v2& conn, ::tbb::spin_mutex& mtx );
984 void remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx );
985 void mark_virtual_processors_as_lent( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx );
986 void create_oversubscribers( unsigned n, std::vector<server_thread*>& thr_vec, omp_connection_v2& conn, ::tbb::spin_mutex& mtx );
987 void wakeup_tbb_threads( int c, ::tbb::spin_mutex& mtx );
988 void mark_virtual_processors_as_returned( IVirtualProcessorRoot** vprocs, unsigned int count, tbb::spin_mutex& mtx );
addto_original_exec_resources(IExecutionResource * r,::tbb::spin_mutex & mtx)989 inline void addto_original_exec_resources( IExecutionResource* r, ::tbb::spin_mutex& mtx ) {
990 ::tbb::spin_mutex::scoped_lock lck(mtx);
991 __TBB_ASSERT( !is_closing(), "trying to register master while connection is being shutdown?" );
992 original_exec_resources.push_back( r );
993 }
994 #if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
995 void allocate_thread_scavenger( IExecutionResource* v );
996 #endif
get_thread_scavenger()997 inline thread_scavenger_thread* get_thread_scavenger() { return my_thread_scavenger_thread; }
998 };
999
1000 garbage_connection_queue connections_to_reclaim;
1001 connection_scavenger_thread connection_scavenger;
1002
1003 #endif /* !RML_USE_WCRM */
1004
1005 //------------------------------------------------------------------------
1006 // generic_connection
1007 //------------------------------------------------------------------------
1008
//! Compile-time properties of a (Server,Client) pairing; specialized for the
//! TBB and OpenMP flavors below.
template<typename Server, typename Client>
struct connection_traits {};
1011
1012 // head of the active tbb connections
1013 static tbb::atomic<uintptr_t> active_tbb_connections;
1014 static tbb::atomic<int> current_tbb_conn_readers;
1015 static size_t current_tbb_conn_reader_epoch;
1016 static tbb::atomic<size_t> close_tbb_connection_event_count;
1017
1018 #if RML_USE_WCRM
1019 template<typename Connection>
1020 void make_job( Connection& c, server_thread& t );
1021 #endif
1022
//! Common part of a server/client connection, shared by TBB and OMP servers.
/** Implements the server interface boilerplate (version, yield, balance
    adjustment), owns the thread_map, and carries the intrusive link used by
    the global list of active TBB connections. */
template<typename Server, typename Client>
class generic_connection: public Server, no_copy {
    version_type version() const __TBB_override {return SERVER_VERSION;}
    void yield() __TBB_override {thread_monitor::yield();}
    // Independent threads consume concurrency, hence the negated delta.
    void independent_thread_number_changed( int delta ) __TBB_override { my_thread_map.adjust_balance( -delta ); }
    unsigned default_concurrency() const __TBB_override { return the_default_concurrency; }
    friend void wakeup_some_tbb_threads();
    friend class connection_scavenger_thread;

protected:
    thread_map my_thread_map;
    //! Next connection in the global active_tbb_connections list.
    generic_connection* next_conn;
    //! Value of close_tbb_connection_event_count when this connection left the active list.
    size_t my_ec;
#if RML_USE_WCRM
    // FIXME: pad it?
    tbb::spin_mutex map_mtx;
    IScheduler* my_scheduler;
    void do_open( IScheduler* s ) {
        my_scheduler = s;
        my_thread_map.bind( s );
    }
    bool is_closing() { return my_thread_map.is_closing(); }
    void request_close_connection( bool existing );
#else
    void do_open() {my_thread_map.bind();}
    void request_close_connection( bool );
#endif /* RML_USE_WCRM */
    //! Make destructor virtual
    virtual ~generic_connection() {}
#if !RML_USE_WCRM
    generic_connection( wait_counter& fc, Client& c ) : my_thread_map(fc,c), next_conn(NULL), my_ec(0) {}
#else
    generic_connection( wait_counter& fc, Client& c ) :
        my_thread_map(fc,c), next_conn(NULL), my_ec(0), map_mtx(), my_scheduler(NULL) {}
    void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count );
    void remove_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count );
    void notify_resources_externally_busy( IVirtualProcessorRoot** vprocs, unsigned int count ) { my_thread_map.mark_virtual_processors_as_lent( vprocs, count, map_mtx ); }
    void notify_resources_externally_idle( IVirtualProcessorRoot** vprocs, unsigned int count ) {
        my_thread_map.mark_virtual_processors_as_returned( vprocs, count, map_mtx );
    }
#endif /* !RML_USE_WCRM */

public:
    typedef Server server_type;
    typedef Client client_type;
    Client& client() const {return static_cast<Client&>(my_thread_map.client());}
    //! Stash a back-pointer (the server thread) in the job's scratch field.
    void set_scratch_ptr( job& j, void* ptr ) { ::rml::server::scratch_ptr(j) = ptr; }
#if RML_USE_WCRM
    template<typename Connection>
    friend void make_job( Connection& c, server_thread& t );
    void add_server_ref () {my_thread_map.add_server_ref();}
    void remove_server_ref() {if( my_thread_map.remove_server_ref()==0 ) delete this;}
    void add_client_ref () {my_thread_map.add_client_ref();}
    void remove_client_ref() {my_thread_map.remove_client_ref();}
#else /* !RML_USE_WCRM */
    int add_server_ref () {return my_thread_map.add_server_ref();}
    void remove_server_ref() {if( my_thread_map.remove_server_ref()==0 ) delete this;}
    void remove_client_ref() {my_thread_map.remove_client_ref();}
    void make_job( server_thread& t, job_automaton& ja );
#endif /* RML_USE_WCRM */
    //! Strip the low two tag bits (list lock/flags) from an encoded list head.
    static generic_connection* get_addr( uintptr_t addr_ex ) {
        return reinterpret_cast<generic_connection*>( addr_ex&~(uintptr_t)3 );
    }
};
1087
1088 //------------------------------------------------------------------------
1089 // TBB server
1090 //------------------------------------------------------------------------
1091
//! TBB connections: assist_cleanup() may only plug null jobs (see thread_map::assist_cleanup).
template<>
struct connection_traits<tbb_server,tbb_client> {
    static const bool assist_null_only = true;
    static const bool is_tbb = true;
};
1097
1098 //! Represents a server/client binding.
1099 /** The internal representation uses inheritance for the server part and a pointer for the client part. */
//! Represents a server/client binding.
/** The internal representation uses inheritance for the server part and a pointer for the client part. */
class tbb_connection_v2: public generic_connection<tbb_server,tbb_client> {
    void adjust_job_count_estimate( int delta ) __TBB_override;
#if !RML_USE_WCRM
#if _WIN32||_WIN64
    // Masters are not tracked in the non-WCRM build.
    void register_master ( rml::server::execution_resource_t& /*v*/ ) __TBB_override {}
    void unregister_master ( rml::server::execution_resource_t /*v*/ ) __TBB_override {}
#endif
#else
    void register_master ( rml::server::execution_resource_t& v ) __TBB_override {
        my_thread_map.register_as_master(v);
        // A successful subscription increases the master nesting depth.
        if( v ) ++nesting;
    }
    void unregister_master ( rml::server::execution_resource_t v ) __TBB_override {
        if( v ) {
            __TBB_ASSERT( nesting>0, NULL );
            if( --nesting==0 ) {
#if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
                // Outermost master is leaving: hand the resource to the thread scavenger.
                my_thread_map.allocate_thread_scavenger( (IExecutionResource*)v );
#endif
            }
        }
        my_thread_map.unregister(v);
    }
    IScheduler* create_scheduler() {return( scheduler<tbb_connection_v2>::create( *this ) );}
    friend void free_all_connections( uintptr_t );
    friend class scheduler<tbb_connection_v2>;
    friend class execution_context;
    friend class connection_scavenger_thread;
#endif /* RML_USE_WCRM */
    friend void wakeup_some_tbb_threads();
    //! Estimate on number of jobs without threads working on them.
    tbb::atomic<int> my_slack;
    friend class dummy_class_to_shut_up_gratuitous_warning_from_gcc_3_2_3;
#if TBB_USE_ASSERT
    //! Net adjust_job_count_estimate() total; checked to be zero at disconnect.
    tbb::atomic<int> my_job_count_estimate;
#endif /* TBB_USE_ASSERT */

    //! Number of adjust_job_count_estimate() calls currently in flight.
    tbb::atomic<int> n_adjust_job_count_requests;
#if RML_USE_WCRM
    //! Depth of nested register_master()/unregister_master() pairs.
    tbb::atomic<int> nesting;
#endif

    // dtor
    ~tbb_connection_v2();

public:
#if RML_USE_WCRM
    typedef tbb_server_thread server_thread_type;
#endif
    //! True if there is slack that try_process can use.
    bool has_slack() const {return my_slack>0;}

    //! Run one piece of client work if slack permits; returns whether process() was called.
#if RML_USE_WCRM
    bool try_process( job& job )
#else
    bool try_process( server_thread& t, job& job )
#endif
    {
        bool visited = false;
        // No check for my_slack>0 here because caller is expected to do that check.
        int k = --my_slack;
        if( k>=0 ) {
#if !RML_USE_WCRM
            t.my_extra_state = ts_visited; // remember the thread paid a trip to process() at least once
#endif
            client().process(job);
            visited = true;
        }
        ++my_slack;
        return visited;
    }

    tbb_connection_v2( wait_counter& fc, tbb_client& client ) : generic_connection<tbb_server,tbb_client>(fc,client)
    {
        my_slack = 0;
#if RML_USE_WCRM
        nesting = 0;
#endif
#if TBB_USE_ASSERT
        my_job_count_estimate = 0;
#endif /* TBB_USE_ASSERT */
        __TBB_ASSERT( !my_slack, NULL );

#if RML_USE_WCRM
        do_open( client.max_job_count()>0 ? create_scheduler() : NULL );
#else
        do_open();
#endif /* !RML_USE_WCRM */
        n_adjust_job_count_requests = 0;

        // Acquire head of active_tbb_connections & push the connection into the list
        uintptr_t conn;
        do {
            // A set low bit means another thread holds the list lock.
            for( ; (conn=active_tbb_connections)&1; )
                __TBB_Yield();
        } while( active_tbb_connections.compare_and_swap( conn|1, conn )!=conn );

        this->next_conn = generic_connection<tbb_server,tbb_client>::get_addr(conn);
        // Update and release head of active_tbb_connections
        active_tbb_connections = (uintptr_t) this; // set and release
    }
    inline void wakeup_tbb_threads( unsigned n ) {
        my_thread_map.wakeup_tbb_threads( n
#if RML_USE_WCRM
            , map_mtx
#endif
        );
    }
#if RML_USE_WCRM
    inline int get_nesting_level() { return nesting; }
#else
    inline bool wakeup_next_thread( thread_map::iterator i ) {return my_thread_map.wakeup_next_thread( i, *this );}
    inline thread_map::size_type get_unrealized_threads () {return my_thread_map.get_unrealized_threads();}
#endif /* !RML_USE_WCRM */
};
1215
1216 //------------------------------------------------------------------------
1217 // OpenMP server
1218 //------------------------------------------------------------------------
1219
//! OMP connections: assist_cleanup() may plug real jobs, not just null ones.
template<>
struct connection_traits<omp_server,omp_client> {
    static const bool assist_null_only = false;
    static const bool is_tbb = false;
};
1225
//! OpenMP flavor of a server/client binding.
class omp_connection_v2: public generic_connection<omp_server,omp_client> {
#if !RML_USE_WCRM
    int current_balance() const __TBB_override {return the_balance;}
#else
    friend void free_all_connections( uintptr_t );
    friend class scheduler<omp_connection_v2>;
    int current_balance() const __TBB_override {return my_thread_map.current_balance();}
#endif /* !RML_USE_WCRM */
    int try_increase_load( size_type n, bool strict ) __TBB_override;
    void decrease_load( size_type n ) __TBB_override;
    void get_threads( size_type request_size, void* cookie, job* array[] ) __TBB_override;
#if !RML_USE_WCRM
#if _WIN32||_WIN64
    // Masters are not tracked in the non-WCRM build.
    void register_master ( rml::server::execution_resource_t& /*v*/ ) __TBB_override {}
    void unregister_master ( rml::server::execution_resource_t /*v*/ ) __TBB_override {}
#endif
#else
    void register_master ( rml::server::execution_resource_t& v ) __TBB_override {
        my_thread_map.register_as_master( v );
        // Record the subscribed execution resource in original_exec_resources.
        my_thread_map.addto_original_exec_resources( (IExecutionResource*)v, map_mtx );
    }
    void unregister_master ( rml::server::execution_resource_t v ) __TBB_override { my_thread_map.unregister(v); }
#endif /* !RML_USE_WCRM */
#if _WIN32||_WIN64
    void deactivate( rml::job* j ) __TBB_override;
    void reactivate( rml::job* j ) __TBB_override;
#endif /* _WIN32||_WIN64 */
#if RML_USE_WCRM
public:
    typedef omp_server_thread server_thread_type;
private:
    IScheduler* create_scheduler() {return( scheduler<omp_connection_v2>::create( *this ) );}
#endif /* RML_USE_WCRM */
public:
#if TBB_USE_ASSERT
    //! Net change in delta caused by this connection.
    /** Should be zero when connection is broken */
    tbb::atomic<int> net_delta;
#endif /* TBB_USE_ASSERT */

    omp_connection_v2( wait_counter& fc, omp_client& client ) : generic_connection<omp_server,omp_client>(fc,client) {
#if TBB_USE_ASSERT
        net_delta = 0;
#endif /* TBB_USE_ASSERT */
#if RML_USE_WCRM
        do_open( create_scheduler() );
#else
        do_open();
#endif /* RML_USE_WCRM */
    }
    ~omp_connection_v2() {__TBB_ASSERT( net_delta==0, "net increase/decrease of load is nonzero" );}
};
1278
1279 #if !RML_USE_WCRM
/* to deal with cases where the machine is oversubscribed; we want each thread to trip to try_process() at least once */
/* this should not involve computing the_balance */
//! Hand TBB work to some other thread that has not yet visited try_process().
/** Scans the map circularly, starting just after this_thr.  Returns true if
    another thread was successfully grabbed for TBB work. */
bool thread_map::wakeup_next_thread( thread_map::iterator this_thr, tbb_connection_v2& conn ) {
    if( all_visited_at_least_once )
        return false;

    iterator e = end();
retry:
    bool exist = false;
    iterator k=this_thr;
    for( ++k; k!=e; ++k ) {
        // If another thread added *k, there is a tiny timing window where thread() is invalid.
        server_thread& t = k->wait_for_thread();
        if( t.my_extra_state!=ts_visited )
            exist = true;
        if( t.read_state()!=ts_tbb_busy && t.my_extra_state==ts_started )
            if( t.try_grab_for( ts_tbb_busy ) )
                return true;
    }
    // Wrap around: scan from the beginning up to this_thr.
    for( k=begin(); k!=this_thr; ++k ) {
        server_thread& t = k->wait_for_thread();
        if( t.my_extra_state!=ts_visited )
            exist = true;
        if( t.read_state()!=ts_tbb_busy && t.my_extra_state==ts_started )
            if( t.try_grab_for( ts_tbb_busy ) )
                return true;
    }

    // NOTE(review): with no braces, the 'else' binds to the inner
    // 'if( conn.has_slack() )', so all_visited_at_least_once is set when
    // unvisited threads exist but there is no slack — confirm this pairing
    // is intended rather than 'else' belonging to 'if( exist )'.
    if( exist )
        if( conn.has_slack() )
            goto retry;
    else
        all_visited_at_least_once = true;
    return false;
}
1315
release_tbb_threads(server_thread * t)1316 void thread_map::release_tbb_threads( server_thread* t ) {
1317 for( ; t; t = t->link ) {
1318 while( t->read_state()!=ts_asleep )
1319 __TBB_Yield();
1320 t->my_extra_state = ts_started;
1321 }
1322 }
1323 #endif /* !RML_USE_WCRM */
1324
adjust_balance(int delta)1325 void thread_map::adjust_balance( int delta ) {
1326 int new_balance = the_balance += delta;
1327 if( new_balance>0 && 0>=new_balance-delta /*== old the_balance*/ )
1328 wakeup_some_tbb_threads();
1329 }
1330
remove_client_ref()1331 void thread_map::remove_client_ref() {
1332 int k = my_client_ref_count.remove_ref();
1333 if( k==0 ) {
1334 // Notify factory that thread has crossed back into RML.
1335 --my_factory_counter;
1336 // Notify client that RML is done with the client object.
1337 my_client.acknowledge_close_connection();
1338 }
1339 }
1340
1341 #if RML_USE_WCRM
1342 /** Not a member of generic_connection because we need Connection to be the derived class. */
1343 template<typename Connection>
make_job(Connection & c,typename Connection::server_thread_type & t)1344 void make_job( Connection& c, typename Connection::server_thread_type& t ) {
1345 if( t.my_job_automaton.try_acquire() ) {
1346 rml::job* j = t.my_client.create_one_job();
1347 __TBB_ASSERT( j!=NULL, "client:::create_one_job returned NULL" );
1348 __TBB_ASSERT( (intptr_t(j)&1)==0, "client::create_one_job returned misaligned job" );
1349 t.my_job_automaton.set_and_release( j );
1350 c.set_scratch_ptr( *j, (void*) &t );
1351 }
1352 }
1353 #endif /* RML_USE_WCRM */
1354
1355 #if _MSC_VER && !defined(__INTEL_COMPILER)
1356 // Suppress "conditional expression is constant" warning.
1357 #pragma warning( push )
1358 #pragma warning( disable: 4127 )
1359 #endif
#if RML_USE_WCRM
// NOTE: the function header below differs per configuration while the tail
// (from the is_tbb check onward) is shared via the preprocessor.
template<typename Server, typename Client>
void generic_connection<Server,Client>::request_close_connection( bool exiting ) {
    // for TBB connections, exiting should always be false
    if( connection_traits<Server,Client>::is_tbb )
        __TBB_ASSERT( !exiting, NULL);
#if TBB_USE_ASSERT
    else if( exiting )
        reinterpret_cast<omp_connection_v2*>(this)->net_delta = 0;
#endif
    if( exiting ) {
        // Plug the garbage-connection queue so nothing more can be enqueued.
        uintptr_t tail = connections_to_reclaim.tail;
        while( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::plugged, tail )!=tail )
            __TBB_Yield();
        my_thread_map.unbind( *this, map_mtx );
        my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
        // It is assumed that the client waits for all other threads to terminate before
        // calling request_close_connection with true. Thus, it is safe to return all
        // outstanding connection objects that are reachable. It is possible that there may
        // be some unreachable connection objects lying somewhere.
        free_all_connections( connection_scavenger.grab_and_prepend( this ) );
        return;
    }
#else /* !RML_USE_WCRM */
template<typename Server, typename Client>
void generic_connection<Server,Client>::request_close_connection( bool ) {
#endif /* RML_USE_WCRM */
    if( connection_traits<Server,Client>::is_tbb ) {
        // acquire the head of active tbb connections (low bit is the list lock)
        uintptr_t conn;
        do {
            for( ; (conn=active_tbb_connections)&1; )
                __TBB_Yield();
        } while( active_tbb_connections.compare_and_swap( conn|1, conn )!=conn );

        // Locate the current connection
        generic_connection* pred_conn = NULL;
        generic_connection* curr_conn = (generic_connection*) conn;
        for( ; curr_conn && curr_conn!=this; curr_conn=curr_conn->next_conn )
            pred_conn = curr_conn;
        __TBB_ASSERT( curr_conn==this, "the current connection is not in the list?" );

        // Remove this from the list
        if( pred_conn ) {
            pred_conn->next_conn = curr_conn->next_conn;
            // Clearing the tag bits releases the list lock.
            active_tbb_connections = reinterpret_cast<uintptr_t>(generic_connection<tbb_server,tbb_client>::get_addr(active_tbb_connections)); // release it
        } else
            active_tbb_connections = (uintptr_t) curr_conn->next_conn; // update & release it
        curr_conn->next_conn = NULL;
        // Increment the tbb connection close event count
        my_ec = ++close_tbb_connection_event_count;
        // Wait happens in tbb_connection_v2::~tbb_connection_v2()
    }
#if RML_USE_WCRM
    my_thread_map.unbind( *this, map_mtx );
    my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
    connection_scavenger.add_request( this );
#else
    my_thread_map.unbind();
    my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
    // Remove extra reference
    remove_server_ref();
#endif
}
1424 #if _MSC_VER && !defined(__INTEL_COMPILER)
1425 #pragma warning( pop )
1426 #endif
1427
1428 #if RML_USE_WCRM
1429
//! Generic default: adding virtual processors is a no-op; see specializations below.
template<typename Server, typename Client>
void generic_connection<Server,Client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{}
1433
//! TBB connections forward newly granted virtual processors to the thread map.
template<>
void generic_connection<tbb_server,tbb_client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    my_thread_map.add_virtual_processors( vproots, count, (tbb_connection_v2&)*this, map_mtx );
}
template<>
void generic_connection<omp_server,omp_client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    // For OMP, since it uses SchedulerPolicy of MinThreads==MaxThreads, this is called once when
    // RequestInitialVirtualProcessors() is called.
    my_thread_map.add_virtual_processors( vproots, count, (omp_connection_v2&)*this, map_mtx );
}
1446
//! Removal is only meaningful for the TBB specialization below.
template<typename Server, typename Client>
void generic_connection<Server,Client>::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    __TBB_ASSERT( false, "should not be called" );
}
1452 /* For OMP, RemoveVirtualProcessors() will never be called. */
1453
//! TBB connections forward virtual-processor removal to the thread map.
template<>
void generic_connection<tbb_server,tbb_client>::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    my_thread_map.remove_virtual_processors( vproots, count, map_mtx );
}
1459
1460 void tbb_connection_v2::adjust_job_count_estimate( int delta ) {
1461 #if TBB_USE_ASSERT
1462 my_job_count_estimate += delta;
1463 #endif /* TBB_USE_ASSERT */
1464 // Atomically update slack.
1465 int c = my_slack+=delta;
1466 if( c>0 ) {
1467 ++n_adjust_job_count_requests;
1468 my_thread_map.wakeup_tbb_threads( c, map_mtx );
1469 --n_adjust_job_count_requests;
1470 }
1471 }
1472 #endif /* RML_USE_WCRM */
1473
//! Verify a clean disconnect, then wait out any stragglers reading this connection.
tbb_connection_v2::~tbb_connection_v2() {
#if TBB_USE_ASSERT
    if( my_job_count_estimate!=0 ) {
        fprintf(stderr, "TBB client tried to disconnect with non-zero net job count estimate of %d\n", int(my_job_count_estimate ));
        abort();
    }
    __TBB_ASSERT( !my_slack, "attempt to destroy tbb_server with nonzero slack" );
    __TBB_ASSERT( this!=static_cast<tbb_connection_v2*>(generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)), "request_close_connection() must be called" );
#endif /* TBB_USE_ASSERT */
#if !RML_USE_WCRM
    // If there are other threads ready for work, give them coins
    if( the_balance>0 )
        wakeup_some_tbb_threads();
#endif
    // Someone might be accessing my data members: spin until no reader whose
    // epoch precedes this connection's close event (my_ec) remains.
    while( current_tbb_conn_readers>0 && (ptrdiff_t)(my_ec-current_tbb_conn_reader_epoch)>0 )
        __TBB_Yield();
}
1492
1493 #if !RML_USE_WCRM
1494 template<typename Server, typename Client>
1495 void generic_connection<Server,Client>::make_job( server_thread& t, job_automaton& ja ) {
1496 if( ja.try_acquire() ) {
1497 rml::job* j = client().create_one_job();
1498 __TBB_ASSERT( j!=NULL, "client:::create_one_job returned NULL" );
1499 __TBB_ASSERT( (intptr_t(j)&1)==0, "client::create_one_job returned misaligned job" );
1500 ja.set_and_release( j );
1501 __TBB_ASSERT( t.my_conn && t.my_ja && t.my_job==NULL, NULL );
1502 t.my_job = j;
1503 set_scratch_ptr( *j, (void*) &t );
1504 }
1505 }
1506
//! Fold a job-count delta into the slack estimate; wake or create threads for new work.
void tbb_connection_v2::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_job_count_estimate += delta;
#endif /* TBB_USE_ASSERT */
    // Atomically update slack.
    int c = my_slack+=delta;
    if( c>0 ) {
        ++n_adjust_job_count_requests;
        // The client has work to do and there are threads available
        thread_map::size_type n = my_thread_map.wakeup_tbb_threads(c);

        // Waking existing threads was not enough: realize up to n new threads,
        // collecting them on a temporary intrusive list.
        server_thread* new_threads_anchor = NULL;
        thread_map::size_type i;
        {
            tbb::internal::affinity_helper fpa;
            for( i=0; i<n; ++i ) {
                // Obtain unrealized threads
                thread_map::value_type* k = my_thread_map.add_one_thread( false );
                if( !k )
                    // No unrealized threads left.
                    break;
                // Eagerly start the thread off.
                fpa.protect_affinity_mask( /*restore_process_mask=*/true );
                my_thread_map.bind_one_thread( *this, *k );
                server_thread& t = k->thread();
                __TBB_ASSERT( !t.link, NULL );
                t.link = new_threads_anchor;
                new_threads_anchor = &t;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }

        // Hand each new thread a coin from the deposit, while coins remain.
        thread_map::size_type j=0;
        for( ; the_balance>0 && j<i; ++j ) {
            if( --the_balance>=0 ) {
                // Withdraw a coin from the bank
                __TBB_ASSERT( new_threads_anchor, NULL );

                server_thread* t = new_threads_anchor;
                new_threads_anchor = t->link;
                while( !t->try_grab_for( ts_tbb_busy ) )
                    __TBB_Yield();
                t->my_extra_state = ts_started;
            } else {
                // Overdraft. return it to the bank
                ++the_balance;
                break;
            }
        }
        __TBB_ASSERT( i-j!=0||new_threads_anchor==NULL, NULL );
        // Mark the ones that did not get started as eligible for being snatched.
        if( new_threads_anchor )
            my_thread_map.release_tbb_threads( new_threads_anchor );

        --n_adjust_job_count_requests;
    }
}
1564 #endif /* RML_USE_WCRM */
1565
1566 #if RML_USE_WCRM
//! Try to reserve up to n units of concurrency from the global balance.
/** In strict mode the full amount is withdrawn unconditionally (the balance may
    go negative).  Otherwise only what is available is taken; if nothing is
    available the current (non-positive) balance is returned unchanged.
    Returns the number of units actually reserved. */
int omp_connection_v2::try_increase_load( size_type n, bool strict ) {
    __TBB_ASSERT(int(n)>=0,NULL);
    if( strict ) {
        the_balance -= int(n);
    } else {
        int avail, old;
        do {
            avail = the_balance;
            if( avail<=0 ) {
                // No atomic read-write-modify operation necessary.
                return avail;
            }
            // Don't read the_system_balance; if it changes, compare_and_swap will fail anyway.
            old = the_balance.compare_and_swap( int(n)<avail ? avail-n : 0, avail );
        } while( old!=avail );
        if( int(n)>avail )
            n=avail;
    }
#if TBB_USE_ASSERT
    net_delta += n;
#endif /* TBB_USE_ASSERT */
    return n;
}
1590
1591 void omp_connection_v2::decrease_load( size_type /*n*/ ) {}
1592
//! Deliver request_size jobs to the OMP client, enlisting idle threads first
//! and creating oversubscribers for any shortfall.
void omp_connection_v2::get_threads( size_type request_size, void* cookie, job* array[] ) {
    unsigned index = 0;
    std::vector<omp_server_thread*> enlisted(request_size);
    std::vector<thread_grab_t> to_activate(request_size);

    if( request_size==0 ) return;

    {
        tbb::spin_mutex::scoped_lock lock(map_mtx);

        __TBB_ASSERT( !is_closing(), "try to get threads while connection is being shutdown?" );

        // Two passes over the map: prefer threads whose VPs are not lent out.
        for( int scan=0; scan<2; ++scan ) {
            for( thread_map::iterator i=my_thread_map.begin(); i!=my_thread_map.end(); ++i ) {
                omp_server_thread* thr = (omp_server_thread*) (*i).second;
                // in the first scan, skip VPs that are lent
                if( scan==0 && thr->is_lent() ) continue;
                thread_grab_t res = thr->try_grab_for();
                if( res!=wk_failed ) {// && if is not busy by some other scheduler
                    to_activate[index] = res;
                    enlisted[index] = thr;
                    if( ++index==request_size )
                        // Jumping out of the block releases the scoped_lock.
                        goto activate_threads;
                }
            }
        }
    }

 activate_threads:

    for( unsigned i=0; i<index; ++i ) {
        omp_server_thread* thr = enlisted[i];
        if( to_activate[i]==wk_from_asleep )
            thr->get_virtual_processor()->Activate( thr );
        job* j = thr->wait_for_job();
        array[i] = j;
        thr->omp_data.produce( client(), j, cookie, i PRODUCE_ARG(*this) );
    }

    if( index==request_size )
        return;

    // If we come to this point, it must be because dynamic==false
    // Create Oversubscribers..

    // Note that our policy is such that MinConcurrency==MaxConcurrency.
    // RM will deliver MaxConcurrency of VirtualProcessors and no more.
    __TBB_ASSERT( request_size>index, NULL );
    unsigned n = request_size - index;
    std::vector<server_thread*> thr_vec(n);
    typedef std::vector<server_thread*>::iterator iterator_thr;
    my_thread_map.create_oversubscribers( n, thr_vec, *this, map_mtx );
    for( iterator_thr ti=thr_vec.begin(); ti!=thr_vec.end(); ++ti ) {
        omp_server_thread* thr = (omp_server_thread*) *ti;
        __TBB_ASSERT( thr, "thread not created?" );
        // Thread is already grabbed; since it is newly created, we need to activate it.
        thr->get_virtual_processor()->Activate( thr );
        job* j = thr->wait_for_job();
        array[index] = j;
        thr->omp_data.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
        ++index;
    }
}
1656
1657 #if _WIN32||_WIN64
//! Client returns a thread: credit the balance and put its VP to sleep.
void omp_connection_v2::deactivate( rml::job* j )
{
    my_thread_map.adjust_balance(1);
#if TBB_USE_ASSERT
    net_delta -= 1;
#endif
    // scratch_ptr of the job holds the owning server thread.
    omp_server_thread* thr = (omp_server_thread*) scratch_ptr( *j );
    (thr->get_virtual_processor())->Deactivate( thr );
}
1667
//! Client resumes a previously deactivated thread.
void omp_connection_v2::reactivate( rml::job* j )
{
    // Should not adjust the_balance because OMP client is supposed to
    // do try_increase_load() to reserve the threads to use.
    omp_server_thread* thr = (omp_server_thread*) scratch_ptr( *j );
    (thr->get_virtual_processor())->Activate( thr );
}
#endif /* _WIN32||_WIN64 */
1676
1677 #endif /* RML_USE_WCRM */
1678
1679 //! Wake up some available tbb threads
void wakeup_some_tbb_threads()
{
    /* First, atomically grab the connection, then increase the server ref count to keep
       it from being released prematurely.  Second, check if the balance is available for TBB
       and the tbb connection has slack to exploit.  If the answer is true, go ahead and
       try to wake some up. */
    if( generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)==0 )
        // the next connection will see the change; return.
        return;

start_it_over:
    // Only the first reader (counter 1) walks the connection list; latecomers
    // bail out and rely on the first one to do the waking.
    int n_curr_readers = ++current_tbb_conn_readers;
    if( n_curr_readers>1 ) // I lost
        return;
    // if n_curr_readers==1, i am the first one, so I will take responsibility for waking tbb threads up.

    // update the current epoch
    current_tbb_conn_reader_epoch = close_tbb_connection_event_count;

    // read and clear
    // Newly added connection will not invalidate the pointer, and it will
    // compete with the current one to claim coins.
    // One that is about to close the connection increments the event count
    // after it removes the connection from the list. But it will keep around
    // the connection until all readers including this one catch up. So, reading
    // the head and clearing the lock bit should be o.k.
    generic_connection<tbb_server,tbb_client>* next_conn_wake_up = generic_connection<tbb_server,tbb_client>::get_addr( active_tbb_connections );

    for( ; next_conn_wake_up; ) {
        /* some threads are creating tbb server threads; they may not see my changes made to the_balance */
        /* When a thread is in adjust_job_count_estimate() to increase the slack
           RML tries to activate worker threads on behalf of the requesting thread
           by repeatedly drawing a coin from the bank optimistically and grabbing a
           thread. If it finds the bank overdrafted, it returns the coin back to
           the bank and returns the control to the thread (return from the method).
           There lies a tiny timing hole.

           When the overdraft occurs (note that multiple masters may be in
           adjust_job_count_estimate() so the_balance can be any negative value) and
           a worker returns from the TBB work at that moment, its returning the coin
           does not bump up the_balance over 0, so it happily returns from
           wakeup_some_tbb_threads() without attempting to give coins to worker threads
           that are ready.
        */
        while( ((tbb_connection_v2*)next_conn_wake_up)->n_adjust_job_count_requests>0 )
            __TBB_Yield();

        int bal = the_balance;
        n_curr_readers = current_tbb_conn_readers; // get the snapshot
        if( bal<=0 ) break;
        // if the connection is deleted, the following will immediately return because its slack would be 0 or less.

        tbb_connection_v2* tbb_conn = (tbb_connection_v2*)next_conn_wake_up;
        int my_slack = tbb_conn->my_slack;
        if( my_slack>0 ) tbb_conn->wakeup_tbb_threads( my_slack );
        next_conn_wake_up = next_conn_wake_up->next_conn;
    }

    int delta = current_tbb_conn_readers -= n_curr_readers;
    // if delta>0, more threads entered the routine since this one took the snapshot
    if( delta>0 ) {
        current_tbb_conn_readers = 0;
        if( the_balance>0 && generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)!=0 )
            goto start_it_over;
    }

    // Signal any connection that is waiting for me to complete my access that I am done.
    current_tbb_conn_reader_epoch = close_tbb_connection_event_count;
}
1749
1750 #if !RML_USE_WCRM
//! Reserve up to n coins (threads) from the global balance for this OMP connection.
/** Non-WCRM variant; same contract as the WCRM one: strict debits unconditionally,
    non-strict claims only what is available via CAS and clamps n.  Returns the
    number of threads reserved (or a non-positive balance snapshot). */
int omp_connection_v2::try_increase_load( size_type n, bool strict ) {
    __TBB_ASSERT(int(n)>=0,NULL);
    if( strict ) {
        the_balance -= int(n);
    } else {
        int avail, old;
        do {
            avail = the_balance;
            if( avail<=0 ) {
                // No atomic read-modify-write operation necessary.
                return avail;
            }
            // don't re-read the_balance; if it changes, compare_and_swap will fail anyway.
            old = the_balance.compare_and_swap( int(n)<avail ? avail-n : 0, avail );
        } while( old!=avail );
        if( int(n)>avail )
            n=avail;
    }
#if TBB_USE_ASSERT
    net_delta += n;
#endif /* TBB_USE_ASSERT */
    return n;
}
1774
1775 void omp_connection_v2::decrease_load( size_type n ) {
1776 __TBB_ASSERT(int(n)>=0,NULL);
1777 my_thread_map.adjust_balance(int(n));
1778 #if TBB_USE_ASSERT
1779 net_delta -= n;
1780 #endif /* TBB_USE_ASSERT */
1781 }
1782
//! Deliver request_size threads to the OpenMP client (non-WCRM variant).
/** Loops until all requested threads are grabbed: first tries existing map
    entries, then adds new threads to the map, racing with other requesters
    via try_grab_for(). */
void omp_connection_v2::get_threads( size_type request_size, void* cookie, job* array[] ) {

    if( !request_size )
        return;

    unsigned index = 0;
    for(;;) { // don't return until all request_size threads are grabbed.
        // Need to grab some threads
        thread_map::iterator k_end=my_thread_map.end();
        for( thread_map::iterator k=my_thread_map.begin(); k!=k_end; ++k ) {
            // If another thread added *k, there is a tiny timing window where thread() is invalid.
            server_thread& t = k->wait_for_thread();
            if( t.try_grab_for( ts_omp_busy ) ) {
                // The preincrement instead of post-increment of index is deliberate.
                job* j = k->wait_for_job();
                array[index] = j;
                t.omp_dispatch.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
                if( ++index==request_size )
                    return;
            }
        }
        // Need to allocate more threads
        for( unsigned i=index; i<request_size; ++i ) {
            __TBB_ASSERT( index<request_size, NULL );
            thread_map::value_type* k = my_thread_map.add_one_thread( true );
#if TBB_USE_ASSERT
            if( !k ) {
                // Client erred
                __TBB_ASSERT(false, "server::get_threads: exceeded job_count\n");
            }
#endif
            my_thread_map.bind_one_thread( *this, *k );
            server_thread& t = k->thread();
            if( t.try_grab_for( ts_omp_busy ) ) {
                job* j = k->wait_for_job();
                array[index] = j;
                // The preincrement instead of post-increment of index is deliberate.
                t.omp_dispatch.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
                if( ++index==request_size )
                    return;
            } // else someone else snatched it.
        }
    }
}
1827 #endif /* !RML_USE_WCRM */
1828
1829 //------------------------------------------------------------------------
1830 // Methods of omp_dispatch_type
1831 //------------------------------------------------------------------------
1832 void omp_dispatch_type::consume() {
1833 // Wait for short window between when master sets state of this thread to ts_omp_busy
1834 // and master thread calls produce.
1835 job_type* j;
1836 tbb::internal::atomic_backoff backoff;
1837 while( (j = job)==NULL ) backoff.pause();
1838 job = static_cast<job_type*>(NULL);
1839 client->process(*j,cookie,index);
1840 #if TBB_USE_ASSERT
1841 // Return of method process implies "decrease_load" from client's viewpoint, even though
1842 // the actual adjustment of the_balance only happens when this thread really goes to sleep.
1843 --server->net_delta;
1844 #endif /* TBB_USE_ASSERT */
1845 }
1846
1847 #if !RML_USE_WCRM
1848 #if _WIN32||_WIN64
1849 void omp_connection_v2::deactivate( rml::job* j )
1850 {
1851 #if TBB_USE_ASSERT
1852 net_delta -= 1;
1853 #endif
1854 __TBB_ASSERT( j, NULL );
1855 server_thread* thr = (server_thread*) scratch_ptr( *j );
1856 thr->deactivate();
1857 }
1858
1859 void omp_connection_v2::reactivate( rml::job* j )
1860 {
1861 // Should not adjust the_balance because OMP client is supposed to
1862 // do try_increase_load() to reserve the threads to use.
1863 __TBB_ASSERT( j, NULL );
1864 server_thread* thr = (server_thread*) scratch_ptr( *j );
1865 thr->reactivate();
1866 }
1867 #endif /* _WIN32||_WIN64 */
1868
1869 //------------------------------------------------------------------------
1870 // Methods of server_thread
1871 //------------------------------------------------------------------------
1872
//! Construct a server thread: idle, not terminating, not yet bound to a connection or job.
server_thread::server_thread() :
    ref_count(0),
    link(NULL),
    my_map_pos(),
    my_conn(NULL), my_job(NULL), my_ja(NULL)
{
    // state and terminate are assigned in the body rather than the initializer
    // list (state is used with compare_and_swap below, i.e. it is an atomic).
    state = ts_idle;
    terminate = false;
#if TBB_USE_ASSERT
    has_active_thread = false;
#endif /* TBB_USE_ASSERT */
}
1885
//! Destructor: the underlying OS thread must have finished (or never started).
server_thread::~server_thread() {
    __TBB_ASSERT( !has_active_thread, NULL );
}
1889
1890 #if _MSC_VER && !defined(__INTEL_COMPILER)
1891 // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
1892 #pragma warning(push)
1893 #pragma warning(disable:4189)
1894 #endif
//! OS-level entry point for a server thread; runs the thread's main loop.
__RML_DECL_THREAD_ROUTINE server_thread::thread_routine( void* arg ) {
    server_thread* self = static_cast<server_thread*>(arg);
    // Perturb the stack address to avoid 64K-aliasing slowdowns across threads.
    AVOID_64K_ALIASING( self->my_index );
#if TBB_USE_ASSERT
    __TBB_ASSERT( !self->has_active_thread, NULL );
    self->has_active_thread = true;
#endif /* TBB_USE_ASSERT */
    self->loop();
    return 0;
}
1905 #if _MSC_VER && !defined(__INTEL_COMPILER)
1906 #pragma warning(pop)
1907 #endif
1908
//! Start the underlying OS thread with the requested stack size.
void server_thread::launch( size_t stack_size ) {
#if USE_WINTHREAD
    // Windows flavor also receives the index slot used by AVOID_64K_ALIASING.
    thread_monitor::launch( thread_routine, this, stack_size, &this->my_index );
#else
    thread_monitor::launch( thread_routine, this, stack_size );
#endif /* USE_WINTHREAD */
}
1916
//! Put this thread to sleep if it is still idle.
/** Uses thread_monitor's prepare_wait/commit_wait/cancel_wait protocol to close
    the race between deciding to sleep and being grabbed or terminated. */
void server_thread::sleep_perhaps( thread_state_t asleep ) {
    if( terminate ) return;
    __TBB_ASSERT( asleep==ts_asleep, NULL );
    thread_monitor::cookie c;
    monitor.prepare_wait(c);
    if( state.compare_and_swap( asleep, ts_idle )==ts_idle ) {
        if( !terminate ) {
            monitor.commit_wait(c);
            // Someone else woke me up. The compare_and_swap further below deals with spurious wakeups.
        } else {
            monitor.cancel_wait();
        }
        thread_state_t s = read_state();
        if( s==ts_asleep ) {
            state.compare_and_swap( ts_idle, ts_asleep );
            // I woke myself up, either because I cancelled the wait or suffered a spurious wakeup.
        } else {
            // Someone else woke me up; there the_balance is decremented by 1. -- tbb only
            if( !is_omp_thread ) {
                __TBB_ASSERT( s==ts_tbb_busy||s==ts_idle, NULL );
            }
        }
    } else {
        // someone else made it busy; see try_grab_for when state==ts_idle.
        __TBB_ASSERT( state==ts_omp_busy||state==ts_tbb_busy, NULL );
        monitor.cancel_wait();
    }
    __TBB_ASSERT( read_state()!=asleep, "a thread can only put itself to sleep" );
}
1946
1947 bool server_thread::wakeup( thread_state_t to, thread_state_t from ) {
1948 bool success = false;
1949 __TBB_ASSERT( from==ts_asleep && (to==ts_idle||to==ts_omp_busy||to==ts_tbb_busy), NULL );
1950 if( state.compare_and_swap( to, from )==from ) {
1951 if( !is_omp_thread ) __TBB_ASSERT( to==ts_idle||to==ts_tbb_busy, NULL );
1952 // There is a small timing window that permits balance to become negative,
1953 // but such occurrences are probably rare enough to not worry about, since
1954 // at worst the result is slight temporary oversubscription.
1955 monitor.notify();
1956 success = true;
1957 }
1958 return success;
1959 }
1960
1961 //! Attempt to change a thread's state to ts_omp_busy, and waking it up if necessary.
1962 bool server_thread::try_grab_for( thread_state_t target_state ) {
1963 bool success = false;
1964 switch( read_state() ) {
1965 case ts_asleep:
1966 success = wakeup( target_state, ts_asleep );
1967 break;
1968 case ts_idle:
1969 success = state.compare_and_swap( target_state, ts_idle )==ts_idle;
1970 break;
1971 default:
1972 // Thread is not available to be part of an OpenMP thread team.
1973 break;
1974 }
1975 return success;
1976 }
1977
1978 #if _WIN32||_WIN64
//! Client-requested deactivation: return the coin and (usually) go to sleep.
/** my_extra_state acts as a tiny critical section: ts_deactivated while this
    method runs, ts_none when released.  A pending reactivate() cancels the sleep. */
void server_thread::deactivate() {
    thread_state_t es = (thread_state_t) my_extra_state.fetch_and_store( ts_deactivated );
    __TBB_ASSERT( my_extra_state==ts_deactivated, "someone else tampered with my_extra_state?" );
    if( es==ts_none )
        state = ts_idle;
    else
        __TBB_ASSERT( es==ts_reactivated, "Cannot call deactivate() while in ts_deactivated" );
    // only the thread can transition itself from ts_deactivated to ts_none
    __TBB_ASSERT( my_extra_state==ts_deactivated, "someone else tampered with my_extra_state?" );
    my_extra_state = ts_none; // release the critical section
    int bal = ++the_balance;  // return the coin to the deposit
    if( bal>0 )
        wakeup_some_tbb_threads();
    if( es==ts_none )
        sleep_perhaps( ts_asleep );
}
1995
//! Client-requested reactivation: re-enlist this thread as ts_omp_busy.
void server_thread::reactivate() {
    thread_state_t es;
    do {
        // Wait for a concurrent deactivate() to release its my_extra_state critical section.
        while( (es=read_extra_state())==ts_deactivated )
            __TBB_Yield();
        if( es==ts_reactivated ) {
            __TBB_ASSERT( false, "two Reactivate() calls in a row. Should not happen" );
            return;
        }
        __TBB_ASSERT( es==ts_none, NULL );
    } while( (thread_state_t)my_extra_state.compare_and_swap( ts_reactivated, ts_none )!=ts_none );
    if( state!=ts_omp_busy ) {
        my_extra_state = ts_none;
        // Keep trying until we win the thread back for OMP.
        while( !try_grab_for( ts_omp_busy ) )
            __TBB_Yield();
    }
}
2013 #endif /* _WIN32||_WIN64 */
2014
2015
//! Final cleanup of a terminating thread's job; drops client and server references.
/** A TBB thread still holding a coin (ts_tbb_busy) returns it to the_balance
    before its state becomes ts_done.  Always returns true. */
template<typename Connection>
bool server_thread::destroy_job( Connection& c ) {
    __TBB_ASSERT( !is_omp_thread||(state==ts_idle||state==ts_omp_busy), NULL );
    __TBB_ASSERT( is_omp_thread||(state==ts_idle||state==ts_tbb_busy), NULL );
    if( !is_omp_thread ) {
        __TBB_ASSERT( state==ts_idle||state==ts_tbb_busy, NULL );
        if( state==ts_idle )
            state.compare_and_swap( ts_done, ts_idle );
        // 'state' may be set to ts_tbb_busy by another thread.

        if( state==ts_tbb_busy ) { // return the coin to the deposit
            // need to deposit first to let the next connection see the change
            ++the_balance;
            state = ts_done; // no other thread changes the state when it is ts_*_busy
        }
    }
    if( job_automaton* ja = my_ja ) {
        rml::job* j;
        if( ja->try_plug(j) ) {
            __TBB_ASSERT( j, NULL );
            c.client().cleanup(*j);
            c.remove_client_ref();
        } else {
            // Some other thread took responsibility for cleaning up the job.
        }
    }
    // Must do remove client reference first, because execution of
    // c.remove_ref() can cause *this to be destroyed.
    int k = remove_ref();
    __TBB_ASSERT_EX( k==0, "more than one references?" );
#if TBB_USE_ASSERT
    has_active_thread = false;
#endif /* TBB_USE_ASSERT */
    c.remove_server_ref();
    return true;
}
2052
2053 bool server_thread::do_termination() {
2054 if( is_omp_thread )
2055 return destroy_job( *static_cast<omp_connection_v2*>(my_conn) );
2056 else
2057 return destroy_job( *static_cast<tbb_connection_v2*>(my_conn) );
2058 }
2059
2060 //! Loop that each thread executes
//! Main loop that each server thread executes until terminated.
void server_thread::loop() {
    // Create the client job for this thread on the owning connection.
    if( is_omp_thread )
        static_cast<omp_connection_v2*>(my_conn)->make_job( *this, *my_ja );
    else
        static_cast<tbb_connection_v2*>(my_conn)->make_job( *this, *my_ja );
    for(;;) {
        __TBB_Yield();
        if( state==ts_idle )
            sleep_perhaps( ts_asleep );

        // Check whether I should quit.
        if( terminate )
            if( do_termination() )
                return;

        // read the state
        thread_state_t s = read_state();
        __TBB_ASSERT( s==ts_idle||s==ts_omp_busy||s==ts_tbb_busy, NULL );

        if( s==ts_omp_busy ) {
            // Enslaved by OpenMP team.
            omp_dispatch.consume();
            /* here wake tbb threads up if feasible */
            if( ++the_balance>0 )
                wakeup_some_tbb_threads();
            state = ts_idle;
        } else if( s==ts_tbb_busy ) {
            // do some TBB work.
            __TBB_ASSERT( my_conn && my_job, NULL );
            tbb_connection_v2& conn = *static_cast<tbb_connection_v2*>(my_conn);
            // give openmp higher priority
            bool has_coin = true;
            if( conn.has_slack() ) {
                // it has the coin, it should trip to the scheduler at least once as long as its slack is positive
                do {
                    if( conn.try_process( *this, *my_job ) )
                        if( conn.has_slack() && the_balance>=0 )
                            has_coin = !conn.wakeup_next_thread( my_map_pos );
                } while( has_coin && conn.has_slack() && the_balance>=0 );
            }
            state = ts_idle;
            if( has_coin ) {
                ++the_balance; // return the coin back to the deposit
                if( conn.has_slack() ) { // a new adjust_job_request_estimate() is in progress
                    // it may have missed my changes to state and/or the_balance
                    if( --the_balance>=0 ) { // try to grab the coin back
                        // I got the coin
                        if( state.compare_and_swap( ts_tbb_busy, ts_idle )!=ts_idle )
                            ++the_balance; // someone else enlisted me.
                    } else {
                        // overdraft. return the coin
                        ++the_balance;
                    }
                } // else the new request will see my changes to state & the_balance.
            }
            /* here wake tbb threads up if feasible */
            if( the_balance>0 )
                wakeup_some_tbb_threads();
        }
    }
}
2122 #endif /* !RML_USE_WCRM */
2123
2124 #if RML_USE_WCRM
2125
2126 class tbb_connection_v2;
2127 class omp_connection_v2;
2128
2129 #define CREATE_SCHEDULER_POLICY(policy,min_thrs,max_thrs,stack_size) \
2130 try { \
2131 policy = new SchedulerPolicy (7, \
2132 SchedulerKind, RML_THREAD_KIND, /*defined in _rml_serer_msrt.h*/ \
2133 MinConcurrency, min_thrs, \
2134 MaxConcurrency, max_thrs, \
2135 TargetOversubscriptionFactor, 1, \
2136 ContextStackSize, stack_size/1000, /*ConcRT:kB, iRML:bytes*/ \
2137 ContextPriority, THREAD_PRIORITY_NORMAL, \
2138 DynamicProgressFeedback, ProgressFeedbackDisabled ); \
2139 } catch ( invalid_scheduler_policy_key & ) { \
2140 __TBB_ASSERT( false, "invalid scheduler policy key exception caught" );\
2141 } catch ( invalid_scheduler_policy_value & ) { \
2142 __TBB_ASSERT( false, "invalid scheduler policy value exception caught" );\
2143 }
2144
2145 static unsigned int core_count;
2146 static tbb::atomic<int> core_count_inited;
2147
2148
//! Lazily initialized, thread-safe processor count.
/** core_count_inited: 0 = uninitialized, 1 = initialization in progress, 2 = done. */
static unsigned int get_processor_count()
{
    if( core_count_inited!=2 ) {
        if( core_count_inited.compare_and_swap( 1, 0 )==0 ) {
            // This thread won the race and performs the one-time query.
            core_count = GetProcessorCount();
            core_count_inited = 2;
        } else {
            // Another thread is initializing; wait for it to finish.
            tbb::internal::spin_wait_until_eq( core_count_inited, 2 );
        }
    }
    return core_count;
}
2161
//! Generic scheduler wrapper: records a unique scheduler id and the owning connection.
template<typename Connection>
scheduler<Connection>::scheduler( Connection& conn ) : uid(GetSchedulerId()), my_conn(conn) {}
2164
//! TBB scheduler: MinConcurrency 0, MaxConcurrency min(max_job_count, P-1).
template<>
scheduler<tbb_connection_v2>::scheduler( tbb_connection_v2& conn ) : uid(GetSchedulerId()), my_conn(conn)
{
    rml::client& cl = my_conn.client();
    unsigned max_job_count = cl.max_job_count();
    unsigned count = get_processor_count();
    __TBB_ASSERT( max_job_count>0, "max job count must be positive" );
    __TBB_ASSERT( count>1, "The processor count must be greater than 1" );
    // Cap workers at P-1, leaving one hardware thread for the master.
    if( max_job_count>count-1) max_job_count = count-1;
    CREATE_SCHEDULER_POLICY( my_policy, 0, max_job_count, cl.min_stack_size() );
}
2176
#if __RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
//! No-op variant used when VP removal is compiled out.
template<>
void scheduler<tbb_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot**, unsigned int)
{
}
#else
//! ConcRT callback: forward virtual-processor removal to the connection unless it is closing.
template<>
void scheduler<tbb_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    if( !my_conn.is_closing() )
        my_conn.remove_virtual_processors( vproots, count );
}
#endif
2190
//! ConcRT callback that must never fire for TBB connections.
template<>
void scheduler<tbb_connection_v2>::NotifyResourcesExternallyIdle( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/)
{
    __TBB_ASSERT( false, "NotifyResourcesExternallyIdle() is not allowed for TBB" );
}
2196
//! ConcRT callback that must never fire for TBB connections.
template<>
void scheduler<tbb_connection_v2>::NotifyResourcesExternallyBusy( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/ )
{
    __TBB_ASSERT( false, "NotifyResourcesExternallyBusy() is not allowed for TBB" );
}
2202
//! OMP scheduler: fixed concurrency of P-1 (MinConcurrency==MaxConcurrency).
template<>
scheduler<omp_connection_v2>::scheduler( omp_connection_v2& conn ) : uid(GetSchedulerId()), my_conn(conn)
{
    unsigned count = get_processor_count();
    rml::client& cl = my_conn.client();
    __TBB_ASSERT( count>1, "The processor count must be greater than 1" );
    CREATE_SCHEDULER_POLICY( my_policy, count-1, count-1, cl.min_stack_size() );
}
2211
//! ConcRT callback that must never fire for OMP connections (fixed VP set).
template<>
void scheduler<omp_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/ ) {
    __TBB_ASSERT( false, "RemoveVirtualProcessors() is not allowed for OMP" );
}
2216
//! ConcRT callback: some VPs became idle elsewhere; forward unless the connection is closing.
template<>
void scheduler<omp_connection_v2>::NotifyResourcesExternallyIdle( IVirtualProcessorRoot** vproots, unsigned int count ){
    if( !my_conn.is_closing() )
        my_conn.notify_resources_externally_idle( vproots, count );
}
2222
//! ConcRT callback: some VPs became busy elsewhere; forward unless the connection is closing.
template<>
void scheduler<omp_connection_v2>::NotifyResourcesExternallyBusy( IVirtualProcessorRoot** vproots, unsigned int count ){
    if( !my_conn.is_closing() )
        my_conn.notify_resources_externally_busy( vproots, count );
}
2228
2229 /* ts_idle, ts_asleep, ts_busy */
//! ConcRT dispatch loop for a TBB server thread.
void tbb_server_thread::Dispatch( DispatchState* ) {
    // Activate() will resume a thread right after Deactivate() as if it returns from the call
    tbb_connection_v2* tbb_conn = static_cast<tbb_connection_v2*>(my_conn);
    make_job( *tbb_conn, *this );

    for( ;; ) {
        // Try to wake some tbb threads if the balance is positive.
        // When a thread is added by ConcRT and enters here for the first time,
        // the thread may wake itself up (i.e., atomically change its state to ts_busy).
        if( the_balance>0 )
            wakeup_some_tbb_threads();
        if( read_state()!=ts_busy )
            if( sleep_perhaps() )
                return;    // the thread switched out for termination
        if( terminate )
            if( initiate_termination() )
                return;
        if( read_state()==ts_busy ) {
            // this thread has a coin (i.e., state==ts_busy); it should trip to the scheduler at least once
            if ( tbb_conn->has_slack() ) {
                do {
                    tbb_conn->try_process( *wait_for_job() );
                } while( tbb_conn->has_slack() && the_balance>=0 && !is_removed() );
            }
            __TBB_ASSERT( read_state()==ts_busy, "thread is not in busy state after returning from process()" );
            // see remove_virtual_processors()
            if( my_state.compare_and_swap( ts_idle, ts_busy )==ts_busy ) {
                int bal = ++the_balance;   // return the coin to the deposit
                if( tbb_conn->has_slack() ) {
                    // slack is positive, volunteer to help
                    bal = --the_balance; // try to grab the coin back
                    if( bal>=0 ) { // got the coin back
                        if( my_state.compare_and_swap( ts_busy, ts_idle )!=ts_idle )
                            ++the_balance; // someone else enlisted me.
                        // else my_state is ts_busy, I will come back to tbb_conn->try_process().
                    } else {
                        // overdraft. return the coin
                        ++the_balance;
                    }
                } // else the new request will see my changes to state & the_balance.
            } else {
                __TBB_ASSERT( false, "someone tampered with my state" );
            }
        } // someone else might set the state to something other than ts_idle
    }
}
2276
//! ConcRT dispatch loop for an OMP server thread.
void omp_server_thread::Dispatch( DispatchState* ) {
    // Activate() will resume a thread right after Deactivate() as if it returns from the call
    make_job( *static_cast<omp_connection_v2*>(my_conn), *this );

    for( ;; ) {
        if( read_state()!=ts_busy )
            sleep_perhaps();
        if( terminate ) {
            if( initiate_termination() )
                return;
        }
        if( read_state()==ts_busy ) {
            // Run the job published via produce(), then return the coin.
            omp_data.consume();
            __TBB_ASSERT( read_state()==ts_busy, "thread is not in busy state after returning from process()" );
            my_thread_map.adjust_balance( 1 );
            set_state( ts_idle );
        }
        // someone else might set the state to something other than ts_idle
    }
}
2297
2298 //! Attempt to change a thread's state to ts_omp_busy, and waking it up if necessary.
//! Attempt to claim this thread (state -> ts_busy), waking it up if necessary.
/** Returns how the thread was obtained (from sleep / from idle), or wk_failed. */
thread_grab_t server_thread_rep::try_grab_for() {
    thread_grab_t res = wk_failed;
    thread_state_t s = read_state();
    switch( s ) {
    case ts_asleep:
        if( wakeup( ts_busy, ts_asleep ) )
            res = wk_from_asleep;
        __TBB_ASSERT( res==wk_failed||read_state()==ts_busy, NULL );
        break;
    case ts_idle:
        if( my_state.compare_and_swap( ts_busy, ts_idle )==ts_idle )
            res = wk_from_idle;
        // At this point a thread is grabbed (i.e., its state has changed to ts_busy).
        // It is possible that the thread 1) processes the job, returns from process() and
        // sets its state ts_idle again. In some cases, it even sets its state to ts_asleep.
        break;
    default:
        break;
    }
    return res;
}
2320
//! Give this thread's (removed) virtual processor back to ConcRT.
/** Returns true if the thread terminated (references dropped); false if it was
    revived on a new virtual processor by add_virtual_processors(). */
bool tbb_server_thread::switch_out() {
    thread_state_t s = read_state();
    __TBB_ASSERT( s==ts_asleep||s==ts_busy, NULL );
    // This thread comes back from the TBB scheduler, and changed its state to ts_asleep successfully.
    // The master enlisted it and woke it up by Activate()'ing it; now it is emerging from Deactivated().
    // ConcRT requested for removal of the vp associated with the thread, and RML marks it removed.
    // Now, it has ts_busy, and removed. -- we should remove it.
    IExecutionResource* old_vp = my_execution_resource;
    if( s==ts_busy ) {
        // Return the coin; this thread will not run client work on this VP anymore.
        ++the_balance;
        my_state = ts_asleep;
    }
    IThreadProxy* proxy = my_proxy;
    __TBB_ASSERT( proxy, NULL );
    // Mark the removal in progress / completed via sentinel values of my_execution_resource.
    my_execution_resource = (IExecutionResource*) c_remove_prepare;
    old_vp->Remove( my_scheduler );
    my_execution_resource = (IExecutionResource*) c_remove_returned;
    int cnt = --activation_count;
    __TBB_ASSERT_EX( cnt==0||cnt==1, "too many activations?" );
    proxy->SwitchOut();
    if( terminate ) {
        bool activated = activation_count==1;
#if TBB_USE_ASSERT
        /* In a rare sequence of events, a thread comes out of SwitchOut with activation_count==1.
         * 1) The thread is SwitchOut'ed.
         * 2) AddVirtualProcessors() arrived and the thread is Activated.
         * 3) The thread is coming out of SwitchOut().
         * 4) request_close_connection arrives and informs the thread that it is time to terminate.
         * 5) The thread hits the check and falls into the path with 'activated==true'.
         * In that case, do the clean-up but do not switch to the thread scavenger; rather simply return to RM.
         */
        if( activated ) {
            // thread is 'revived' in add_virtual_processors after being Activated().
            // so, if the thread extra state is still marked 'removed', it will shortly change to 'none'
            // i.e., !is_removed(). The thread state is changed to ts_idle before the extra state, so
            // the thread's state should be either ts_idle or ts_done.
            while( is_removed() )
                __TBB_Yield();
            thread_state_t s = read_state();
            __TBB_ASSERT( s==ts_idle || s==ts_done, NULL );
        }
#endif
        __TBB_ASSERT( my_state==ts_asleep||my_state==ts_idle, NULL );
        // it is possible that in make_job() the thread may not have a chance to create a job.
        // my_job may not be set if the thread did not get a chance to process client's job (i.e., call try_process())
        rml::job* j;
        if( my_job_automaton.try_plug(j) ) {
            __TBB_ASSERT( j, NULL );
            my_client.cleanup(*j);
            my_conn->remove_client_ref();
        }
        // Must do remove client reference first, because execution of
        // c.remove_ref() can cause *this to be destroyed.
        if( !activated )
            proxy->SwitchTo( my_thread_map.get_thread_scavenger(), Idle );
        my_conn->remove_server_ref();
        return true;
    }
    // We revive a thread in add_virtual_processors() after we Activate the thread on a new virtual processor.
    // So briefly wait until the thread's my_execution_resource gets set.
    while( get_virtual_processor()==c_remove_returned )
        __TBB_Yield();
    return false;
}
2385
//! Deactivate this thread's VP if it is idle.
/** Returns true iff the thread switched out for termination. */
bool tbb_server_thread::sleep_perhaps () {
    if( terminate ) return false;
    thread_state_t s = read_state();
    if( s==ts_idle ) {
        if( my_state.compare_and_swap( ts_asleep, ts_idle )==ts_idle ) {
            // If a thread is between read_state() and compare_and_swap(), and the master tries to terminate,
            // the master's compare_and_swap() will fail because the thread's state is ts_idle.
            // We need to check if terminate is true or not before letting the thread go to sleep,
            // otherwise we will miss the terminate signal.
            if( !terminate ) {
                if( !is_removed() ) {
                    --activation_count;
                    get_virtual_processor()->Deactivate( this );
                }
                if( is_removed() ) {
                    if( switch_out() )
                        return true;
                    __TBB_ASSERT( my_execution_resource>c_remove_returned, NULL );
                }
                // in add_virtual_processors(), when we revive a thread, we change its state after Activating the thread;
                // in that case the state may be ts_asleep for a short period
                while( read_state()==ts_asleep )
                    __TBB_Yield();
            } else {
                if( my_state.compare_and_swap( ts_done, ts_asleep )!=ts_asleep ) {
                    --activation_count;
                    // unbind() changed my state. It will call Activate(). So issue a matching Deactivate()
                    get_virtual_processor()->Deactivate( this );
                }
            }
        }
    } else {
        __TBB_ASSERT( s==ts_busy, NULL );
    }
    return false;
}
2422
//! Deactivate this OMP thread's VP if it is idle (OMP VPs are never removed).
void omp_server_thread::sleep_perhaps () {
    if( terminate ) return;
    thread_state_t s = read_state();
    if( s==ts_idle ) {
        if( my_state.compare_and_swap( ts_asleep, ts_idle )==ts_idle ) {
            // If a thread is between read_state() and compare_and_swap(), and the master tries to terminate,
            // the master's compare_and_swap() will fail because the thread's state is ts_idle.
            // We need to check if terminate is true or not before letting the thread go to sleep,
            // otherwise we will miss the terminate signal.
            if( !terminate ) {
                get_virtual_processor()->Deactivate( this );
                __TBB_ASSERT( !is_removed(), "OMP threads should not be deprived of a virtual processor" );
                __TBB_ASSERT( read_state()!=ts_asleep, NULL );
            } else {
                if( my_state.compare_and_swap( ts_done, ts_asleep )!=ts_asleep )
                    // unbind() changed my state. It will call Activate(). So issue a matching Deactivate()
                    get_virtual_processor()->Deactivate( this );
            }
        }
    } else {
        __TBB_ASSERT( s==ts_busy, NULL );
    }
}
2446
2447 bool tbb_server_thread::initiate_termination() {
2448 if( read_state()==ts_busy ) {
2449 int bal = ++the_balance;
2450 if( bal>0 ) wakeup_some_tbb_threads();
2451 }
2452 return destroy_job( (tbb_connection_v2*) my_conn );
2453 }
2454
//! Plug this thread's job automaton and drop its references to the connection.
/** If this call wins the race to plug the automaton, the job is cleaned up on
    behalf of the client and the client reference is released.  Always releases
    the server reference and returns true.
    \param c the connection (TBB or OMP flavor) this server thread belongs to. */
template<typename Connection>
bool server_thread_rep::destroy_job( Connection* c ) {
    __TBB_ASSERT( my_state!=ts_asleep, NULL );
    rml::job* j;
    if( my_job_automaton.try_plug(j) ) {
        // We won the plug race: the job exists and is ours to clean up.
        __TBB_ASSERT( j, NULL );
        my_client.cleanup(*j);
        c->remove_client_ref();
    }
    // Must remove the client reference first, because execution of
    // c->remove_server_ref() can cause *this to be destroyed.
    c->remove_server_ref();
    return true;
}
2469
2470 void thread_map::assist_cleanup( bool assist_null_only ) {
2471 // To avoid deadlock, the current thread *must* help out with cleanups that have not started,
2472 // because the thread that created the job may be busy for a long time.
2473 for( iterator i = begin(); i!=end(); ++i ) {
2474 rml::job* j=0;
2475 server_thread* thr = (*i).second;
2476 job_automaton& ja = thr->my_job_automaton;
2477 if( assist_null_only ? ja.try_plug_null() : ja.try_plug(j) ) {
2478 if( j ) {
2479 my_client.cleanup(*j);
2480 } else {
2481 // server thread did not get a chance to create a job.
2482 }
2483 remove_client_ref();
2484 }
2485 }
2486 }
2487
//! Add virtual processors to the map for a TBB connection (ConcRT RM callback path).
/** For each virtual processor root, either creates a new tbb_server_thread or
    revives a previously removed thread whose context is still in the map.  The
    map mutex is taken twice (lookup phase and insert phase) so that allocation
    of new thread objects happens outside the lock. */
void thread_map::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count, tbb_connection_v2& conn, ::tbb::spin_mutex& mtx )
{
#if TBB_USE_ASSERT
    // The counter asserts that at most one add request is in flight at a time.
    int req_cnt = ++n_add_vp_requests;
    __TBB_ASSERT( req_cnt==1, NULL );
#endif
    std::vector<thread_map::iterator> vec(count);
    std::vector<tbb_server_thread*> tvec(count);
    iterator end;

    // Phase 1 (under lock): look up map entries for the incoming vprocs.
    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        __TBB_ASSERT( my_map.size()==0||count==1, NULL );
        end = my_map.end(); //remember 'end' at the time of 'find'
        // find entries in the map for those VPs that were previously added and then removed.
        for( size_t i=0; i<count; ++i ) {
            vec[i] = my_map.find( (key_type) vproots[i] );
#if TBB_USE_DEBUG
            if( vec[i]!=end ) {
                tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
                IVirtualProcessorRoot* v = t->get_virtual_processor();
                __TBB_ASSERT( v==c_remove_prepare||v==c_remove_returned, NULL );
            }
#endif
        }

        iterator nxt = my_map.begin();
        for( size_t i=0; i<count; ++i ) {
            if( vec[i]!=end ) {
#if TBB_USE_ASSERT
                tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
                __TBB_ASSERT( t->read_state()==ts_asleep, NULL );
                IVirtualProcessorRoot* r = t->get_virtual_processor();
                __TBB_ASSERT( r==c_remove_prepare||r==c_remove_returned, NULL );
#endif
                continue;
            }

            if( my_unrealized_threads>0 ) {
                // We still have budget for a brand-new thread.
                --my_unrealized_threads;
            } else {
                __TBB_ASSERT( nxt!=end, "nxt should not be thread_map::iterator::end" );
                // find a removed thread context for i
                for( ; nxt!=end; ++nxt ) {
                    tbb_server_thread* t = (tbb_server_thread*) (*nxt).second;
                    if( t->is_removed() && t->read_state()==ts_asleep && t->get_virtual_processor()==c_remove_returned ) {
                        vec[i] = nxt++;
                        break;
                    }
                }
                // break target
                if( vec[i]==end ) // ignore excessive VP.
                    vproots[i] = NULL;
            }
        }
    }

    // Phase 2 (no lock): allocate server thread objects for new vprocs.
    for( size_t i=0; i<count; ++i ) {
        __TBB_ASSERT( !tvec[i], NULL );
        if( vec[i]==end ) {
            if( vproots[i] ) {
                tvec[i] = my_tbb_allocator.allocate(1);
                new ( tvec[i] ) tbb_server_thread( false, my_scheduler, (IExecutionResource*)vproots[i], &conn, *this, my_client );
            }
#if TBB_USE_ASSERT
        } else {
            tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
            __TBB_ASSERT( t->GetProxy(), "Proxy is cleared?" );
#endif
        }
    }

    // Phase 3 (under lock): insert new threads / re-key revived ones.
    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        bool closing = is_closing();

        for( size_t i=0; i<count; ++i ) {
            if( vec[i]==end ) {
                if( vproots[i] ) {
                    thread_map::key_type key = (thread_map::key_type) vproots[i];
                    vec[i] = insert( key, (server_thread*) tvec[i] );
                    my_client_ref_count.add_ref();
                    my_server_ref_count.add_ref();
                }
            } else if( !closing ) {
                tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;

                // A revived thread may come back under a different vproc root:
                // re-key its map entry to the new root.
                if( (*vec[i]).first!=(thread_map::key_type)vproots[i] ) {
                    my_map.erase( vec[i] );
                    thread_map::key_type key = (thread_map::key_type) vproots[i];
                    __TBB_ASSERT( key, NULL );
                    vec[i] = insert( key, t );
                }
                __TBB_ASSERT( t->read_state()==ts_asleep, NULL );
                // We did not decrement server/client ref count when a thread is removed.
                // So, don't increment server/client ref count here.
            }
        }

        // we could check is_closing() earlier. That requires marking the newly allocated server_thread objects
        // that are not inserted into the thread_map, and deallocate them. Doing so seems more cumbersome
        // than simply adding these to the thread_map and let thread_map's destructor take care of reclamation.
        __TBB_ASSERT( closing==is_closing(), NULL );
        if( closing ) return;
    }

    // Phase 4 (no lock): Activate each vproc; revive threads that were removed.
    for( size_t i=0; i<count; ++i ) {
        if( vproots[i] ) {
            tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
            __TBB_ASSERT( tvec[i]!=NULL||t->GetProxy(), "Proxy is cleared?" );
            if( t->is_removed() )
                __TBB_ASSERT( t->get_virtual_processor()==c_remove_returned, NULL );
            int cnt = ++t->activation_count;
            __TBB_ASSERT_EX( cnt==0||cnt==1, NULL );
            vproots[i]->Activate( t );
            if( t->is_removed() )
                t->revive( my_scheduler, vproots[i], my_client );
        }
    }
#if TBB_USE_ASSERT
    req_cnt = --n_add_vp_requests;
    __TBB_ASSERT( req_cnt==0, NULL );
#endif
}
2613
//! Handle ConcRT RM's request to take virtual processors away from a TBB connection.
/** Marks the corresponding server threads as removed and, if a thread is asleep,
    wakes it so it can switch itself out.  Unknown vprocs (not in the map and not
    owned by the thread scavenger) are removed from the scheduler directly. */
void thread_map::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ) {
    if( my_map.size()==0 )
        return;
    tbb::spin_mutex::scoped_lock lck( mtx );

    // Termination has started; ignore further RM callbacks.
    if( is_closing() ) return;

    for( unsigned int c=0; c<count; ++c ) {
        iterator i = my_map.find( (key_type) vproots[c] );
        if( i==my_map.end() ) {
            thread_scavenger_thread* tst = my_thread_scavenger_thread;
            if( !tst ) {
                // Remove unknown vp from my scheduler;
                vproots[c]->Remove( my_scheduler );
            } else {
                // Wait until the scavenger slot is no longer claimed, then
                // remove the vproc only if it does not belong to the scavenger.
                while( (tst=my_thread_scavenger_thread)==c_claimed )
                    __TBB_Yield();
                if( vproots[c]!=tst->get_virtual_processor() )
                    vproots[c]->Remove( my_scheduler );
            }
            continue;
        }
        tbb_server_thread* thr = (tbb_server_thread*) (*i).second;
        __TBB_ASSERT( thr->tbb_thread, "incorrect type of server_thread" );
        thr->set_removed();
        if( thr->read_state()==ts_asleep ) {
            // Wait for any in-flight Activate to complete before acting.
            while( thr->activation_count>0 ) {
                if( thr->get_virtual_processor()<=c_remove_returned )
                    break;
                __TBB_Yield();
            }
            if( thr->get_virtual_processor()>c_remove_returned ) {
                // the thread is in Deactivated state
                ++thr->activation_count;
                // wake the thread up so that it Switches Out itself.
                thr->get_virtual_processor()->Activate( thr );
            } // else, it is Switched Out
        } // else the thread will see that it is removed and proceed to switch itself out without Deactivation
    }
}
2654
//! Add virtual processors to the map for an OMP connection (ConcRT RM callback path).
/** Unlike the TBB flavor, OMP threads are never revived: any vproc not in the
    map simply gets a freshly allocated omp_server_thread.  Allocation happens
    outside the lock; insertion and activation follow. */
void thread_map::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count, omp_connection_v2& conn, ::tbb::spin_mutex& mtx )
{
    std::vector<thread_map::iterator> vec(count);
    std::vector<server_thread*> tvec(count);
    iterator end;

    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        // read the map
        end = my_map.end(); //remember 'end' at the time of 'find'
        for( size_t i=0; i<count; ++i )
            vec[i] = my_map.find( (key_type) vproots[i] );
    }

    // Allocate new thread objects outside the lock.
    for( size_t i=0; i<count; ++i ) {
        __TBB_ASSERT( !tvec[i], NULL );
        if( vec[i]==end ) {
            tvec[i] = my_omp_allocator.allocate(1);
            new ( tvec[i] ) omp_server_thread( false, my_scheduler, (IExecutionResource*)vproots[i], &conn, *this, my_client );
        }
    }

    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        for( size_t i=0; i<count; ++i ) {
            if( vec[i]==my_map.end() ) {
                thread_map::key_type key = (thread_map::key_type) vproots[i];
                vec[i] = insert( key, tvec[i] );
                my_client_ref_count.add_ref();
                my_server_ref_count.add_ref();
            }
        }

        // we could check is_closing() earlier. That requires marking the newly allocated server_thread objects
        // that are not inserted into the thread_map, and deallocate them. Doing so seems more cumbersome
        // than simply adding these to the thread_map and let thread_map's destructor take care of reclamation.
        if( is_closing() ) return;
    }

    // Start every requested vproc running its server thread.
    for( size_t i=0; i<count; ++i )
        vproots[i]->Activate( (*vec[i]).second );

    // Remember the original execution resources; create_oversubscribers()
    // round-robins over this list.
    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        for( size_t i=0; i<count; ++i )
            original_exec_resources.push_back( vproots[i] );
    }
}
2704
//! Record that ConcRT RM has lent the given virtual processors to this (OMP) scheduler.
/** A vproc that has no thread object yet gets a placeholder map entry with the
    tag value 1 (meaning "lent"); create_oversubscribers() later replaces the tag
    with a real omp_server_thread and transfers the lent/returned state to it. */
void thread_map::mark_virtual_processors_as_lent( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ) {
    tbb::spin_mutex::scoped_lock lck( mtx );

    if( is_closing() ) return;

    iterator end = my_map.end();
    for( unsigned int c=0; c<count; ++c ) {
        iterator i = my_map.find( (key_type) vproots[c] );
        if( i==end ) {
            // The vproc has not been added to the map in create_oversubscribers()
            my_map.insert( hash_map_type::value_type( (key_type) vproots[c], (server_thread*)1 ) );
        } else {
            server_thread* thr = (*i).second;
            // Values 0/1 are placeholder tags, not real thread pointers; only a
            // real thread object can carry the lent flag.
            if( ((uintptr_t)thr)&~(uintptr_t)1 ) {
                __TBB_ASSERT( !thr->is_removed(), "incorrectly removed" );
                ((omp_server_thread*)thr)->set_lent();
            }
        }
    }
}
2725
//! Create n oversubscribed virtual processors and OMP server threads for them.
/** Oversubscribers are distributed round-robin over the originally granted
    execution resources.  If a map entry already exists for a new vproc, it is a
    placeholder tag (0 = returned, 1 = lent) inserted by the mark_* callbacks;
    the tag's state is transferred to the freshly created thread.
    \param thr_vec output vector; receives the n new threads. */
void thread_map::create_oversubscribers( unsigned n, std::vector<server_thread*>& thr_vec, omp_connection_v2& conn, ::tbb::spin_mutex& mtx ) {
    std::vector<IExecutionResource*> curr_exec_rsc;
    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        curr_exec_rsc = original_exec_resources; // copy construct
    }
    typedef std::vector<IExecutionResource*>::iterator iterator_er;
    typedef ::std::vector<std::pair<hash_map_type::key_type, hash_map_type::mapped_type> > hash_val_vector_t;
    hash_val_vector_t v_vec(n);
    iterator_er begin = curr_exec_rsc.begin();
    iterator_er end = curr_exec_rsc.end();
    iterator_er i = begin;
    // Create the oversubscribers (outside the lock), cycling over the resources.
    for( unsigned c=0; c<n; ++c ) {
        IVirtualProcessorRoot* vpr = my_scheduler_proxy->CreateOversubscriber( *i );
        omp_server_thread* t = new ( my_omp_allocator.allocate(1) ) omp_server_thread( true, my_scheduler, (IExecutionResource*)vpr, &conn, *this, my_client );
        thr_vec[c] = t;
        v_vec[c] = hash_map_type::value_type( (key_type) vpr, t );
        if( ++i==end ) i = begin;
    }

    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        if( is_closing() ) return;

        iterator end = my_map.end();
        unsigned c = 0;
        for( hash_val_vector_t::iterator vi=v_vec.begin(); vi!=v_vec.end(); ++vi, ++c ) {
            iterator i = my_map.find( (key_type) (*vi).first );
            if( i==end ) {
                my_map.insert( *vi );
            } else {
                // the vproc has not been added to the map in mark_virtual_processors_as_returned();
                uintptr_t lent = (uintptr_t) (*i).second;
                __TBB_ASSERT( lent<=1, "vproc map entry added incorrectly?");
                (*i).second = thr_vec[c];
                // Transfer the placeholder's lent/returned state to the thread.
                if( lent )
                    ((omp_server_thread*)thr_vec[c])->set_lent();
                else
                    ((omp_server_thread*)thr_vec[c])->set_returned();
            }
            my_client_ref_count.add_ref();
            my_server_ref_count.add_ref();
        }
    }
}
2772
//! Wake up to c TBB worker threads, subject to the global concurrency balance.
/** Each wakeup debits the_balance; an overdraft is immediately repaid and the
    scan stops.  Threads grabbed from the asleep state are collected and
    Activated after the lock is released, since Activate may block. */
void thread_map::wakeup_tbb_threads( int c, ::tbb::spin_mutex& mtx ) {
    std::vector<tbb_server_thread*> vec(c);

    size_t idx = 0;
    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        if( is_closing() ) return;
        // only one RML thread is in here to wake worker threads up.

        // Cap the number of wakeups by the currently available balance.
        int bal = the_balance;
        int cnt = c<bal ? c : bal;

        if( cnt<=0 ) { return; }

        for( iterator i=begin(); i!=end(); ++i ) {
            tbb_server_thread* thr = (tbb_server_thread*) (*i).second;
            // ConcRT RM should take threads away from TBB scheduler instead of lending them to another scheduler
            if( thr->is_removed() )
                continue;

            if( --the_balance>=0 ) {
                thread_grab_t res;
                // Spin until the thread is grabbed from idle, grabbed from
                // asleep, or observed busy (already has work).
                while( (res=thr->try_grab_for())!=wk_from_idle ) {
                    if( res==wk_from_asleep ) {
                        // Remember it; Activate() happens outside the lock.
                        vec[idx++] = thr;
                        break;
                    } else {
                        thread_state_t s = thr->read_state();
                        if( s==ts_busy ) {// failed because already assigned. move on.
                            ++the_balance; // repay the debit taken above
                            goto skip;
                        }
                    }
                }
                thread_state_t s = thr->read_state();
                __TBB_ASSERT_EX( s==ts_busy, "should have set the state to ts_busy" );
                if( --cnt==0 )
                    break;
            } else {
                // overdraft
                ++the_balance;
                break;
            }
skip:
            ;
        }
    }

    // Activate the threads grabbed from the asleep state, outside the lock.
    for( size_t i=0; i<idx; ++i ) {
        tbb_server_thread* thr = vec[i];
        __TBB_ASSERT( thr, NULL );
        thread_state_t s = thr->read_state();
        __TBB_ASSERT_EX( s==ts_busy, "should have set the state to ts_busy" );
        ++thr->activation_count;
        thr->get_virtual_processor()->Activate( thr );
    }

}
2832
//! Record that the given virtual processors have been returned to this (OMP) scheduler.
/** Mirror image of mark_virtual_processors_as_lent(): a vproc with no thread
    object yet gets a placeholder entry with tag value 0 (meaning "returned");
    create_oversubscribers() later replaces the tag with a real thread. */
void thread_map::mark_virtual_processors_as_returned( IVirtualProcessorRoot** vprocs, unsigned int count, tbb::spin_mutex& mtx ) {
    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        if( is_closing() ) return;

        iterator end = my_map.end();
        for(unsigned c=0; c<count; ++c ) {
            iterator i = my_map.find( (key_type) vprocs[c] );
            if( i==end ) {
                // the vproc has not been added to the map in create_oversubscribers()
                my_map.insert( hash_map_type::value_type( (key_type) vprocs[c], static_cast<server_thread*>(0) ) );
            } else {
                omp_server_thread* thr = (omp_server_thread*) (*i).second;
                // Values 0/1 are placeholder tags, not real thread pointers.
                if( ((uintptr_t)thr)&~(uintptr_t)1 ) {
                    __TBB_ASSERT( !thr->is_removed(), "incorrectly removed" );
                    // we should not make any assumption on the initial state of an added vproc.
                    thr->set_returned();
                }
            }
        }
    }
}
2856
2857
//! Start the termination sequence for all threads in the map.
/** Sets the shutdown flag (so subsequent ConcRT RM callbacks are ignored), asks
    every server thread to terminate, releases the extra client reference, and
    finally wakes the thread scavenger so it can run its cleanup pass. */
void thread_map::unbind( rml::server& /*server*/, tbb::spin_mutex& mtx ) {
    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        shutdown_in_progress = true; // ignore any callbacks from ConcRT RM

        // Ask each server_thread to cleanup its job for this server.
        for( iterator i = begin(); i!=end(); ++i ) {
            server_thread* t = (*i).second;
            t->terminate = true;
            if( t->is_removed() ) {
                // This is for TBB only as ConcRT RM does not request OMP schedulers to remove virtual processors
                if( t->read_state()==ts_asleep ) {
                    __TBB_ASSERT( my_thread_scavenger_thread, "this is TBB connection; thread_scavenger_thread must be allocated" );
                    // thread is on its way to switch_out; see remove_virtual_processors() where
                    // the thread is Activated() to bring it back from 'Deactivated' in sleep_perhaps()
                    // now assume that the thread will go to SwitchOut()
#if TBB_USE_ASSERT
                    while( t->get_virtual_processor()>c_remove_returned )
                        __TBB_Yield();
#endif
                    // A removed thread is supposed to proceed to SwitchOut.
                    // There, we remove client&server references.
                }
            } else {
                if( t->wakeup( ts_done, ts_asleep ) ) {
                    // The thread was asleep; Activate it so it can observe
                    // terminate and run its shutdown path.
                    if( t->tbb_thread )
                        ++((tbb_server_thread*)t)->activation_count;
                    t->get_virtual_processor()->Activate( t );
                    // We mark in the thread_map such that when termination sequence started, we ignore
                    // all notification from ConcRT RM.
                }
            }
        }
    }
    // Remove extra ref to client.
    remove_client_ref();

    if( my_thread_scavenger_thread ) {
        thread_scavenger_thread* tst;
        // Wait until the scavenger slot holds a real pointer, then wake it.
        while( (tst=my_thread_scavenger_thread)==c_claimed )
            __TBB_Yield();
#if TBB_USE_ASSERT
        ++my_thread_scavenger_thread->activation_count;
#endif
        tst->get_virtual_processor()->Activate( tst );
    }
}
2905
2906 #if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
//! Lazily allocate the single thread scavenger for this map.
/** Uses the my_thread_scavenger_thread slot itself as the claim token:
    NULL -> c_claimed -> real pointer.  Only the claimant allocates; a loser
    that won an intermediate race restores the observed pointer.
    \param v execution resource on which to create the scavenger's oversubscriber. */
void thread_map::allocate_thread_scavenger( IExecutionResource* v )
{
    // Fast path: already allocated (any value above the c_claimed sentinel).
    if( my_thread_scavenger_thread>c_claimed ) return;
    thread_scavenger_thread* c = my_thread_scavenger_thread.fetch_and_store((thread_scavenger_thread*)c_claimed);
    if( c==NULL ) { // successfully claimed
        add_server_ref();
#if TBB_USE_ASSERT
        ++n_thread_scavengers_created;
#endif
        __TBB_ASSERT( v, NULL );
        IVirtualProcessorRoot* vpr = my_scheduler_proxy->CreateOversubscriber( v );
        my_thread_scavenger_thread = c = new ( my_scavenger_allocator.allocate(1) ) thread_scavenger_thread( my_scheduler, vpr, *this );
#if TBB_USE_ASSERT
        ++c->activation_count;
#endif
        vpr->Activate( c );
    } else if( c>c_claimed ) {
        // Someone else already created it; put the pointer back.
        my_thread_scavenger_thread = c;
    }
}
2927 #endif
2928
//! Main routine of the thread scavenger's virtual processor.
/** Deactivates itself until woken by unbind(), then switches to each removed,
    asleep server thread (so it can finish switching out), removes its own
    vproc from the scheduler, and drops its server reference on the map. */
void thread_scavenger_thread::Dispatch( DispatchState* )
{
    __TBB_ASSERT( my_proxy, NULL );
#if TBB_USE_ASSERT
    --activation_count;
#endif
    // Sleep until unbind() Activates us to start the cleanup pass.
    get_virtual_processor()->Deactivate( this );
    for( thread_map::iterator i=my_thread_map.begin(); i!=my_thread_map.end(); ++i ) {
        tbb_server_thread* t = (tbb_server_thread*) (*i).second;
        if( t->read_state()==ts_asleep && t->is_removed() ) {
            // Wait until the thread's execution resource has been returned,
            // then run it so it can complete its switch-out.
            while( t->get_execution_resource()!=c_remove_returned )
                __TBB_Yield();
            my_proxy->SwitchTo( t, Blocking );
        }
    }
    get_virtual_processor()->Remove( my_scheduler );
    my_thread_map.remove_server_ref();
    // signal to the connection scavenger that i am done with the map.
    __TBB_ASSERT( activation_count==1, NULL );
    set_state( ts_done );
}
2950
2951 //! Windows "DllMain" that handles startup and shutdown of dynamic library.
2952 extern "C" bool WINAPI DllMain( HINSTANCE /*hinstDLL*/, DWORD fwdReason, LPVOID lpvReserved ) {
2953 void assist_cleanup_connections();
2954 if( fwdReason==DLL_PROCESS_DETACH ) {
2955 // dll is being unloaded
2956 if( !lpvReserved ) // if FreeLibrary has been called
2957 assist_cleanup_connections();
2958 }
2959 return true;
2960 }
2961
2962 void free_all_connections( uintptr_t conn_ex ) {
2963 while( conn_ex ) {
2964 bool is_tbb = (conn_ex&2)>0;
2965 //clear extra bits
2966 uintptr_t curr_conn = conn_ex & ~(uintptr_t)3;
2967 __TBB_ASSERT( curr_conn, NULL );
2968
2969 // Wait for worker threads to return
2970 if( is_tbb ) {
2971 tbb_connection_v2* tbb_conn = reinterpret_cast<tbb_connection_v2*>(curr_conn);
2972 conn_ex = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
2973 while( tbb_conn->my_thread_map.remove_server_ref()>0 )
2974 __TBB_Yield();
2975 delete tbb_conn;
2976 } else {
2977 omp_connection_v2* omp_conn = reinterpret_cast<omp_connection_v2*>(curr_conn);
2978 conn_ex = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
2979 while( omp_conn->my_thread_map.remove_server_ref()>0 )
2980 __TBB_Yield();
2981 delete omp_conn;
2982 }
2983 }
2984 }
2985
//! Called on DLL unload: plug the reclaim queue and help free remaining connections.
/** First plugs connections_to_reclaim.tail so no new requests can be queued,
    then either lets the scavenger thread finish or, if it already exited,
    takes over processing of the pending list. */
void assist_cleanup_connections()
{
    //signal to connection_scavenger_thread to terminate
    uintptr_t tail = connections_to_reclaim.tail;
    while( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::plugged, tail )!=tail ) {
        __TBB_Yield();
        tail = connections_to_reclaim.tail;
    }

    __TBB_ASSERT( connection_scavenger.state==ts_busy || connection_scavenger.state==ts_asleep, NULL );
    // Scavenger thread may be busy freeing connections
    DWORD thr_exit_code = STILL_ACTIVE;
    while( connection_scavenger.state==ts_busy ) {
        // The scavenger may have exited while appearing busy; detect that via
        // its thread exit code to avoid spinning forever.
        if( GetExitCodeThread( connection_scavenger.thr_handle, &thr_exit_code )>0 )
            if( thr_exit_code!=STILL_ACTIVE )
                break;
        __TBB_Yield();
        thr_exit_code = STILL_ACTIVE;
    }
    if( connection_scavenger.state==ts_asleep && thr_exit_code==STILL_ACTIVE )
        connection_scavenger.wakeup(); // wake the connection scavenger thread up

    // it is possible that the connection scavenger thread already exited. Take over its responsibility.
    if( tail && connections_to_reclaim.tail!=garbage_connection_queue::plugged_acked ) {
        // atomically claim the head of the list.
        uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
        if( head==garbage_connection_queue::empty )
            head = tail;
        connection_scavenger.process_requests( head );
    }
    __TBB_ASSERT( connections_to_reclaim.tail==garbage_connection_queue::plugged||connections_to_reclaim.tail==garbage_connection_queue::plugged_acked, "someone else added a request after termination has initiated" );
    __TBB_ASSERT( (unsigned)the_balance==the_default_concurrency, NULL );
}
3019
//! Put the connection scavenger to sleep if there is no work and no shutdown request.
/** Uses the prepare_wait/commit_wait/cancel_wait protocol of thread_monitor;
    spurious wakeups are absorbed by the final compare_and_swap back to ts_busy. */
void connection_scavenger_thread::sleep_perhaps() {
    uintptr_t tail = connections_to_reclaim.tail;
    // connections_to_reclaim.tail==garbage_connection_queue::plugged --> terminate,
    // connections_to_reclaim.tail>garbage_connection_queue::plugged : we got work to do
    if( tail>=garbage_connection_queue::plugged ) return;
    __TBB_ASSERT( !tail, NULL );
    thread_monitor::cookie c;
    monitor.prepare_wait(c);
    if( state.compare_and_swap( ts_asleep, ts_busy )==ts_busy ) {
        // Re-check after publishing ts_asleep: a terminator may have plugged
        // the queue in the meantime, in which case we must not block.
        if( connections_to_reclaim.tail!=garbage_connection_queue::plugged ) {
            monitor.commit_wait(c);
            // Someone else woke me up. The compare_and_swap further below deals with spurious wakeups.
        } else {
            monitor.cancel_wait();
        }
        thread_state_t s = state;
        if( s==ts_asleep ) // if spurious wakeup.
            state.compare_and_swap( ts_busy, ts_asleep );
        // I woke myself up, either because I cancelled the wait or suffered a spurious wakeup.
    } else {
        __TBB_ASSERT( false, "someone else tampered with my state" );
    }
    __TBB_ASSERT( state==ts_busy, "a thread can only put itself to sleep" );
}
3044
//! Walk the reclaim list and release the extra server reference of each connection.
/** \param conn_ex tagged pointer to the first connection to process (bit 1 set
    for TBB connections).  For each connection, waits for its worker threads to
    return, then reads next_conn before dropping the extra server reference
    (which may destroy the connection).  Terminates when the list is drained or
    the queue has been plugged for shutdown. */
void connection_scavenger_thread::process_requests( uintptr_t conn_ex )
{
    __TBB_ASSERT( conn_ex>1, NULL );
    __TBB_ASSERT( n_scavenger_threads==1||connections_to_reclaim.tail==garbage_connection_queue::plugged, "more than one connection_scavenger_thread being active?" );

    bool done = false;
    while( !done ) {
        bool is_tbb = (conn_ex&2)>0;
        //clear extra bits
        uintptr_t curr_conn = conn_ex & ~(uintptr_t)3;

        // no contention. there is only one connection_scavenger_thread!!
        uintptr_t next_conn;
        tbb_connection_v2* tbb_conn = NULL;
        omp_connection_v2* omp_conn = NULL;
        // Wait for worker threads to return
        if( is_tbb ) {
            tbb_conn = reinterpret_cast<tbb_connection_v2*>(curr_conn);
            next_conn = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
            while( tbb_conn->my_thread_map.get_server_ref_count()>1 )
                __TBB_Yield();
        } else {
            omp_conn = reinterpret_cast<omp_connection_v2*>(curr_conn);
            next_conn = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
            while( omp_conn->my_thread_map.get_server_ref_count()>1 )
                __TBB_Yield();
        }

        //someone else may try to write into this connection object.
        //So access next_conn field first before remove the extra server ref count.

        if( next_conn==0 ) {
            // Apparently at the end of the list; decide whether we are truly
            // done, or whether a new request is concurrently being linked in.
            uintptr_t tail = connections_to_reclaim.tail;
            if( tail==garbage_connection_queue::plugged ) {
                tail = garbage_connection_queue::plugged_acked; // connection scavenger saw the flag, and it freed all connections.
                done = true;
            } else if( tail==conn_ex ) {
                if( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::empty, tail )==tail ) {
                    __TBB_ASSERT( !connections_to_reclaim.head, NULL );
                    done = true;
                }
            }

            if( !done ) {
                // A new connection to close is added to connections_to_reclaim.tail;
                // Wait for curr_conn->next_conn to be set.
                if( is_tbb ) {
                    while( !tbb_conn->next_conn )
                        __TBB_Yield();
                    conn_ex = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
                } else {
                    while( !omp_conn->next_conn )
                        __TBB_Yield();
                    conn_ex = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
                }
            }
        } else {
            conn_ex = next_conn;
        }
        __TBB_ASSERT( conn_ex, NULL );
        if( is_tbb )
            // remove extra server ref count; this will trigger Shutdown/Release of ConcRT RM
            tbb_conn->remove_server_ref();
        else
            // remove extra server ref count; this will trigger Shutdown/Release of ConcRT RM
            omp_conn->remove_server_ref();
    }
}
3113
//! OS thread entry point of the connection scavenger.
/** Loops forever: sleeps until there is work or shutdown, exits when the
    queue has been plugged, otherwise claims the pending list head and
    processes it, finally rebalancing TBB workers. */
__RML_DECL_THREAD_ROUTINE connection_scavenger_thread::thread_routine( void* arg ) {
    connection_scavenger_thread* thr = (connection_scavenger_thread*) arg;
    thr->state = ts_busy;
    thr->thr_handle = GetCurrentThread();
#if TBB_USE_ASSERT
    ++thr->n_scavenger_threads;
#endif
    for(;;) {
        __TBB_Yield();
        thr->sleep_perhaps();
        // A plugged tail means shutdown was requested; report asleep and exit.
        if( connections_to_reclaim.tail==garbage_connection_queue::plugged || connections_to_reclaim.tail==garbage_connection_queue::plugged_acked ) {
            thr->state = ts_asleep;
            return 0;
        }

        __TBB_ASSERT( connections_to_reclaim.tail!=garbage_connection_queue::plugged_acked, NULL );
        __TBB_ASSERT( connections_to_reclaim.tail>garbage_connection_queue::plugged && (connections_to_reclaim.tail&garbage_connection_queue::plugged)==0 , NULL );
        // The producer sets tail before head; wait until head is published.
        while( connections_to_reclaim.head==garbage_connection_queue::empty )
            __TBB_Yield();
        uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
        thr->process_requests( head );
        // Freed connections return concurrency; let sleeping TBB workers use it.
        wakeup_some_tbb_threads();
    }
}
3138
//! Queue a closed connection for reclamation by the scavenger thread.
/** Builds a tagged pointer (bit 1 set for TBB connections), atomically appends
    it to the reclaim list via tail fetch_and_store, links it behind the old
    tail if there was one, and wakes the scavenger if it is asleep. */
template<typename Server, typename Client>
void connection_scavenger_thread::add_request( generic_connection<Server,Client>* conn_to_close )
{
    uintptr_t conn_ex = (uintptr_t)conn_to_close | (connection_traits<Server,Client>::is_tbb<<1);
    __TBB_ASSERT( !conn_to_close->next_conn, NULL );
    const uintptr_t old_tail_ex = connections_to_reclaim.tail.fetch_and_store(conn_ex);
    __TBB_ASSERT( old_tail_ex==0||old_tail_ex>garbage_connection_queue::plugged_acked, "Unloading DLL called while this connection is being closed?" );

    if( old_tail_ex==garbage_connection_queue::empty )
        // List was empty: this connection becomes the head as well.
        connections_to_reclaim.head = conn_ex;
    else {
        // Link the new connection behind the previous tail.
        bool is_tbb = (old_tail_ex&2)>0;
        uintptr_t old_tail = old_tail_ex & ~(uintptr_t)3;
        if( is_tbb )
            reinterpret_cast<tbb_connection_v2*>(old_tail)->next_conn = reinterpret_cast<tbb_connection_v2*>(conn_ex);
        else
            reinterpret_cast<omp_connection_v2*>(old_tail)->next_conn = reinterpret_cast<omp_connection_v2*>(conn_ex);
    }

    if( state==ts_asleep )
        wakeup();
}
3161
3162 template<>
3163 uintptr_t connection_scavenger_thread::grab_and_prepend( generic_connection<tbb_server,tbb_client>* /*last_conn_to_close*/ ) { return 0;}
3164
3165 template<>
3166 uintptr_t connection_scavenger_thread::grab_and_prepend( generic_connection<omp_server,omp_client>* last_conn_to_close )
3167 {
3168 uintptr_t conn_ex = (uintptr_t)last_conn_to_close;
3169 uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
3170 reinterpret_cast<omp_connection_v2*>(last_conn_to_close)->next_conn = reinterpret_cast<omp_connection_v2*>(head);
3171 return conn_ex;
3172 }
3173
3174 extern "C" ULONGLONG NTAPI VerSetConditionMask( ULONGLONG, DWORD, BYTE);
3175
3176 bool is_windows7_or_later ()
3177 {
3178 try {
3179 return GetOSVersion()>=IResourceManager::Win7OrLater;
3180 } catch( ... ) {
3181 return false;
3182 }
3183 }
3184
3185 #endif /* RML_USE_WCRM */
3186
3187 template<typename Connection, typename Server, typename Client>
3188 static factory::status_type connect( factory& f, Server*& server, Client& client ) {
3189 server = new Connection(*static_cast<wait_counter*>(f.scratch_ptr),client);
3190 return factory::st_success;
3191 }
3192
3193 void init_rml_module () {
3194 the_balance = the_default_concurrency = tbb::internal::AvailableHwConcurrency() - 1;
3195 #if RML_USE_WCRM
3196 connection_scavenger.launch();
3197 #endif
3198 }
3199
//! Entry point a client calls to open the RML factory.
/** Pins the library in memory on first use, performs one-time module
    initialization, negotiates versions, and allocates the factory's
    wait_counter on success.
    \param server_version out: the server's protocol version.
    \param client_version the caller's protocol version (0 is rejected). */
extern "C" factory::status_type __RML_open_factory( factory& f, version_type& server_version, version_type client_version ) {
    // Hack to keep this library from being closed by causing the first client's dlopen to not have a corresponding dlclose.
    // This code will be removed once we figure out how to do shutdown of the RML perfectly.
    static tbb::atomic<bool> one_time_flag;
    if( one_time_flag.compare_and_swap(true,false)==false) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, NULL );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    // Initialize the_balance only once
    tbb::internal::atomic_do_once ( &init_rml_module, rml_module_state );

    server_version = SERVER_VERSION;
    f.scratch_ptr = 0;
    if( client_version==0 ) {
        return factory::st_incompatible;
#if RML_USE_WCRM
    } else if ( !is_windows7_or_later() ) {
        // The ConcRT-RM-based server needs OS support only present in Win7+.
#if TBB_USE_DEBUG
        fprintf(stderr, "This version of the RML library requires Windows 7 to run on.\nConnection request denied.\n");
#endif
        return factory::st_incompatible;
#endif
    } else {
#if TBB_USE_DEBUG
        if( client_version<EARLIEST_COMPATIBLE_CLIENT_VERSION )
            fprintf(stderr, "This client library is too old for the current RML server.\nThe connection request is granted but oversubscription/undersubscription may occur.\n");
#endif
        f.scratch_ptr = new wait_counter;
        return factory::st_success;
    }
}
3237
//! Entry point a client calls to close the RML factory.
/** Waits for all outstanding connections tracked by the wait_counter, then
    deletes it.  scratch_ptr is briefly NULLed during the wait and afterwards
    stores the final balance value (cast to void*) rather than a pointer --
    NOTE(review): this looks like a diagnostic hack; confirm before relying on it. */
extern "C" void __RML_close_factory( factory& f ) {
    if( wait_counter* fc = static_cast<wait_counter*>(f.scratch_ptr) ) {
        f.scratch_ptr = 0;
        fc->wait();
        size_t bal = the_balance;
        f.scratch_ptr = (void*)bal;
        delete fc;
    }
}
3247
3248 void call_with_build_date_str( ::rml::server_info_callback_t cb, void* arg );
3249
3250 }} // rml::internal
3251
3252 namespace tbb {
3253 namespace internal {
3254 namespace rml {
3255
//! Public entry point: create a TBB RML server connection for the given client.
extern "C" tbb_factory::status_type __TBB_make_rml_server( tbb_factory& f, tbb_server*& server, tbb_client& client ) {
    return ::rml::internal::connect< ::rml::internal::tbb_connection_v2>(f,server,client);
}
3259
//! Public entry point: report RML build/version info to the TBB client callback.
extern "C" void __TBB_call_with_my_server_info( ::rml::server_info_callback_t cb, void* arg ) {
    return ::rml::internal::call_with_build_date_str( cb, arg );
}
3263
3264 }}}
3265
3266 namespace __kmp {
3267 namespace rml {
3268
//! Public entry point: create an OMP RML server connection for the given client.
extern "C" omp_factory::status_type __KMP_make_rml_server( omp_factory& f, omp_server*& server, omp_client& client ) {
    return ::rml::internal::connect< ::rml::internal::omp_connection_v2>(f,server,client);
}
3272
//! Public entry point: report RML build/version info to the OMP client callback.
extern "C" void __KMP_call_with_my_server_info( ::rml::server_info_callback_t cb, void* arg ) {
    return ::rml::internal::call_with_build_date_str( cb, arg );
}
3276
3277 }}
3278
3279 /*
3280 * RML server info
3281 */
3282 #include "version_string.ver"
3283
3284 #ifndef __TBB_VERSION_STRINGS
3285 #pragma message("Warning: version_string.ver isn't generated properly by version_info.sh script!")
3286 #endif
3287
3288 // We use the build time as the RML server info. TBB is required to build RML, so we make it the same as the TBB build time.
3289 #ifndef __TBB_DATETIME
3290 #define __TBB_DATETIME __DATE__ " " __TIME__
3291 #endif
3292
3293 #if !RML_USE_WCRM
3294 #define RML_SERVER_BUILD_TIME "Intel(R) RML library built: " __TBB_DATETIME
3295 #define RML_SERVER_VERSION_ST "Intel(R) RML library version: v" TOSTRING(SERVER_VERSION)
3296 #else
3297 #define RML_SERVER_BUILD_TIME "Intel(R) RML library built: " __TBB_DATETIME
3298 #define RML_SERVER_VERSION_ST "Intel(R) RML library version: v" TOSTRING(SERVER_VERSION) " on ConcRT RM with " RML_THREAD_KIND_STRING
3299 #endif
3300
3301 namespace rml {
3302 namespace internal {
3303
call_with_build_date_str(::rml::server_info_callback_t cb,void * arg)3304 void call_with_build_date_str( ::rml::server_info_callback_t cb, void* arg )
3305 {
3306 (*cb)( arg, RML_SERVER_BUILD_TIME );
3307 (*cb)( arg, RML_SERVER_VERSION_ST );
3308 }
3309 }} // rml::internal
3310