1 /*
2     Copyright (c) 2005-2017 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "rml_tbb.h"
22 #define private public /* Sleazy trick to avoid publishing internal names in public header. */
23 #include "rml_omp.h"
24 #undef private
25 
26 #include "tbb/tbb_allocator.h"
27 #include "tbb/cache_aligned_allocator.h"
28 #include "tbb/aligned_space.h"
29 #include "tbb/atomic.h"
30 #include "tbb/spin_mutex.h"
31 #include "tbb/tbb_misc.h"           // Get AvailableHwConcurrency() from here.
32 #if _MSC_VER==1500 && !defined(__INTEL_COMPILER)
33 // VS2008/VC9 seems to have an issue;
34 #pragma warning( push )
35 #pragma warning( disable: 4985 )
36 #endif
37 #include "tbb/concurrent_vector.h"
38 #if _MSC_VER==1500 && !defined(__INTEL_COMPILER)
39 #pragma warning( pop )
40 #endif
41 #if _MSC_VER && defined(_Wp64)
42 // Workaround for overzealous compiler warnings
43 #pragma warning (push)
44 #pragma warning (disable: 4244)
45 #endif
46 
47 #include "job_automaton.h"
48 #include "wait_counter.h"
49 #include "thread_monitor.h"
50 
51 #if RML_USE_WCRM
52 #include <concrt.h>
53 #include <concrtrm.h>
54 using namespace Concurrency;
55 #include <vector>
56 #include <hash_map>
57 #define __RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED 0
58 #endif /* RML_USE_WCRM */
59 
60 #define STRINGIFY(x) #x
61 #define TOSTRING(x) STRINGIFY(x)
62 
63 namespace rml {
64 namespace internal {
65 
66 using tbb::internal::rml::tbb_client;
67 using tbb::internal::rml::tbb_server;
68 
69 using __kmp::rml::omp_client;
70 using __kmp::rml::omp_server;
71 
72 typedef versioned_object::version_type version_type;
73 
74 #define SERVER_VERSION 2
75 #define EARLIEST_COMPATIBLE_CLIENT_VERSION 2
76 
77 static const size_t cache_line_size = tbb::internal::NFS_MaxLineSize;
78 
79 template<typename Server, typename Client> class generic_connection;
80 class tbb_connection_v2;
81 class omp_connection_v2;
82 
83 #if RML_USE_WCRM
84 //! State of a server_thread
85 /** Below are diagrams of legal state transitions.
86 
87                           ts_busy
88                           ^      ^
89                          /        \
90                         /          V
91     ts_done <----- ts_asleep <------> ts_idle
92 */
93 
94 enum thread_state_t {
95     ts_idle,
96     ts_asleep,
97     ts_busy,
98     ts_done
99 };
100 
101 //! Extra state of an omp server thread
102 enum thread_extra_state_t {
103     ts_none,
104     ts_removed,
105     ts_lent
106 };
107 
108 //! Results from try_grab_for()
109 enum thread_grab_t {
110     wk_failed,
111     wk_from_asleep,
112     wk_from_idle
113 };
114 
115 #else /* !RML_USE_WCRM */
116 
117 //! State of a server_thread
118 /** Below are diagrams of legal state transitions.
119 
120     OMP
121               ts_omp_busy
122               ^          ^
123              /            \
124             /              V
125     ts_asleep <-----------> ts_idle
126 
127 
128               ts_deactivated
129              ^            ^
130             /              \
131            V                \
132     ts_none  <--------------> ts_reactivated
133 
134     TBB
135               ts_tbb_busy
136               ^          ^
137              /            \
138             /              V
139     ts_asleep <-----------> ts_idle --> ts_done
140 
141     For TBB only. Extra state transition.
142 
143     ts_created -> ts_started -> ts_visited
144  */
145 enum thread_state_t {
146     //! Thread not doing anything useful, but running and looking for work.
147     ts_idle,
148     //! Thread not doing anything useful and is asleep */
149     ts_asleep,
150     //! Thread is enlisted into OpenMP team
151     ts_omp_busy,
152     //! Thread is busy doing TBB work.
153     ts_tbb_busy,
154     //! For tbb threads only
155     ts_done,
156     ts_created,
157     ts_started,
158     ts_visited,
159     //! For omp threads only
160     ts_none,
161     ts_deactivated,
162     ts_reactivated
163 };
164 #endif /* RML_USE_WCRM */
165 
166 #if TBB_USE_ASSERT
167 #define PRODUCE_ARG(x) ,x
168 #else
169 #define PRODUCE_ARG(x)
170 #endif /* TBB_USE_ASSERT */
171 
172 //! Synchronizes dispatch of OpenMP work.
173 class omp_dispatch_type {
174     typedef ::rml::job job_type;
175     omp_client* client;
176     void* cookie;
177     omp_client::size_type index;
178     tbb::atomic<job_type*> job;
179 #if TBB_USE_ASSERT
180     omp_connection_v2* server;
181 #endif /* TBB_USE_ASSERT */
182 public:
omp_dispatch_type()183     omp_dispatch_type() {job=NULL;}
184     void consume();
produce(omp_client & c,job_type * j,void * cookie_,omp_client::size_type index_ PRODUCE_ARG (omp_connection_v2 & s))185     void produce( omp_client& c, job_type* j, void* cookie_, omp_client::size_type index_ PRODUCE_ARG( omp_connection_v2& s )) {
186         __TBB_ASSERT( j, NULL );
187         __TBB_ASSERT( !job, "job already set" );
188         client = &c;
189 #if TBB_USE_ASSERT
190         server = &s;
191 #endif /* TBB_USE_ASSERT */
192         cookie = cookie_;
193         index = index_;
194         // Must be last
195         job = j;
196     }
197 };
198 
199 //! A reference count.
200 /** No default constructor, because users of ref_count must be very careful about whether the
201     initial reference count is 0 or 1. */
202 class ref_count: no_copy {
203     friend class thread_map;
204     tbb::atomic<int> my_ref_count;
205 public:
ref_count(int k)206     ref_count(int k ) {my_ref_count=k;}
~ref_count()207     ~ref_count() {__TBB_ASSERT( !my_ref_count, "premature destruction of refcounted object" );}
208     //! Add one and return new value.
add_ref()209     int add_ref() {
210         int k = ++my_ref_count;
211         __TBB_ASSERT(k>=1,"reference count underflowed before add_ref");
212         return k;
213     }
214     //! Subtract one and return new value.
remove_ref()215     int remove_ref() {
216         int k = --my_ref_count;
217         __TBB_ASSERT(k>=0,"reference count underflow");
218         return k;
219     }
220 };
221 
222 #if RML_USE_WCRM
223 
224 #if USE_UMS_THREAD
225 #define RML_THREAD_KIND UmsThreadDefault
226 #define RML_THREAD_KIND_STRING "UmsThread"
227 #else
228 #define RML_THREAD_KIND ThreadScheduler
229 #define RML_THREAD_KIND_STRING "WinThread"
230 #endif
231 
232 // Forward declaration
233 class thread_map;
234 
235 static const IExecutionResource* c_remove_prepare = (IExecutionResource*)0;
236 static const IExecutionResource* c_remove_returned = (IExecutionResource*)1;
237 
238 //! Server thread representation
239 class server_thread_rep : no_copy {
240     friend class thread_map;
241     friend class omp_connection_v2;
242     friend class server_thread;
243     friend class tbb_server_thread;
244     friend class omp_server_thread;
245     template<typename Connection> friend void make_job( Connection& c, typename Connection::server_thread_type& t );
246     typedef int thread_state_rep_t;
247 public:
248     //! Ctor
server_thread_rep(bool assigned,IScheduler * s,IExecutionResource * r,thread_map & map,rml::client & cl)249     server_thread_rep( bool assigned, IScheduler* s, IExecutionResource* r, thread_map& map, rml::client& cl ) :
250         uid( GetExecutionContextId() ), my_scheduler(s), my_proxy(NULL),
251         my_thread_map(map), my_client(cl), my_job(NULL)
252     {
253         my_state = assigned ? ts_busy : ts_idle;
254         my_extra_state = ts_none;
255         terminate = false;
256         my_execution_resource = r;
257     }
258     //! Dtor
~server_thread_rep()259     ~server_thread_rep() {}
260 
261     //! Synchronization routine
wait_for_job()262     inline rml::job* wait_for_job() {
263         if( !my_job ) my_job = my_job_automaton.wait_for_job();
264         return my_job;
265     }
266 
267     // Getters and setters
read_state() const268     inline thread_state_t read_state() const { thread_state_rep_t s = my_state; return static_cast<thread_state_t>(s); }
set_state(thread_state_t to)269     inline void set_state( thread_state_t to ) {my_state = to;}
set_removed()270     inline void set_removed() { __TBB_ASSERT( my_extra_state==ts_none, NULL ); my_extra_state = ts_removed; }
is_removed() const271     inline bool is_removed() const { return my_extra_state==ts_removed; }
is_lent() const272     inline bool is_lent() const {return my_extra_state==ts_lent;}
set_lent()273     inline void set_lent() { my_extra_state=ts_lent; }
set_returned()274     inline void set_returned() { my_extra_state=ts_none; }
get_execution_resource()275     inline IExecutionResource* get_execution_resource() { return my_execution_resource; }
get_virtual_processor()276     inline IVirtualProcessorRoot* get_virtual_processor() { return (IVirtualProcessorRoot*)get_execution_resource(); }
277 
278     //! Enlist the thread for work
wakeup(thread_state_t to,thread_state_t from)279     inline bool wakeup( thread_state_t to, thread_state_t from ) {
280         __TBB_ASSERT( from==ts_asleep && (to==ts_idle||to==ts_busy||to==ts_done), NULL );
281         return my_state.compare_and_swap( to, from )==from;
282     }
283 
284     //! Enlist the thread for.
285     thread_grab_t try_grab_for();
286 
287     //! Destroy the client job associated with the thread
288     template<typename Connection> bool destroy_job( Connection* c );
289 
290     //! Try to re-use the thread
revive(IScheduler * s,IExecutionResource * r,rml::client & c)291     void revive( IScheduler* s, IExecutionResource* r, rml::client& c ) {
292         // the variables may not have been set before a thread was told to quit
293         __TBB_ASSERT( my_scheduler==s, "my_scheduler has been altered?\n" );
294         my_scheduler = s;
295         __TBB_ASSERT( &my_client==&c, "my_client has been altered?\n" );
296         if( r ) my_execution_resource = r;
297         my_client = c;
298         my_state = ts_idle;
299         __TBB_ASSERT( my_extra_state==ts_removed, NULL );
300         my_extra_state = ts_none;
301     }
302 
303 protected:
304     const int uid;
305     IScheduler* my_scheduler;
306     IThreadProxy* my_proxy;
307     tbb::atomic<IExecutionResource*> my_execution_resource; /* for non-masters, it is IVirtualProcessorRoot */
308     thread_map& my_thread_map;
309     rml::client& my_client;
310     job* my_job;
311     job_automaton my_job_automaton;
312     tbb::atomic<bool> terminate;
313     tbb::atomic<thread_state_rep_t> my_state;
314     tbb::atomic<thread_extra_state_t> my_extra_state;
315 };
316 
317 //! Class that implements IExecutionContext
318 class server_thread : public IExecutionContext, public server_thread_rep {
319     friend class tbb_connection_v2;
320     friend class omp_connection_v2;
321     friend class tbb_server_thread;
322     friend class omp_server_thread;
323     friend class thread_map;
324     template<typename Connection> friend void make_job( Connection& c, typename Connection::server_thread_type& t );
325 protected:
server_thread(bool is_tbb,bool assigned,IScheduler * s,IExecutionResource * r,thread_map & map,rml::client & cl)326     server_thread( bool is_tbb, bool assigned, IScheduler* s, IExecutionResource* r, thread_map& map, rml::client& cl ) : server_thread_rep(assigned,s,r,map,cl), tbb_thread(is_tbb) {}
~server_thread()327     ~server_thread() {}
GetId() const328     unsigned int GetId() const __TBB_override { return uid; }
GetScheduler()329     IScheduler* GetScheduler() __TBB_override { return my_scheduler; }
GetProxy()330     IThreadProxy* GetProxy()   __TBB_override { return my_proxy; }
SetProxy(IThreadProxy * thr_proxy)331     void SetProxy( IThreadProxy* thr_proxy ) __TBB_override { my_proxy = thr_proxy; }
332 
333 private:
334     bool tbb_thread;
335 };
336 
337 // Forward declaration
338 class tbb_connection_v2;
339 class omp_connection_v2;
340 
341 //! TBB server thread
342 class tbb_server_thread : public server_thread {
343     friend class tbb_connection_v2;
344 public:
tbb_server_thread(bool assigned,IScheduler * s,IExecutionResource * r,tbb_connection_v2 * con,thread_map & map,rml::client & cl)345     tbb_server_thread( bool assigned, IScheduler* s, IExecutionResource* r, tbb_connection_v2* con, thread_map& map, rml::client& cl ) : server_thread(true,assigned,s,r,map,cl), my_conn(con) {
346         activation_count = 0;
347     }
~tbb_server_thread()348     ~tbb_server_thread() {}
349     void Dispatch( DispatchState* ) __TBB_override;
350     inline bool initiate_termination();
351     bool sleep_perhaps();
352     //! Switch out this thread
353     bool switch_out();
354 private:
355     tbb_connection_v2* my_conn;
356 public:
357     tbb::atomic<int> activation_count;
358 };
359 
360 //! OMP server thread
361 class omp_server_thread : public server_thread {
362     friend class omp_connection_v2;
363 public:
omp_server_thread(bool assigned,IScheduler * s,IExecutionResource * r,omp_connection_v2 * con,thread_map & map,rml::client & cl)364     omp_server_thread( bool assigned, IScheduler* s, IExecutionResource* r, omp_connection_v2* con, thread_map& map, rml::client& cl ) :
365         server_thread(false,assigned,s,r,map,cl), my_conn(con), my_cookie(NULL), my_index(UINT_MAX) {}
~omp_server_thread()366     ~omp_server_thread() {}
367     void Dispatch( DispatchState* ) __TBB_override;
get_cookie()368     inline void* get_cookie() {return my_cookie;}
get_index()369     inline ::__kmp::rml::omp_client::size_type get_index() {return my_index;}
370 
get_execution_resource()371     inline IExecutionResource* get_execution_resource() { return get_execution_resource(); }
initiate_termination()372     inline bool initiate_termination() { return destroy_job( (omp_connection_v2*) my_conn ); }
373     void sleep_perhaps();
374 private:
375     omp_connection_v2* my_conn;
376     void* my_cookie;
377     ::__kmp::rml::omp_client::size_type my_index;
378     omp_dispatch_type omp_data;
379 };
380 
381 //! Class that implements IScheduler
382 template<typename Connection>
383 class scheduler : no_copy, public IScheduler {
384 public:
GetId() const385     unsigned int GetId() const __TBB_override {return uid;}
Statistics(unsigned int *,unsigned int *,unsigned int *)386     void Statistics( unsigned int* /*pTaskCompletionRate*/, unsigned int* /*pTaskArrivalRate*/, unsigned int* /*pNumberOfTaskEnqueued*/) __TBB_override {}
GetPolicy() const387     SchedulerPolicy GetPolicy() const __TBB_override { __TBB_ASSERT(my_policy,NULL); return *my_policy; }
AddVirtualProcessors(IVirtualProcessorRoot ** vproots,unsigned int count)388     void AddVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override { if( !my_conn.is_closing() ) my_conn.add_virtual_processors( vproots, count); }
389     void RemoveVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override;
NotifyResourcesExternallyIdle(IVirtualProcessorRoot ** vproots,unsigned int count)390     void NotifyResourcesExternallyIdle( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override { __TBB_ASSERT( false, "This call is not allowed for TBB" ); }
NotifyResourcesExternallyBusy(IVirtualProcessorRoot ** vproots,unsigned int count)391     void NotifyResourcesExternallyBusy( IVirtualProcessorRoot** vproots, unsigned int count ) __TBB_override { __TBB_ASSERT( false, "This call is not allowed for TBB" ); }
392 protected:
393     scheduler( Connection& conn );
~scheduler()394     virtual ~scheduler() { __TBB_ASSERT( my_policy, NULL ); delete my_policy; }
395 
396 public:
create(Connection & conn)397     static scheduler* create( Connection& conn ) {return new scheduler( conn );}
398 
399 private:
400     const int uid;
401     Connection& my_conn;
402     SchedulerPolicy* my_policy;
403 };
404 
405 
406 /*
407  * --> ts_busy --> ts_done
408  */
409 class thread_scavenger_thread : public IExecutionContext, no_copy {
410 public:
thread_scavenger_thread(IScheduler * s,IVirtualProcessorRoot * r,thread_map & map)411     thread_scavenger_thread( IScheduler* s, IVirtualProcessorRoot* r, thread_map& map ) :
412         uid( GetExecutionContextId() ), my_scheduler(s), my_virtual_processor_root(r), my_proxy(NULL), my_thread_map(map)
413     {
414         my_state = ts_busy;
415 #if TBB_USE_ASSERT
416         activation_count = 0;
417 #endif
418     }
~thread_scavenger_thread()419     ~thread_scavenger_thread() {}
GetId() const420     unsigned int GetId() const __TBB_override { return uid; }
GetScheduler()421     IScheduler* GetScheduler() __TBB_override { return my_scheduler; }
GetProxy()422     IThreadProxy* GetProxy()   __TBB_override { return my_proxy; }
SetProxy(IThreadProxy * thr_proxy)423     void SetProxy( IThreadProxy* thr_proxy ) __TBB_override { my_proxy = thr_proxy; }
424     void Dispatch( DispatchState* ) __TBB_override;
read_state()425     inline thread_state_t read_state() { return my_state; }
set_state(thread_state_t s)426     inline void set_state( thread_state_t s ) { my_state = s; }
get_virtual_processor()427     inline IVirtualProcessorRoot* get_virtual_processor() { return my_virtual_processor_root; }
428 private:
429     const int uid;
430     IScheduler* my_scheduler;
431     IVirtualProcessorRoot* my_virtual_processor_root;
432     IThreadProxy* my_proxy;
433     thread_map& my_thread_map;
434     tbb::atomic<thread_state_t> my_state;
435 #if TBB_USE_ASSERT
436 public:
437     tbb::atomic<int> activation_count;
438 #endif
439 };
440 
441 static const thread_scavenger_thread* c_claimed = reinterpret_cast<thread_scavenger_thread*>(1);
442 
443 struct garbage_connection_queue {
444     tbb::atomic<uintptr_t> head;
445     tbb::atomic<uintptr_t> tail;
446     static const uintptr_t empty = 0; // connection scavenger thread empty list
447     static const uintptr_t plugged = 1;  // end of use of the list
448     static const uintptr_t plugged_acked = 2;  // connection scavenger saw the plugged flag, and it freed all connections
449 };
450 
451 //! Connection scavenger
452 /** It collects closed connection objects, wait for worker threads belonging to the connection to return to ConcRT RM
453  *  then return the object to the memory manager.
454  */
455 class connection_scavenger_thread {
456     friend void assist_cleanup_connections();
457     /*
458      * connection_scavenger_thread's state
459      * ts_busy <----> ts_asleep <--
460      */
461     tbb::atomic<thread_state_t> state;
462 
463     /* We steal two bits from a connection pointer to encode
464      * whether the connection is for TBB or for OMP.
465      *
466      * ----------------------------------
467      * |                          |  |  |
468      * ----------------------------------
469      *                              ^  ^
470      *                             /   |
471      *            1 : tbb, 0 : omp     |
472      *                  if set, terminate
473      */
474     // FIXME: pad these?
475     thread_monitor monitor;
476     HANDLE thr_handle;
477 #if TBB_USE_ASSERT
478     tbb::atomic<int> n_scavenger_threads;
479 #endif
480 
481 public:
connection_scavenger_thread()482     connection_scavenger_thread() : thr_handle(NULL) {
483         state = ts_asleep;
484 #if TBB_USE_ASSERT
485         n_scavenger_threads = 0;
486 #endif
487     }
488 
~connection_scavenger_thread()489     ~connection_scavenger_thread() {}
490 
wakeup()491     void wakeup() {
492         if( state.compare_and_swap( ts_busy, ts_asleep )==ts_asleep )
493             monitor.notify();
494     }
495 
496     void sleep_perhaps();
497 
498     void process_requests( uintptr_t conn_ex );
499 
500     static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );
501 
launch()502     void launch() {
503         thread_monitor::launch( connection_scavenger_thread::thread_routine, this, NULL );
504     }
505 
506     template<typename Server, typename Client>
507     void add_request( generic_connection<Server,Client>* conn_to_close );
508 
509     template<typename Server, typename Client>
510     uintptr_t grab_and_prepend( generic_connection<Server,Client>* last_conn_to_close );
511 };
512 
513 void free_all_connections( uintptr_t );
514 
515 #endif /* RML_USE_WCRM */
516 
517 #if !RML_USE_WCRM
518 class server_thread;
519 
520 //! thread_map_base; we need to make the iterator type available to server_thread
521 struct thread_map_base {
522     //! A value in the map
523     class value_type {
524     public:
thread()525         server_thread& thread() {
526             __TBB_ASSERT( my_thread, "thread_map::value_type::thread() called when !my_thread" );
527             return *my_thread;
528         }
job()529         rml::job& job() {
530             __TBB_ASSERT( my_job, "thread_map::value_type::job() called when !my_job" );
531             return *my_job;
532         }
value_type()533         value_type() : my_thread(NULL), my_job(NULL) {}
wait_for_thread() const534         server_thread& wait_for_thread() const {
535             for(;;) {
536                 server_thread* ptr=const_cast<server_thread*volatile&>(my_thread);
537                 if( ptr )
538                     return *ptr;
539                 __TBB_Yield();
540             }
541         }
542         /** Shortly after when a connection is established, it is possible for the server
543             to grab a server_thread that has not yet created a job object for that server. */
wait_for_job() const544         rml::job* wait_for_job() const {
545             if( !my_job ) {
546                 my_job = my_automaton.wait_for_job();
547             }
548             return my_job;
549         }
550     private:
551         server_thread* my_thread;
552         /** Marked mutable because though it is physically modified, conceptually it is a duplicate of
553             the job held by job_automaton. */
554         mutable rml::job* my_job;
555         job_automaton my_automaton;
556         // FIXME - pad out to cache line, because my_automaton is hit hard by thread()
557         friend class thread_map;
558     };
559     typedef tbb::concurrent_vector<value_type,tbb::zero_allocator<value_type,tbb::cache_aligned_allocator> > array_type;
560 };
561 #endif /* !RML_USE_WCRM */
562 
563 #if _MSC_VER && !defined(__INTEL_COMPILER)
564     // Suppress overzealous compiler warnings about uninstantiable class
565     #pragma warning(push)
566     #pragma warning(disable:4510 4610)
567 #endif
568 
569 template<typename T>
570 class padded: public T {
571     char pad[cache_line_size - sizeof(T)%cache_line_size];
572 };
573 
574 #if _MSC_VER && !defined(__INTEL_COMPILER)
575     #pragma warning(pop)
576 #endif
577 
578 // FIXME - should we pad out memory to avoid false sharing of our global variables?
579 static unsigned the_default_concurrency;
580 static tbb::atomic<int> the_balance;
581 static tbb::atomic<tbb::internal::do_once_state> rml_module_state;
582 
583 #if !RML_USE_WCRM
584 //! Per thread information
585 /** ref_count holds number of clients that are using this,
586     plus 1 if a host thread owns this instance. */
587 class server_thread: public ref_count {
588     friend class thread_map;
589     template<typename Server, typename Client> friend class generic_connection;
590     friend class tbb_connection_v2;
591     friend class omp_connection_v2;
592     //! Integral type that can hold a thread_state_t
593     typedef int thread_state_rep_t;
594     tbb::atomic<thread_state_rep_t> state;
595 public:
596     thread_monitor monitor;
597 private:
598     bool    is_omp_thread;
599     tbb::atomic<thread_state_rep_t> my_extra_state;
600     server_thread* link;
601     thread_map_base::array_type::iterator my_map_pos;
602     rml::server *my_conn;
603     rml::job* my_job;
604     job_automaton* my_ja;
605     size_t my_index;
606     tbb::atomic<bool> terminate;
607     omp_dispatch_type omp_dispatch;
608 
609 #if TBB_USE_ASSERT
610     //! Flag used to check if thread is still using *this.
611     bool has_active_thread;
612 #endif /* TBB_USE_ASSERT */
613 
614     //! Volunteer to sleep.
615     void sleep_perhaps( thread_state_t asleep );
616 
617     //! Destroy job corresponding to given client
618     /** Return true if thread must quit. */
619     template<typename Connection>
620     bool destroy_job( Connection& c );
621 
622     //! Do terminate the thread
623     /** Return true if thread must quit. */
624     bool do_termination();
625 
626     void loop();
627     static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );
628 
629 public:
630     server_thread();
631 
632     ~server_thread();
633 
634     //! Read the thread state
read_state() const635     thread_state_t read_state() const {
636         thread_state_rep_t s = state;
637         __TBB_ASSERT( unsigned(s)<=unsigned(ts_done), "corrupted server thread?" );
638         return thread_state_t(s);
639     }
640 
641     //! Read the tbb-specific extra thread state
read_extra_state() const642     thread_state_t read_extra_state() const {
643         thread_state_rep_t s = my_extra_state;
644         return thread_state_t(s);
645     }
646 
647     //! Launch a thread that is bound to *this.
648     void launch( size_t stack_size );
649 
650     //! Attempt to wakeup a thread
651     /** The value "to" is the new state for the thread, if it was woken up.
652         Returns true if thread was woken up, false otherwise. */
653     bool wakeup( thread_state_t to, thread_state_t from );
654 
655     //! Attempt to enslave a thread for OpenMP/TBB.
656     /** Returns true if state is successfully changed.  's' takes either ts_omp_busy or ts_tbb_busy */
657     bool try_grab_for( thread_state_t s );
658 
659 #if _WIN32||_WIN64
660     //! Send the worker thread to sleep temporarily
661     void deactivate();
662 
663     //! Wake the worker thread up
664     void reactivate();
665 #endif /* _WIN32||_WIN64 */
666 };
667 
668 //! Bag of threads that are private to a client.
669 class private_thread_bag {
670     struct list_thread: server_thread {
671        list_thread* next;
672     };
673     //! Root of atomic linked list of list_thread
674     /** ABA problem is avoided because items are only atomically pushed, never popped. */
675     tbb::atomic<list_thread*> my_root;
676     tbb::cache_aligned_allocator<padded<list_thread> > my_allocator;
677 public:
678     //! Construct empty bag
private_thread_bag()679     private_thread_bag() {my_root=NULL;}
680 
681     //! Create a fresh server_thread object.
add_one_thread()682     server_thread& add_one_thread() {
683         list_thread* t = my_allocator.allocate(1);
684         new( t ) list_thread;
685         // Atomically add to list
686         list_thread* old_root;
687         do {
688             old_root = my_root;
689             t->next = old_root;
690         } while( my_root.compare_and_swap( t, old_root )!=old_root );
691         return *t;
692     }
693 
694     //! Destroy the bag and threads in it.
~private_thread_bag()695     ~private_thread_bag() {
696         while( my_root ) {
697             // Unlink thread from list.
698             list_thread* t = my_root;
699             my_root = t->next;
700             // Destroy and deallocate the thread.
701             t->~list_thread();
702             my_allocator.deallocate(static_cast<padded<list_thread>*>(t),1);
703         }
704     }
705 };
706 
707 //! Forward declaration
708 void wakeup_some_tbb_threads();
709 
710 //! Type-independent part of class generic_connection.
711 /** One to one map from server threads to jobs, and associated reference counting. */
712 class thread_map : public thread_map_base {
713 public:
714     typedef rml::client::size_type size_type;
715     //! ctor
thread_map(wait_counter & fc,::rml::client & client)716     thread_map( wait_counter& fc, ::rml::client& client ) :
717         all_visited_at_least_once(false), my_min_stack_size(0), my_server_ref_count(1),
718         my_client_ref_count(1), my_client(client), my_factory_counter(fc)
719     { my_unrealized_threads = 0; }
720     //! dtor
~thread_map()721     ~thread_map() {}
722     typedef array_type::iterator iterator;
begin()723     iterator begin() {return my_array.begin();}
end()724     iterator end() {return my_array.end();}
725     void bind();
726     void unbind();
727     void assist_cleanup( bool assist_null_only );
728 
729     /** Returns number of unrealized threads to create. */
730     size_type wakeup_tbb_threads( size_type n );
731     bool wakeup_next_thread( iterator i, tbb_connection_v2& conn );
732     void release_tbb_threads( server_thread* t );
733     void adjust_balance( int delta );
734 
735     //! Add a server_thread object to the map, but do not bind it.
736     /** Return NULL if out of unrealized threads. */
737     value_type* add_one_thread( bool is_omp_thread_ );
738 
739     void bind_one_thread( rml::server& server, value_type& x );
740 
741     void remove_client_ref();
add_server_ref()742     int add_server_ref() {return my_server_ref_count.add_ref();}
remove_server_ref()743     int remove_server_ref() {return my_server_ref_count.remove_ref();}
744 
client() const745     ::rml::client& client() const {return my_client;}
746 
get_unrealized_threads()747     size_type get_unrealized_threads() { return my_unrealized_threads; }
748 
749 private:
750     private_thread_bag my_private_threads;
751     bool all_visited_at_least_once;
752     array_type my_array;
753     size_t my_min_stack_size;
754     tbb::atomic<size_type> my_unrealized_threads;
755 
756     //! Number of threads referencing *this, plus one extra.
757     /** When it becomes zero, the containing server object can be safely deleted. */
758     ref_count my_server_ref_count;
759 
760     //! Number of jobs that need cleanup, plus one extra.
761     /** When it becomes zero, acknowledge_close_connection is called. */
762     ref_count my_client_ref_count;
763 
764     ::rml::client& my_client;
765     //! Counter owned by factory that produced this thread_map.
766     wait_counter& my_factory_counter;
767 };
768 
bind_one_thread(rml::server & server,value_type & x)769 void thread_map::bind_one_thread( rml::server& server, value_type& x ) {
770     // Add one to account for the thread referencing this map hereforth.
771     server_thread& t = x.thread();
772     my_server_ref_count.add_ref();
773     my_client_ref_count.add_ref();
774 #if TBB_USE_ASSERT
775     __TBB_ASSERT( t.add_ref()==1, NULL );
776 #else
777     t.add_ref();
778 #endif
779     // Have responsibility to start the thread.
780     t.my_conn = &server;
781     t.my_ja = &x.my_automaton;
782     t.launch( my_min_stack_size );
783     /* Must wake thread up so it can fill in its "my_job" field in *this.
784        Otherwise deadlock can occur where wait_for_job spins on thread that is sleeping. */
785     __TBB_ASSERT( t.state!=ts_tbb_busy, NULL );
786     t.wakeup( ts_idle, ts_asleep );
787 }
788 
add_one_thread(bool is_omp_thread_)789 thread_map::value_type* thread_map::add_one_thread( bool is_omp_thread_ ) {
790     size_type u;
791     do {
792         u = my_unrealized_threads;
793         if( !u ) return NULL;
794     } while( my_unrealized_threads.compare_and_swap(u-1,u)!=u );
795     server_thread& t = my_private_threads.add_one_thread();
796     t.is_omp_thread = is_omp_thread_;
797     __TBB_ASSERT( u>=1, NULL );
798     t.my_index = u - 1;
799     __TBB_ASSERT( t.state!=ts_tbb_busy, NULL );
800     t.my_extra_state = t.is_omp_thread ? ts_none : ts_created;
801 
802     iterator i = t.my_map_pos = my_array.grow_by(1);
803     value_type& v = *i;
804     v.my_thread = &t;
805     return &v;
806 }
807 
bind()808 void thread_map::bind() {
809     ++my_factory_counter;
810     my_min_stack_size = my_client.min_stack_size();
811     __TBB_ASSERT( my_unrealized_threads==0, "already called bind?" );
812     my_unrealized_threads = my_client.max_job_count();
813 }
814 
unbind()815 void thread_map::unbind() {
816     // Ask each server_thread to cleanup its job for this server.
817     for( iterator i=begin(); i!=end(); ++i ) {
818         server_thread& t = i->thread();
819         t.terminate = true;
820         t.wakeup( ts_idle, ts_asleep );
821     }
822     // Remove extra ref to client.
823     remove_client_ref();
824 }
825 
assist_cleanup(bool assist_null_only)826 void thread_map::assist_cleanup( bool assist_null_only ) {
827     // To avoid deadlock, the current thread *must* help out with cleanups that have not started,
828     // because the thread that created the job may be busy for a long time.
829     for( iterator i = begin(); i!=end(); ++i ) {
830         rml::job* j=0;
831         job_automaton& ja = i->my_automaton;
832         if( assist_null_only ? ja.try_plug_null() : ja.try_plug(j) ) {
833             if( j ) {
834                 my_client.cleanup(*j);
835             } else {
836                 // server thread did not get a chance to create a job.
837             }
838             remove_client_ref();
839         }
840     }
841 }
842 
wakeup_tbb_threads(size_type n)843 thread_map::size_type thread_map::wakeup_tbb_threads( size_type n ) {
844     __TBB_ASSERT(n>0,"must specify positive number of threads to wake up");
845     iterator e = end();
846     for( iterator k=begin(); k!=e; ++k ) {
847         // If another thread added *k, there is a tiny timing window where thread() is invalid.
848         server_thread& t = k->wait_for_thread();
849         thread_state_t thr_s = t.read_state();
850         if( t.read_extra_state()==ts_created || thr_s==ts_tbb_busy || thr_s==ts_done )
851             continue;
852         if( --the_balance>=0 ) { // try to withdraw a coin from the deposit
853             while( !t.try_grab_for( ts_tbb_busy ) ) {
854                 thr_s = t.read_state();
855                 if( thr_s==ts_tbb_busy || thr_s==ts_done ) {
856                     // we lost; move on to the next.
857                     ++the_balance;
858                     goto skip;
859                 }
860             }
861             if( --n==0 )
862                 return 0;
863         } else {
864             // overdraft.
865             ++the_balance;
866             break;
867         }
868 skip:
869         ;
870     }
871     return n<my_unrealized_threads ? n : size_type(my_unrealized_threads);
872 }
873 #else /* RML_USE_WCRM */
874 
875 class thread_map : no_copy {
876     friend class omp_connection_v2;
877     typedef ::std::hash_map<uintptr_t,server_thread*> hash_map_type;
878     size_t my_min_stack_size;
879     size_t my_unrealized_threads;
880     ::rml::client& my_client;
881     //! Counter owned by factory that produced this thread_map.
882     wait_counter& my_factory_counter;
883     //! Ref counters
884     ref_count my_server_ref_count;
885     ref_count my_client_ref_count;
886     // FIXME: pad this?
887     hash_map_type my_map;
888     bool shutdown_in_progress;
889     std::vector<IExecutionResource*> original_exec_resources;
890     tbb::cache_aligned_allocator<padded<tbb_server_thread> > my_tbb_allocator;
891     tbb::cache_aligned_allocator<padded<omp_server_thread> > my_omp_allocator;
892     tbb::cache_aligned_allocator<padded<thread_scavenger_thread> > my_scavenger_allocator;
893     IResourceManager* my_concrt_resource_manager;
894     IScheduler* my_scheduler;
895     ISchedulerProxy* my_scheduler_proxy;
896     tbb::atomic<thread_scavenger_thread*> my_thread_scavenger_thread;
897 #if TBB_USE_ASSERT
898     tbb::atomic<int> n_add_vp_requests;
899     tbb::atomic<int> n_thread_scavengers_created;
900 #endif
901 public:
thread_map(wait_counter & fc,::rml::client & client)902     thread_map( wait_counter& fc, ::rml::client& client ) :
903         my_min_stack_size(0), my_client(client), my_factory_counter(fc),
904         my_server_ref_count(1), my_client_ref_count(1), shutdown_in_progress(false),
905         my_concrt_resource_manager(NULL), my_scheduler(NULL), my_scheduler_proxy(NULL)
906     {
907         my_thread_scavenger_thread = NULL;
908 #if TBB_USE_ASSERT
909         n_add_vp_requests = 0;
910         n_thread_scavengers_created;
911 #endif
912     }
913 
~thread_map()914     ~thread_map() {
915         __TBB_ASSERT( n_thread_scavengers_created<=1, "too many scavenger thread created" );
916         // if thread_scavenger_thread is launched, wait for it to complete
917         if( my_thread_scavenger_thread ) {
918             __TBB_ASSERT( my_thread_scavenger_thread!=c_claimed, NULL );
919             while( my_thread_scavenger_thread->read_state()==ts_busy )
920                 __TBB_Yield();
921             thread_scavenger_thread* tst = my_thread_scavenger_thread;
922             my_scavenger_allocator.deallocate(static_cast<padded<thread_scavenger_thread>*>(tst),1);
923         }
924         // deallocate thread contexts
925         for( hash_map_type::const_iterator hi=my_map.begin(); hi!=my_map.end(); ++hi ) {
926             server_thread* thr = hi->second;
927             if( thr->tbb_thread ) {
928                 while( ((tbb_server_thread*)thr)->activation_count>1 )
929                     __TBB_Yield();
930                 ((tbb_server_thread*)thr)->~tbb_server_thread();
931                 my_tbb_allocator.deallocate(static_cast<padded<tbb_server_thread>*>(thr),1);
932             } else {
933                 ((omp_server_thread*)thr)->~omp_server_thread();
934                 my_omp_allocator.deallocate(static_cast<padded<omp_server_thread>*>(thr),1);
935             }
936         }
937         if( my_scheduler_proxy ) {
938             my_scheduler_proxy->Shutdown();
939             my_concrt_resource_manager->Release();
940             __TBB_ASSERT( my_scheduler, NULL );
941             delete my_scheduler;
942         } else {
943             __TBB_ASSERT( !my_scheduler, NULL );
944         }
945     }
946     typedef hash_map_type::key_type key_type;
947     typedef hash_map_type::value_type value_type;
948     typedef hash_map_type::iterator iterator;
begin()949     iterator begin() {return my_map.begin();}
end()950     iterator end() {return my_map.end();}
find(key_type k)951     iterator find( key_type k ) {return my_map.find( k );}
insert(key_type k,server_thread * v)952     iterator insert( key_type k, server_thread* v ) {
953         std::pair<iterator,bool> res = my_map.insert( value_type(k,v) );
954         return res.first;
955     }
bind(IScheduler * s)956     void bind( IScheduler* s ) {
957         ++my_factory_counter;
958         if( s ) {
959             my_unrealized_threads = s->GetPolicy().GetPolicyValue( MaxConcurrency );
960             __TBB_ASSERT( my_unrealized_threads>0, NULL );
961             my_scheduler = s;
962             my_concrt_resource_manager = CreateResourceManager(); // reference count==3 when first created.
963             my_scheduler_proxy = my_concrt_resource_manager->RegisterScheduler( s, CONCRT_RM_VERSION_1 );
964             my_scheduler_proxy->RequestInitialVirtualProcessors( false );
965         }
966     }
is_closing()967     bool is_closing() { return shutdown_in_progress; }
968     void unbind( rml::server& server, ::tbb::spin_mutex& mtx );
add_client_ref()969     void add_client_ref() { my_server_ref_count.add_ref(); }
970     void remove_client_ref();
add_server_ref()971     void add_server_ref() {my_server_ref_count.add_ref();}
remove_server_ref()972     int remove_server_ref() {return my_server_ref_count.remove_ref();}
get_server_ref_count()973     int get_server_ref_count() { int k = my_server_ref_count.my_ref_count; return k; }
974     void assist_cleanup( bool assist_null_only );
975     void adjust_balance( int delta );
current_balance() const976     int current_balance() const {int k = the_balance; return k;}
client() const977     ::rml::client& client() const {return my_client;}
register_as_master(server::execution_resource_t & v) const978     void register_as_master( server::execution_resource_t& v ) const { (IExecutionResource*&)v = my_scheduler_proxy ? my_scheduler_proxy->SubscribeCurrentThread() : NULL; }
979     // Remove() should be called from the same thread that subscribed the current h/w thread (i.e., the one that
980     // called register_as_master() ).
unregister(server::execution_resource_t v) const981     void unregister( server::execution_resource_t v ) const {if( v ) ((IExecutionResource*)v)->Remove( my_scheduler );}
982     void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count, tbb_connection_v2& conn, ::tbb::spin_mutex& mtx );
983     void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count, omp_connection_v2& conn, ::tbb::spin_mutex& mtx );
984     void remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx );
985     void mark_virtual_processors_as_lent( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx );
986     void create_oversubscribers( unsigned n, std::vector<server_thread*>& thr_vec, omp_connection_v2& conn, ::tbb::spin_mutex& mtx );
987     void wakeup_tbb_threads( int c, ::tbb::spin_mutex& mtx );
988     void mark_virtual_processors_as_returned( IVirtualProcessorRoot** vprocs, unsigned int count, tbb::spin_mutex& mtx );
addto_original_exec_resources(IExecutionResource * r,::tbb::spin_mutex & mtx)989     inline void addto_original_exec_resources( IExecutionResource* r, ::tbb::spin_mutex& mtx ) {
990         ::tbb::spin_mutex::scoped_lock lck(mtx);
991         __TBB_ASSERT( !is_closing(), "trying to register master while connection is being shutdown?" );
992         original_exec_resources.push_back( r );
993     }
994 #if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
995     void allocate_thread_scavenger( IExecutionResource* v );
996 #endif
get_thread_scavenger()997     inline thread_scavenger_thread* get_thread_scavenger() { return my_thread_scavenger_thread; }
998 };
999 
1000 garbage_connection_queue connections_to_reclaim;
1001 connection_scavenger_thread connection_scavenger;
1002 
1003 #endif /* !RML_USE_WCRM */
1004 
1005 //------------------------------------------------------------------------
1006 // generic_connection
1007 //------------------------------------------------------------------------
1008 
1009 template<typename Server, typename Client>
1010 struct connection_traits {};
1011 
1012 // head of the active tbb connections
1013 static tbb::atomic<uintptr_t> active_tbb_connections;
1014 static tbb::atomic<int> current_tbb_conn_readers;
1015 static size_t current_tbb_conn_reader_epoch;
1016 static tbb::atomic<size_t> close_tbb_connection_event_count;
1017 
1018 #if RML_USE_WCRM
1019 template<typename Connection>
1020 void make_job( Connection& c, server_thread& t );
1021 #endif
1022 
1023 template<typename Server, typename Client>
1024 class generic_connection: public Server, no_copy {
version() const1025     version_type version() const __TBB_override {return SERVER_VERSION;}
yield()1026     void yield() __TBB_override {thread_monitor::yield();}
independent_thread_number_changed(int delta)1027     void independent_thread_number_changed( int delta ) __TBB_override { my_thread_map.adjust_balance( -delta ); }
default_concurrency() const1028     unsigned default_concurrency() const __TBB_override { return the_default_concurrency; }
1029     friend void wakeup_some_tbb_threads();
1030     friend class connection_scavenger_thread;
1031 
1032 protected:
1033     thread_map my_thread_map;
1034     generic_connection* next_conn;
1035     size_t my_ec;
1036 #if RML_USE_WCRM
1037     // FIXME: pad it?
1038     tbb::spin_mutex map_mtx;
1039     IScheduler* my_scheduler;
do_open(IScheduler * s)1040     void do_open( IScheduler* s ) {
1041         my_scheduler = s;
1042         my_thread_map.bind( s );
1043     }
is_closing()1044     bool is_closing() { return my_thread_map.is_closing(); }
1045     void request_close_connection( bool existing );
1046 #else
do_open()1047     void do_open() {my_thread_map.bind();}
1048     void request_close_connection( bool );
1049 #endif /* RML_USE_WCRM */
1050     //! Make destructor virtual
~generic_connection()1051     virtual ~generic_connection() {}
1052 #if !RML_USE_WCRM
generic_connection(wait_counter & fc,Client & c)1053     generic_connection( wait_counter& fc, Client& c ) : my_thread_map(fc,c), next_conn(NULL), my_ec(0) {}
1054 #else
generic_connection(wait_counter & fc,Client & c)1055     generic_connection( wait_counter& fc, Client& c ) :
1056             my_thread_map(fc,c), next_conn(NULL), my_ec(0), map_mtx(), my_scheduler(NULL) {}
1057     void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count );
1058     void remove_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count );
notify_resources_externally_busy(IVirtualProcessorRoot ** vprocs,unsigned int count)1059     void notify_resources_externally_busy( IVirtualProcessorRoot** vprocs, unsigned int count ) { my_thread_map.mark_virtual_processors_as_lent( vprocs, count, map_mtx ); }
notify_resources_externally_idle(IVirtualProcessorRoot ** vprocs,unsigned int count)1060     void notify_resources_externally_idle( IVirtualProcessorRoot** vprocs, unsigned int count ) {
1061         my_thread_map.mark_virtual_processors_as_returned( vprocs, count, map_mtx );
1062     }
1063 #endif /* !RML_USE_WCRM */
1064 
1065 public:
1066     typedef Server server_type;
1067     typedef Client client_type;
client() const1068     Client& client() const {return static_cast<Client&>(my_thread_map.client());}
set_scratch_ptr(job & j,void * ptr)1069     void set_scratch_ptr( job& j, void* ptr ) { ::rml::server::scratch_ptr(j) = ptr; }
1070 #if RML_USE_WCRM
1071     template<typename Connection>
1072     friend void make_job( Connection& c, server_thread& t );
add_server_ref()1073     void add_server_ref ()   {my_thread_map.add_server_ref();}
remove_server_ref()1074     void remove_server_ref() {if( my_thread_map.remove_server_ref()==0 ) delete this;}
add_client_ref()1075     void add_client_ref ()   {my_thread_map.add_client_ref();}
remove_client_ref()1076     void remove_client_ref() {my_thread_map.remove_client_ref();}
1077 #else /* !RML_USE_WCRM */
add_server_ref()1078     int  add_server_ref ()   {return my_thread_map.add_server_ref();}
remove_server_ref()1079     void remove_server_ref() {if( my_thread_map.remove_server_ref()==0 ) delete this;}
remove_client_ref()1080     void remove_client_ref() {my_thread_map.remove_client_ref();}
1081     void make_job( server_thread& t, job_automaton& ja );
1082 #endif /* RML_USE_WCRM */
get_addr(uintptr_t addr_ex)1083     static generic_connection* get_addr( uintptr_t addr_ex ) {
1084         return reinterpret_cast<generic_connection*>( addr_ex&~(uintptr_t)3 );
1085     }
1086 };
1087 
1088 //------------------------------------------------------------------------
1089 // TBB server
1090 //------------------------------------------------------------------------
1091 
1092 template<>
1093 struct connection_traits<tbb_server,tbb_client> {
1094     static const bool assist_null_only = true;
1095     static const bool is_tbb = true;
1096 };
1097 
1098 //! Represents a server/client binding.
1099 /** The internal representation uses inheritance for the server part and a pointer for the client part. */
1100 class tbb_connection_v2: public generic_connection<tbb_server,tbb_client> {
1101     void adjust_job_count_estimate( int delta ) __TBB_override;
1102 #if !RML_USE_WCRM
1103 #if _WIN32||_WIN64
register_master(rml::server::execution_resource_t &)1104     void register_master ( rml::server::execution_resource_t& /*v*/ ) __TBB_override {}
unregister_master(rml::server::execution_resource_t)1105     void unregister_master ( rml::server::execution_resource_t /*v*/ ) __TBB_override {}
1106 #endif
1107 #else
register_master(rml::server::execution_resource_t & v)1108     void register_master ( rml::server::execution_resource_t& v ) __TBB_override {
1109         my_thread_map.register_as_master(v);
1110         if( v ) ++nesting;
1111     }
unregister_master(rml::server::execution_resource_t v)1112     void unregister_master ( rml::server::execution_resource_t v ) __TBB_override {
1113         if( v ) {
1114             __TBB_ASSERT( nesting>0, NULL );
1115             if( --nesting==0 ) {
1116 #if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
1117                 my_thread_map.allocate_thread_scavenger( (IExecutionResource*)v );
1118 #endif
1119             }
1120         }
1121         my_thread_map.unregister(v);
1122     }
create_scheduler()1123     IScheduler* create_scheduler() {return( scheduler<tbb_connection_v2>::create( *this ) );}
1124     friend void  free_all_connections( uintptr_t );
1125     friend class scheduler<tbb_connection_v2>;
1126     friend class execution_context;
1127     friend class connection_scavenger_thread;
1128 #endif /* RML_USE_WCRM */
1129     friend void wakeup_some_tbb_threads();
1130     //! Estimate on number of jobs without threads working on them.
1131     tbb::atomic<int> my_slack;
1132     friend class dummy_class_to_shut_up_gratuitous_warning_from_gcc_3_2_3;
1133 #if TBB_USE_ASSERT
1134     tbb::atomic<int> my_job_count_estimate;
1135 #endif /* TBB_USE_ASSERT */
1136 
1137     tbb::atomic<int> n_adjust_job_count_requests;
1138 #if RML_USE_WCRM
1139     tbb::atomic<int> nesting;
1140 #endif
1141 
1142     // dtor
1143     ~tbb_connection_v2();
1144 
1145 public:
1146 #if RML_USE_WCRM
1147     typedef tbb_server_thread server_thread_type;
1148 #endif
1149     //! True if there is slack that try_process can use.
has_slack() const1150     bool has_slack() const {return my_slack>0;}
1151 
1152 #if RML_USE_WCRM
try_process(job & job)1153     bool try_process( job& job )
1154 #else
1155     bool try_process( server_thread& t, job& job )
1156 #endif
1157     {
1158         bool visited = false;
1159         // No check for my_slack>0 here because caller is expected to do that check.
1160         int k = --my_slack;
1161         if( k>=0 ) {
1162 #if !RML_USE_WCRM
1163             t.my_extra_state = ts_visited; // remember the thread paid a trip to process() at least once
1164 #endif
1165             client().process(job);
1166             visited = true;
1167         }
1168         ++my_slack;
1169         return visited;
1170     }
1171 
tbb_connection_v2(wait_counter & fc,tbb_client & client)1172     tbb_connection_v2( wait_counter& fc, tbb_client& client ) : generic_connection<tbb_server,tbb_client>(fc,client)
1173     {
1174         my_slack = 0;
1175 #if RML_USE_WCRM
1176         nesting = 0;
1177 #endif
1178 #if TBB_USE_ASSERT
1179         my_job_count_estimate = 0;
1180 #endif /* TBB_USE_ASSERT */
1181         __TBB_ASSERT( !my_slack, NULL );
1182 
1183 #if RML_USE_WCRM
1184         do_open( client.max_job_count()>0 ? create_scheduler() : NULL );
1185 #else
1186         do_open();
1187 #endif /* !RML_USE_WCRM */
1188         n_adjust_job_count_requests = 0;
1189 
1190         // Acquire head of active_tbb_connections & push the connection into the list
1191         uintptr_t conn;
1192         do {
1193             for( ; (conn=active_tbb_connections)&1; )
1194                 __TBB_Yield();
1195         } while( active_tbb_connections.compare_and_swap( conn|1, conn )!=conn );
1196 
1197         this->next_conn = generic_connection<tbb_server,tbb_client>::get_addr(conn);
1198         // Update and release head of active_tbb_connections
1199         active_tbb_connections = (uintptr_t) this; // set and release
1200     }
wakeup_tbb_threads(unsigned n)1201     inline void wakeup_tbb_threads( unsigned n ) {
1202         my_thread_map.wakeup_tbb_threads( n
1203 #if RML_USE_WCRM
1204                 , map_mtx
1205 #endif
1206                 );
1207     }
1208 #if RML_USE_WCRM
get_nesting_level()1209     inline int get_nesting_level() { return nesting; }
1210 #else
wakeup_next_thread(thread_map::iterator i)1211     inline bool wakeup_next_thread( thread_map::iterator i ) {return my_thread_map.wakeup_next_thread( i, *this );}
get_unrealized_threads()1212     inline thread_map::size_type get_unrealized_threads () {return my_thread_map.get_unrealized_threads();}
1213 #endif /* !RML_USE_WCRM */
1214 };
1215 
1216 //------------------------------------------------------------------------
1217 // OpenMP server
1218 //------------------------------------------------------------------------
1219 
1220 template<>
1221 struct connection_traits<omp_server,omp_client> {
1222     static const bool assist_null_only = false;
1223     static const bool is_tbb = false;
1224 };
1225 
1226 class omp_connection_v2: public generic_connection<omp_server,omp_client> {
1227 #if !RML_USE_WCRM
current_balance() const1228     int  current_balance() const __TBB_override {return the_balance;}
1229 #else
1230     friend void  free_all_connections( uintptr_t );
1231     friend class scheduler<omp_connection_v2>;
1232     int current_balance() const __TBB_override {return my_thread_map.current_balance();}
1233 #endif /* !RML_USE_WCRM */
1234     int  try_increase_load( size_type n, bool strict ) __TBB_override;
1235     void decrease_load( size_type n ) __TBB_override;
1236     void get_threads( size_type request_size, void* cookie, job* array[] ) __TBB_override;
1237 #if !RML_USE_WCRM
1238 #if _WIN32||_WIN64
register_master(rml::server::execution_resource_t &)1239     void register_master ( rml::server::execution_resource_t& /*v*/ ) __TBB_override {}
unregister_master(rml::server::execution_resource_t)1240     void unregister_master ( rml::server::execution_resource_t /*v*/ ) __TBB_override {}
1241 #endif
1242 #else
register_master(rml::server::execution_resource_t & v)1243     void register_master ( rml::server::execution_resource_t& v ) __TBB_override {
1244         my_thread_map.register_as_master( v );
1245         my_thread_map.addto_original_exec_resources( (IExecutionResource*)v, map_mtx );
1246     }
unregister_master(rml::server::execution_resource_t v)1247     void unregister_master ( rml::server::execution_resource_t v ) __TBB_override { my_thread_map.unregister(v); }
1248 #endif /* !RML_USE_WCRM */
1249 #if _WIN32||_WIN64
1250     void deactivate( rml::job* j ) __TBB_override;
1251     void reactivate( rml::job* j ) __TBB_override;
1252 #endif /* _WIN32||_WIN64 */
1253 #if RML_USE_WCRM
1254 public:
1255     typedef omp_server_thread server_thread_type;
1256 private:
create_scheduler()1257     IScheduler* create_scheduler() {return( scheduler<omp_connection_v2>::create( *this ) );}
1258 #endif /* RML_USE_WCRM */
1259 public:
1260 #if TBB_USE_ASSERT
1261     //! Net change in delta caused by this connection.
1262     /** Should be zero when connection is broken */
1263     tbb::atomic<int> net_delta;
1264 #endif /* TBB_USE_ASSERT */
1265 
omp_connection_v2(wait_counter & fc,omp_client & client)1266     omp_connection_v2( wait_counter& fc, omp_client& client ) : generic_connection<omp_server,omp_client>(fc,client) {
1267 #if TBB_USE_ASSERT
1268         net_delta = 0;
1269 #endif /* TBB_USE_ASSERT */
1270 #if RML_USE_WCRM
1271         do_open( create_scheduler() );
1272 #else
1273         do_open();
1274 #endif /* RML_USE_WCRM */
1275     }
~omp_connection_v2()1276     ~omp_connection_v2() {__TBB_ASSERT( net_delta==0, "net increase/decrease of load is nonzero" );}
1277 };
1278 
1279 #if !RML_USE_WCRM
1280 /* to deal with cases where the machine is oversubscribed; we want each thread to trip to try_process() at least once */
1281 /* this should not involve computing the_balance */
wakeup_next_thread(thread_map::iterator this_thr,tbb_connection_v2 & conn)1282 bool thread_map::wakeup_next_thread( thread_map::iterator this_thr, tbb_connection_v2& conn ) {
1283     if( all_visited_at_least_once )
1284         return false;
1285 
1286     iterator e = end();
1287 retry:
1288     bool exist = false;
1289     iterator k=this_thr;
1290     for( ++k; k!=e; ++k ) {
1291         // If another thread added *k, there is a tiny timing window where thread() is invalid.
1292         server_thread& t = k->wait_for_thread();
1293         if( t.my_extra_state!=ts_visited )
1294             exist = true;
1295         if( t.read_state()!=ts_tbb_busy && t.my_extra_state==ts_started )
1296             if( t.try_grab_for( ts_tbb_busy ) )
1297                 return true;
1298     }
1299     for( k=begin(); k!=this_thr; ++k ) {
1300         server_thread& t = k->wait_for_thread();
1301         if( t.my_extra_state!=ts_visited )
1302             exist = true;
1303         if( t.read_state()!=ts_tbb_busy && t.my_extra_state==ts_started )
1304             if( t.try_grab_for( ts_tbb_busy ) )
1305                 return true;
1306     }
1307 
1308     if( exist )
1309         if( conn.has_slack() )
1310             goto retry;
1311     else
1312         all_visited_at_least_once = true;
1313     return false;
1314 }
1315 
release_tbb_threads(server_thread * t)1316 void thread_map::release_tbb_threads( server_thread* t ) {
1317     for( ; t; t = t->link ) {
1318         while( t->read_state()!=ts_asleep )
1319             __TBB_Yield();
1320         t->my_extra_state = ts_started;
1321     }
1322 }
1323 #endif /* !RML_USE_WCRM */
1324 
adjust_balance(int delta)1325 void thread_map::adjust_balance( int delta ) {
1326     int new_balance = the_balance += delta;
1327     if( new_balance>0 && 0>=new_balance-delta /*== old the_balance*/ )
1328         wakeup_some_tbb_threads();
1329 }
1330 
remove_client_ref()1331 void thread_map::remove_client_ref() {
1332     int k = my_client_ref_count.remove_ref();
1333     if( k==0 ) {
1334         // Notify factory that thread has crossed back into RML.
1335         --my_factory_counter;
1336         // Notify client that RML is done with the client object.
1337         my_client.acknowledge_close_connection();
1338     }
1339 }
1340 
1341 #if RML_USE_WCRM
1342 /** Not a member of generic_connection because we need Connection to be the derived class. */
1343 template<typename Connection>
make_job(Connection & c,typename Connection::server_thread_type & t)1344 void make_job( Connection& c, typename Connection::server_thread_type& t ) {
1345     if( t.my_job_automaton.try_acquire() ) {
1346         rml::job* j = t.my_client.create_one_job();
1347         __TBB_ASSERT( j!=NULL, "client:::create_one_job returned NULL" );
1348         __TBB_ASSERT( (intptr_t(j)&1)==0, "client::create_one_job returned misaligned job" );
1349         t.my_job_automaton.set_and_release( j );
1350         c.set_scratch_ptr( *j, (void*) &t );
1351     }
1352 }
1353 #endif /* RML_USE_WCRM */
1354 
1355 #if _MSC_VER && !defined(__INTEL_COMPILER)
1356 // Suppress "conditional expression is constant" warning.
1357 #pragma warning( push )
1358 #pragma warning( disable: 4127 )
1359 #endif
1360 #if RML_USE_WCRM
1361 template<typename Server, typename Client>
request_close_connection(bool exiting)1362 void generic_connection<Server,Client>::request_close_connection( bool exiting ) {
1363     // for TBB connections, exiting should always be false
1364     if( connection_traits<Server,Client>::is_tbb )
1365         __TBB_ASSERT( !exiting, NULL);
1366 #if TBB_USE_ASSERT
1367     else if( exiting )
1368         reinterpret_cast<omp_connection_v2*>(this)->net_delta = 0;
1369 #endif
1370     if( exiting ) {
1371         uintptr_t tail = connections_to_reclaim.tail;
1372         while( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::plugged, tail )!=tail )
1373             __TBB_Yield();
1374         my_thread_map.unbind( *this, map_mtx );
1375         my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
1376         // It is assumed that the client waits for all other threads to terminate before
1377         // calling request_close_connection with true.  Thus, it is safe to return all
1378         // outstanding connection objects that are reachable. It is possible that there may
1379         // be some unreachable connection objects lying somewhere.
1380         free_all_connections( connection_scavenger.grab_and_prepend( this ) );
1381         return;
1382     }
1383 #else /* !RML_USE_WCRM */
1384 template<typename Server, typename Client>
1385 void generic_connection<Server,Client>::request_close_connection( bool ) {
1386 #endif /* RML_USE_WCRM */
1387     if( connection_traits<Server,Client>::is_tbb ) {
1388         // acquire the head of active tbb connections
1389         uintptr_t conn;
1390         do {
1391             for( ; (conn=active_tbb_connections)&1; )
1392                 __TBB_Yield();
1393         } while( active_tbb_connections.compare_and_swap( conn|1, conn )!=conn );
1394 
1395         // Locate the current connection
1396         generic_connection* pred_conn = NULL;
1397         generic_connection* curr_conn = (generic_connection*) conn;
1398         for( ; curr_conn && curr_conn!=this; curr_conn=curr_conn->next_conn )
1399             pred_conn = curr_conn;
1400         __TBB_ASSERT( curr_conn==this, "the current connection is not in the list?" );
1401 
1402         // Remove this from the list
1403         if( pred_conn ) {
1404             pred_conn->next_conn = curr_conn->next_conn;
1405             active_tbb_connections = reinterpret_cast<uintptr_t>(generic_connection<tbb_server,tbb_client>::get_addr(active_tbb_connections)); // release it
1406         } else
1407             active_tbb_connections = (uintptr_t) curr_conn->next_conn; // update & release it
1408         curr_conn->next_conn = NULL;
1409         // Increment the tbb connection close event count
1410         my_ec = ++close_tbb_connection_event_count;
1411         // Wait happens in tbb_connection_v2::~tbb_connection_v2()
1412     }
1413 #if RML_USE_WCRM
1414     my_thread_map.unbind( *this, map_mtx );
1415     my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
1416     connection_scavenger.add_request( this );
1417 #else
1418     my_thread_map.unbind();
1419     my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
1420     // Remove extra reference
1421     remove_server_ref();
1422 #endif
1423 }
1424 #if _MSC_VER && !defined(__INTEL_COMPILER)
1425 #pragma warning( pop )
1426 #endif
1427 
1428 #if RML_USE_WCRM
1429 
1430 template<typename Server, typename Client>
1431 void generic_connection<Server,Client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
1432 {}
1433 
1434 template<>
1435 void generic_connection<tbb_server,tbb_client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
1436 {
1437     my_thread_map.add_virtual_processors( vproots, count, (tbb_connection_v2&)*this, map_mtx );
1438 }
1439 template<>
1440 void generic_connection<omp_server,omp_client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
1441 {
1442     // For OMP, since it uses ScheudlerPolicy of MinThreads==MaxThreads, this is called once when
1443     // RequestInitialVirtualProcessors() is  called.
1444     my_thread_map.add_virtual_processors( vproots, count, (omp_connection_v2&)*this, map_mtx );
1445 }
1446 
1447 template<typename Server, typename Client>
1448 void generic_connection<Server,Client>::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
1449 {
1450     __TBB_ASSERT( false, "should not be called" );
1451 }
1452 /* For OMP, RemoveVirtualProcessors() will never be called. */
1453 
1454 template<>
1455 void generic_connection<tbb_server,tbb_client>::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
1456 {
1457     my_thread_map.remove_virtual_processors( vproots, count, map_mtx );
1458 }
1459 
1460 void tbb_connection_v2::adjust_job_count_estimate( int delta ) {
1461 #if TBB_USE_ASSERT
1462     my_job_count_estimate += delta;
1463 #endif /* TBB_USE_ASSERT */
1464     // Atomically update slack.
1465     int c = my_slack+=delta;
1466     if( c>0 ) {
1467         ++n_adjust_job_count_requests;
1468         my_thread_map.wakeup_tbb_threads( c, map_mtx );
1469         --n_adjust_job_count_requests;
1470     }
1471 }
1472 #endif /* RML_USE_WCRM */
1473 
1474 tbb_connection_v2::~tbb_connection_v2() {
1475 #if TBB_USE_ASSERT
1476     if( my_job_count_estimate!=0 ) {
1477         fprintf(stderr, "TBB client tried to disconnect with non-zero net job count estimate of %d\n", int(my_job_count_estimate ));
1478         abort();
1479     }
1480     __TBB_ASSERT( !my_slack, "attempt to destroy tbb_server with nonzero slack" );
1481     __TBB_ASSERT( this!=static_cast<tbb_connection_v2*>(generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)), "request_close_connection() must be called" );
1482 #endif /* TBB_USE_ASSERT */
1483 #if !RML_USE_WCRM
1484     // If there are other threads ready for work, give them coins
1485     if( the_balance>0 )
1486         wakeup_some_tbb_threads();
1487 #endif
1488     // Someone might be accessing my data members
1489     while( current_tbb_conn_readers>0 && (ptrdiff_t)(my_ec-current_tbb_conn_reader_epoch)>0 )
1490         __TBB_Yield();
1491 }
1492 
1493 #if !RML_USE_WCRM
1494 template<typename Server, typename Client>
1495 void generic_connection<Server,Client>::make_job( server_thread& t, job_automaton& ja ) {
1496     if( ja.try_acquire() ) {
1497         rml::job* j = client().create_one_job();
1498         __TBB_ASSERT( j!=NULL, "client:::create_one_job returned NULL" );
1499         __TBB_ASSERT( (intptr_t(j)&1)==0, "client::create_one_job returned misaligned job" );
1500         ja.set_and_release( j );
1501         __TBB_ASSERT( t.my_conn && t.my_ja && t.my_job==NULL, NULL );
1502         t.my_job  = j;
1503         set_scratch_ptr( *j, (void*) &t );
1504     }
1505 }
1506 
1507 void tbb_connection_v2::adjust_job_count_estimate( int delta ) {
1508 #if TBB_USE_ASSERT
1509     my_job_count_estimate += delta;
1510 #endif /* TBB_USE_ASSERT */
1511     // Atomically update slack.
1512     int c = my_slack+=delta;
1513     if( c>0 ) {
1514         ++n_adjust_job_count_requests;
1515         // The client has work to do and there are threads available
1516         thread_map::size_type n = my_thread_map.wakeup_tbb_threads(c);
1517 
1518         server_thread* new_threads_anchor = NULL;
1519         thread_map::size_type i;
1520         {
1521         tbb::internal::affinity_helper fpa;
1522         for( i=0; i<n; ++i ) {
1523             // Obtain unrealized threads
1524             thread_map::value_type* k = my_thread_map.add_one_thread( false );
1525             if( !k )
1526                 // No unrealized threads left.
1527                 break;
1528             // Eagerly start the thread off.
1529             fpa.protect_affinity_mask( /*restore_process_mask=*/true );
1530             my_thread_map.bind_one_thread( *this, *k );
1531             server_thread& t = k->thread();
1532             __TBB_ASSERT( !t.link, NULL );
1533             t.link = new_threads_anchor;
1534             new_threads_anchor = &t;
1535         }
1536         // Implicit destruction of fpa resets original affinity mask.
1537         }
1538 
1539         thread_map::size_type j=0;
1540         for( ; the_balance>0 && j<i; ++j ) {
1541             if( --the_balance>=0 ) {
1542                 // Withdraw a coin from the bank
1543                 __TBB_ASSERT( new_threads_anchor, NULL );
1544 
1545                 server_thread* t = new_threads_anchor;
1546                 new_threads_anchor = t->link;
1547                 while( !t->try_grab_for( ts_tbb_busy ) )
1548                     __TBB_Yield();
1549                 t->my_extra_state = ts_started;
1550             } else {
1551                 // Overdraft. return it to the bank
1552                 ++the_balance;
1553                 break;
1554             }
1555         }
1556         __TBB_ASSERT( i-j!=0||new_threads_anchor==NULL, NULL );
1557         // Mark the ones that did not get started as eligible for being snatched.
1558         if( new_threads_anchor )
1559             my_thread_map.release_tbb_threads( new_threads_anchor );
1560 
1561         --n_adjust_job_count_requests;
1562     }
1563 }
1564 #endif /* RML_USE_WCRM */
1565 
1566 #if RML_USE_WCRM
1567 int omp_connection_v2::try_increase_load( size_type n, bool strict ) {
1568     __TBB_ASSERT(int(n)>=0,NULL);
1569     if( strict ) {
1570         the_balance -= int(n);
1571     } else {
1572         int avail, old;
1573         do {
1574             avail = the_balance;
1575             if( avail<=0 ) {
1576                 // No atomic read-write-modify operation necessary.
1577                 return avail;
1578             }
1579             // Don't read the_system_balance; if it changes, compare_and_swap will fail anyway.
1580             old = the_balance.compare_and_swap( int(n)<avail ? avail-n : 0, avail );
1581         } while( old!=avail );
1582         if( int(n)>avail )
1583             n=avail;
1584     }
1585 #if TBB_USE_ASSERT
1586     net_delta += n;
1587 #endif /* TBB_USE_ASSERT */
1588     return n;
1589 }
1590 
1591 void omp_connection_v2::decrease_load( size_type /*n*/ ) {}
1592 
1593 void omp_connection_v2::get_threads( size_type request_size, void* cookie, job* array[] ) {
1594     unsigned index = 0;
1595     std::vector<omp_server_thread*> enlisted(request_size);
1596     std::vector<thread_grab_t> to_activate(request_size);
1597 
1598     if( request_size==0 ) return;
1599 
1600     {
1601         tbb::spin_mutex::scoped_lock lock(map_mtx);
1602 
1603         __TBB_ASSERT( !is_closing(), "try to get threads while connection is being shutdown?" );
1604 
1605         for( int scan=0; scan<2; ++scan ) {
1606             for( thread_map::iterator i=my_thread_map.begin(); i!=my_thread_map.end(); ++i ) {
1607                 omp_server_thread* thr = (omp_server_thread*) (*i).second;
1608                 // in the first scan, skip VPs that are lent
1609                 if( scan==0 && thr->is_lent() ) continue;
1610                 thread_grab_t res = thr->try_grab_for();
1611                 if( res!=wk_failed ) {// && if is not busy by some other scheduler
1612                     to_activate[index] = res;
1613                     enlisted[index] = thr;
1614                     if( ++index==request_size )
1615                         goto activate_threads;
1616                 }
1617             }
1618         }
1619     }
1620 
1621 activate_threads:
1622 
1623     for( unsigned i=0; i<index; ++i ) {
1624         omp_server_thread* thr = enlisted[i];
1625         if( to_activate[i]==wk_from_asleep )
1626             thr->get_virtual_processor()->Activate( thr );
1627         job* j = thr->wait_for_job();
1628         array[i] = j;
1629         thr->omp_data.produce( client(), j, cookie, i PRODUCE_ARG(*this) );
1630     }
1631 
1632     if( index==request_size )
1633         return;
1634 
1635     // If we come to this point, it must be because dynamic==false
1636     // Create Oversubscribers..
1637 
1638     // Note that our policy is such that MinConcurrency==MaxConcurrency.
1639     // RM will deliver MaxConcurrency of VirtualProcessors and no more.
1640     __TBB_ASSERT( request_size>index, NULL );
1641     unsigned n = request_size - index;
1642     std::vector<server_thread*> thr_vec(n);
1643     typedef std::vector<server_thread*>::iterator iterator_thr;
1644     my_thread_map.create_oversubscribers( n, thr_vec, *this, map_mtx );
1645     for( iterator_thr ti=thr_vec.begin(); ti!=thr_vec.end(); ++ti ) {
1646         omp_server_thread* thr = (omp_server_thread*) *ti;
1647         __TBB_ASSERT( thr, "thread not created?" );
1648         // Thread is already grabbed; since it is newly created, we need to activate it.
1649         thr->get_virtual_processor()->Activate( thr );
1650         job* j = thr->wait_for_job();
1651         array[index] = j;
1652         thr->omp_data.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
1653         ++index;
1654     }
1655 }
1656 
1657 #if _WIN32||_WIN64
1658 void omp_connection_v2::deactivate( rml::job* j )
1659 {
1660     my_thread_map.adjust_balance(1);
1661 #if TBB_USE_ASSERT
1662     net_delta -= 1;
1663 #endif
1664     omp_server_thread* thr = (omp_server_thread*) scratch_ptr( *j );
1665     (thr->get_virtual_processor())->Deactivate( thr );
1666 }
1667 
1668 void omp_connection_v2::reactivate( rml::job* j )
1669 {
1670     // Should not adjust the_balance because OMP client is supposed to
1671     // do try_increase_load() to reserve the threads to use.
1672     omp_server_thread* thr = (omp_server_thread*) scratch_ptr( *j );
1673     (thr->get_virtual_processor())->Activate( thr );
1674 }
1675 #endif /* !_WIN32||_WIN64 */
1676 
1677 #endif  /* RML_USE_WCRM */
1678 
1679 //! Wake up some available tbb threads
1680 void wakeup_some_tbb_threads()
1681 {
1682     /* First, atomically grab the connection, then increase the server ref count to keep
1683        it from being released prematurely.  Second, check if the balance is available for TBB
1684        and the tbb conneciton has slack to exploit.  If the answer is true, go ahead and
1685        try to wake some up. */
1686     if( generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)==0 )
1687         // the next connection will see the change; return.
1688         return;
1689 
1690 start_it_over:
1691     int n_curr_readers = ++current_tbb_conn_readers;
1692     if( n_curr_readers>1 ) // I lost
1693         return;
1694     // if n_curr_readers==1, i am the first one, so I will take responsibility for waking tbb threads up.
1695 
1696     // update the current epoch
1697     current_tbb_conn_reader_epoch = close_tbb_connection_event_count;
1698 
1699     // read and clear
1700     // Newly added connection will not invalidate the pointer, and it will
1701     // compete with the current one to claim coins.
1702     // One that is about to close the connection increments the event count
1703     // after it removes the connection from the list.  But it will keep around
1704     // the connection until all readers including this one catch up. So, reading
1705     // the head and clearing the lock bit should be o.k.
1706     generic_connection<tbb_server,tbb_client>* next_conn_wake_up = generic_connection<tbb_server,tbb_client>::get_addr( active_tbb_connections );
1707 
1708     for( ; next_conn_wake_up; ) {
1709         /* some threads are creating tbb server threads; they may not see my changes made to the_balance */
1710         /* When a thread is in adjust_job_count_estimate() to increase the slack
1711            RML tries to activate worker threads on behalf of the requesting thread
1712            by repeatedly drawing a coin from the bank optimistically and grabbing a
1713            thread.  If it finds the bank overdrafted, it returns the coin back to
1714            the bank and returns the control to the thread (return from the method).
1715            There lies a tiny timing hole.
1716 
1717            When the overdraft occurs (note that multiple masters may be in
1718            adjust_job_count_estimate() so the_balance can be any negative value) and
1719            a worker returns from the TBB work at that moment, its returning the coin
1720            does not bump up the_balance over 0, so it happily returns from
1721            wakeup_some_tbb_threads() without attempting to give coins to worker threads
1722            that are ready.
1723         */
1724         while( ((tbb_connection_v2*)next_conn_wake_up)->n_adjust_job_count_requests>0 )
1725             __TBB_Yield();
1726 
1727         int bal = the_balance;
1728         n_curr_readers = current_tbb_conn_readers; // get the snapshot
1729         if( bal<=0 ) break;
1730         // if the connection is deleted, the following will immediately return because its slack would be 0 or less.
1731 
1732         tbb_connection_v2* tbb_conn = (tbb_connection_v2*)next_conn_wake_up;
1733         int my_slack = tbb_conn->my_slack;
1734         if( my_slack>0 ) tbb_conn->wakeup_tbb_threads( my_slack );
1735         next_conn_wake_up = next_conn_wake_up->next_conn;
1736     }
1737 
1738     int delta = current_tbb_conn_readers -= n_curr_readers;
1739     //if delta>0, more threads entered the routine since this one took the snapshot
1740     if( delta>0 ) {
1741         current_tbb_conn_readers = 0;
1742         if( the_balance>0 && generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)!=0 )
1743             goto start_it_over;
1744     }
1745 
1746     // Signal any connection that is waiting for me to complete my access that I am done.
1747     current_tbb_conn_reader_epoch = close_tbb_connection_event_count;
1748 }
1749 
1750 #if !RML_USE_WCRM
1751 int omp_connection_v2::try_increase_load( size_type n, bool strict ) {
1752     __TBB_ASSERT(int(n)>=0,NULL);
1753     if( strict ) {
1754         the_balance -= int(n);
1755     } else {
1756         int avail, old;
1757         do {
1758             avail = the_balance;
1759             if( avail<=0 ) {
1760                 // No atomic read-write-modify operation necessary.
1761                 return avail;
1762             }
1763             // don't read the_balance; if it changes, compare_and_swap will fail anyway.
1764             old = the_balance.compare_and_swap( int(n)<avail ? avail-n : 0, avail );
1765         } while( old!=avail );
1766         if( int(n)>avail )
1767             n=avail;
1768     }
1769 #if TBB_USE_ASSERT
1770     net_delta += n;
1771 #endif /* TBB_USE_ASSERT */
1772     return n;
1773 }
1774 
1775 void omp_connection_v2::decrease_load( size_type n ) {
1776     __TBB_ASSERT(int(n)>=0,NULL);
1777     my_thread_map.adjust_balance(int(n));
1778 #if TBB_USE_ASSERT
1779     net_delta -= n;
1780 #endif /* TBB_USE_ASSERT */
1781 }
1782 
1783 void omp_connection_v2::get_threads( size_type request_size, void* cookie, job* array[] ) {
1784 
1785     if( !request_size )
1786         return;
1787 
1788     unsigned index = 0;
1789     for(;;) { // don't return until all request_size threads are grabbed.
1790         // Need to grab some threads
1791         thread_map::iterator k_end=my_thread_map.end();
1792         for( thread_map::iterator k=my_thread_map.begin(); k!=k_end; ++k ) {
1793             // If another thread added *k, there is a tiny timing window where thread() is invalid.
1794             server_thread& t = k->wait_for_thread();
1795             if( t.try_grab_for( ts_omp_busy ) ) {
1796                 // The preincrement instead of post-increment of index is deliberate.
1797                 job* j = k->wait_for_job();
1798                 array[index] = j;
1799                 t.omp_dispatch.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
1800                 if( ++index==request_size )
1801                     return;
1802             }
1803         }
1804         // Need to allocate more threads
1805         for( unsigned i=index; i<request_size; ++i ) {
1806             __TBB_ASSERT( index<request_size, NULL );
1807             thread_map::value_type* k = my_thread_map.add_one_thread( true );
1808 #if TBB_USE_ASSERT
1809             if( !k ) {
1810                 // Client erred
1811                 __TBB_ASSERT(false, "server::get_threads: exceeded job_count\n");
1812             }
1813 #endif
1814             my_thread_map.bind_one_thread( *this, *k );
1815             server_thread& t = k->thread();
1816             if( t.try_grab_for( ts_omp_busy ) ) {
1817                 job* j = k->wait_for_job();
1818                 array[index] = j;
1819                 // The preincrement instead of post-increment of index is deliberate.
1820                 t.omp_dispatch.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
1821                 if( ++index==request_size )
1822                     return;
1823             } // else someone else snatched it.
1824         }
1825     }
1826 }
1827 #endif /* !RML_USE_WCRM */
1828 
1829 //------------------------------------------------------------------------
1830 // Methods of omp_dispatch_type
1831 //------------------------------------------------------------------------
1832 void omp_dispatch_type::consume() {
1833     // Wait for short window between when master sets state of this thread to ts_omp_busy
1834     // and master thread calls produce.
1835     job_type* j;
1836     tbb::internal::atomic_backoff backoff;
1837     while( (j = job)==NULL ) backoff.pause();
1838     job = static_cast<job_type*>(NULL);
1839     client->process(*j,cookie,index);
1840 #if TBB_USE_ASSERT
1841     // Return of method process implies "decrease_load" from client's viewpoint, even though
1842     // the actual adjustment of the_balance only happens when this thread really goes to sleep.
1843     --server->net_delta;
1844 #endif /* TBB_USE_ASSERT */
1845 }
1846 
1847 #if !RML_USE_WCRM
1848 #if _WIN32||_WIN64
1849 void omp_connection_v2::deactivate( rml::job* j )
1850 {
1851 #if TBB_USE_ASSERT
1852     net_delta -= 1;
1853 #endif
1854     __TBB_ASSERT( j, NULL );
1855     server_thread* thr = (server_thread*) scratch_ptr( *j );
1856     thr->deactivate();
1857 }
1858 
1859 void omp_connection_v2::reactivate( rml::job* j )
1860 {
1861     // Should not adjust the_balance because OMP client is supposed to
1862     // do try_increase_load() to reserve the threads to use.
1863     __TBB_ASSERT( j, NULL );
1864     server_thread* thr = (server_thread*) scratch_ptr( *j );
1865     thr->reactivate();
1866 }
1867 #endif /* _WIN32||_WIN64 */
1868 
1869 //------------------------------------------------------------------------
1870 // Methods of server_thread
1871 //------------------------------------------------------------------------
1872 
1873 server_thread::server_thread() :
1874     ref_count(0),
1875     link(NULL),
1876     my_map_pos(),
1877     my_conn(NULL), my_job(NULL), my_ja(NULL)
1878 {
1879     state = ts_idle;
1880     terminate = false;
1881 #if TBB_USE_ASSERT
1882     has_active_thread = false;
1883 #endif /* TBB_USE_ASSERT */
1884 }
1885 
1886 server_thread::~server_thread() {
1887     __TBB_ASSERT( !has_active_thread, NULL );
1888 }
1889 
1890 #if _MSC_VER && !defined(__INTEL_COMPILER)
1891     // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
1892     #pragma warning(push)
1893     #pragma warning(disable:4189)
1894 #endif
1895 __RML_DECL_THREAD_ROUTINE server_thread::thread_routine( void* arg ) {
1896     server_thread* self = static_cast<server_thread*>(arg);
1897     AVOID_64K_ALIASING( self->my_index );
1898 #if TBB_USE_ASSERT
1899     __TBB_ASSERT( !self->has_active_thread, NULL );
1900     self->has_active_thread = true;
1901 #endif /* TBB_USE_ASSERT */
1902     self->loop();
1903     return 0;
1904 }
1905 #if _MSC_VER && !defined(__INTEL_COMPILER)
1906     #pragma warning(pop)
1907 #endif
1908 
1909 void server_thread::launch( size_t stack_size ) {
1910 #if USE_WINTHREAD
1911     thread_monitor::launch( thread_routine, this, stack_size, &this->my_index );
1912 #else
1913     thread_monitor::launch( thread_routine, this, stack_size );
1914 #endif /* USE_PTHREAD */
1915 }
1916 
1917 void server_thread::sleep_perhaps( thread_state_t asleep ) {
1918     if( terminate ) return;
1919     __TBB_ASSERT( asleep==ts_asleep, NULL );
1920     thread_monitor::cookie c;
1921     monitor.prepare_wait(c);
1922     if( state.compare_and_swap( asleep, ts_idle )==ts_idle ) {
1923         if( !terminate ) {
1924             monitor.commit_wait(c);
1925             // Someone else woke me up.  The compare_and_swap further below deals with spurious wakeups.
1926         } else {
1927             monitor.cancel_wait();
1928         }
1929         thread_state_t s = read_state();
1930         if( s==ts_asleep ) {
1931             state.compare_and_swap( ts_idle, ts_asleep );
1932             // I woke myself up, either because I cancelled the wait or suffered a spurious wakeup.
1933         } else {
1934             // Someone else woke me up; there the_balance is decremented by 1. -- tbb only
1935             if( !is_omp_thread ) {
1936                 __TBB_ASSERT( s==ts_tbb_busy||s==ts_idle, NULL );
1937             }
1938         }
1939     } else {
1940         // someone else made it busy ; see try_grab_for when state==ts_idle.
1941         __TBB_ASSERT( state==ts_omp_busy||state==ts_tbb_busy, NULL );
1942         monitor.cancel_wait();
1943     }
1944     __TBB_ASSERT( read_state()!=asleep, "a thread can only put itself to sleep" );
1945 }
1946 
1947 bool server_thread::wakeup( thread_state_t to, thread_state_t from ) {
1948     bool success = false;
1949     __TBB_ASSERT( from==ts_asleep && (to==ts_idle||to==ts_omp_busy||to==ts_tbb_busy), NULL );
1950     if( state.compare_and_swap( to, from )==from ) {
1951         if( !is_omp_thread ) __TBB_ASSERT( to==ts_idle||to==ts_tbb_busy, NULL );
1952         // There is a small timing window that permits balance to become negative,
1953         // but such occurrences are probably rare enough to not worry about, since
1954         // at worst the result is slight temporary oversubscription.
1955         monitor.notify();
1956         success = true;
1957     }
1958     return success;
1959 }
1960 
1961 //! Attempt to change a thread's state to ts_omp_busy, and waking it up if necessary.
1962 bool server_thread::try_grab_for( thread_state_t target_state ) {
1963     bool success = false;
1964     switch( read_state() ) {
1965         case ts_asleep:
1966             success = wakeup( target_state, ts_asleep );
1967             break;
1968         case ts_idle:
1969             success = state.compare_and_swap( target_state, ts_idle )==ts_idle;
1970             break;
1971         default:
1972             // Thread is not available to be part of an OpenMP thread team.
1973             break;
1974     }
1975     return success;
1976 }
1977 
1978 #if _WIN32||_WIN64
1979 void server_thread::deactivate() {
1980     thread_state_t es = (thread_state_t) my_extra_state.fetch_and_store( ts_deactivated );
1981     __TBB_ASSERT( my_extra_state==ts_deactivated, "someone else tampered with my_extra_state?" );
1982     if( es==ts_none )
1983         state = ts_idle;
1984     else
1985         __TBB_ASSERT( es==ts_reactivated, "Cannot call deactivate() while in ts_deactivated" );
1986         // only the thread can transition itself from ts_deactivted to ts_none
1987     __TBB_ASSERT( my_extra_state==ts_deactivated, "someone else tampered with my_extra_state?" );
1988     my_extra_state = ts_none; // release the critical section
1989     int bal = ++the_balance;
1990     if( bal>0 )
1991         wakeup_some_tbb_threads();
1992     if( es==ts_none )
1993         sleep_perhaps( ts_asleep );
1994 }
1995 
1996 void server_thread::reactivate() {
1997     thread_state_t es;
1998     do {
1999         while( (es=read_extra_state())==ts_deactivated )
2000             __TBB_Yield();
2001         if( es==ts_reactivated ) {
2002             __TBB_ASSERT( false, "two Reactivate() calls in a row.  Should not happen" );
2003             return;
2004         }
2005         __TBB_ASSERT( es==ts_none, NULL );
2006     } while( (thread_state_t)my_extra_state.compare_and_swap( ts_reactivated, ts_none )!=ts_none );
2007     if( state!=ts_omp_busy ) {
2008         my_extra_state = ts_none;
2009         while( !try_grab_for( ts_omp_busy ) )
2010             __TBB_Yield();
2011     }
2012 }
2013 #endif /* _WIN32||_WIN64 */
2014 
2015 
2016 template<typename Connection>
2017 bool server_thread::destroy_job( Connection& c ) {
2018     __TBB_ASSERT( !is_omp_thread||(state==ts_idle||state==ts_omp_busy), NULL );
2019     __TBB_ASSERT(  is_omp_thread||(state==ts_idle||state==ts_tbb_busy), NULL );
2020     if( !is_omp_thread ) {
2021         __TBB_ASSERT( state==ts_idle||state==ts_tbb_busy, NULL );
2022         if( state==ts_idle )
2023             state.compare_and_swap( ts_done, ts_idle );
2024         // 'state' may be set to ts_tbb_busy by another thread.
2025 
2026         if( state==ts_tbb_busy ) { // return the coin to the deposit
2027             // need to deposit first to let the next connection see the change
2028             ++the_balance;
2029             state = ts_done; // no other thread changes the state when it is ts_*_busy
2030         }
2031     }
2032     if( job_automaton* ja = my_ja ) {
2033         rml::job* j;
2034         if( ja->try_plug(j) ) {
2035             __TBB_ASSERT( j, NULL );
2036             c.client().cleanup(*j);
2037             c.remove_client_ref();
2038         } else {
2039             // Some other thread took responsibility for cleaning up the job.
2040         }
2041     }
2042     // Must do remove client reference first, because execution of
2043     // c.remove_ref() can cause *this to be destroyed.
2044     int k = remove_ref();
2045     __TBB_ASSERT_EX( k==0, "more than one references?" );
2046 #if TBB_USE_ASSERT
2047     has_active_thread = false;
2048 #endif /* TBB_USE_ASSERT */
2049     c.remove_server_ref();
2050     return true;
2051 }
2052 
2053 bool server_thread::do_termination() {
2054     if( is_omp_thread )
2055         return destroy_job( *static_cast<omp_connection_v2*>(my_conn) );
2056     else
2057         return destroy_job( *static_cast<tbb_connection_v2*>(my_conn) );
2058 }
2059 
2060 //! Loop that each thread executes
2061 void server_thread::loop() {
2062     if( is_omp_thread )
2063         static_cast<omp_connection_v2*>(my_conn)->make_job( *this, *my_ja );
2064     else
2065         static_cast<tbb_connection_v2*>(my_conn)->make_job( *this, *my_ja );
2066     for(;;) {
2067         __TBB_Yield();
2068         if( state==ts_idle )
2069             sleep_perhaps( ts_asleep );
2070 
2071         // Check whether I should quit.
2072         if( terminate )
2073             if( do_termination() )
2074                 return;
2075 
2076         // read the state
2077         thread_state_t s = read_state();
2078         __TBB_ASSERT( s==ts_idle||s==ts_omp_busy||s==ts_tbb_busy, NULL );
2079 
2080         if( s==ts_omp_busy ) {
2081             // Enslaved by OpenMP team.
2082             omp_dispatch.consume();
2083             /* here wake tbb threads up if feasible */
2084             if( ++the_balance>0 )
2085                 wakeup_some_tbb_threads();
2086             state = ts_idle;
2087         } else if( s==ts_tbb_busy ) {
2088             // do some TBB work.
2089             __TBB_ASSERT( my_conn && my_job, NULL );
2090             tbb_connection_v2& conn = *static_cast<tbb_connection_v2*>(my_conn);
2091             // give openmp higher priority
2092             bool has_coin = true;
2093             if( conn.has_slack() ) {
2094                 // it has the coin, it should trip to the scheduler at least once as long as its slack is positive
2095                 do {
2096                     if( conn.try_process( *this, *my_job ) )
2097                         if( conn.has_slack() && the_balance>=0 )
2098                             has_coin = !conn.wakeup_next_thread( my_map_pos );
2099                 } while( has_coin && conn.has_slack() && the_balance>=0 );
2100             }
2101             state = ts_idle;
2102             if( has_coin ) {
2103                 ++the_balance; // return the coin back to the deposit
2104                 if( conn.has_slack() ) { // a new adjust_job_request_estimate() is in progress
2105                                          // it may have missed my changes to state and/or the_balance
2106                     if( --the_balance>=0 ) { // try to grab the coin back
2107                         // I got the coin
2108                         if( state.compare_and_swap( ts_tbb_busy, ts_idle )!=ts_idle )
2109                             ++the_balance; // someone else enlisted me.
2110                     } else {
2111                         // overdraft. return the coin
2112                         ++the_balance;
2113                     }
2114                 } // else the new request will see my changes to state & the_balance.
2115             }
2116             /* here wake tbb threads up if feasible */
2117             if( the_balance>0 )
2118                 wakeup_some_tbb_threads();
2119         }
2120     }
2121 }
2122 #endif /* !RML_USE_WCRM */
2123 
2124 #if RML_USE_WCRM
2125 
2126 class tbb_connection_v2;
2127 class omp_connection_v2;
2128 
2129 #define CREATE_SCHEDULER_POLICY(policy,min_thrs,max_thrs,stack_size) \
2130     try {                                                                 \
2131         policy = new SchedulerPolicy (7,                                  \
2132                           SchedulerKind, RML_THREAD_KIND, /*defined in _rml_serer_msrt.h*/ \
2133                           MinConcurrency, min_thrs,                       \
2134                           MaxConcurrency, max_thrs,                       \
2135                           TargetOversubscriptionFactor, 1,                \
2136                           ContextStackSize, stack_size/1000, /*ConcRT:kB, iRML:bytes*/ \
2137                           ContextPriority, THREAD_PRIORITY_NORMAL,        \
2138                           DynamicProgressFeedback, ProgressFeedbackDisabled ); \
2139     } catch ( invalid_scheduler_policy_key & ) {                               \
2140         __TBB_ASSERT( false, "invalid scheduler policy key exception caught" );\
2141     } catch ( invalid_scheduler_policy_value & ) {                        \
2142         __TBB_ASSERT( false, "invalid scheduler policy value exception caught" );\
2143     }
2144 
2145 static unsigned int core_count;
2146 static tbb::atomic<int> core_count_inited;
2147 
2148 
2149 static unsigned int get_processor_count()
2150 {
2151     if( core_count_inited!=2 ) {
2152         if( core_count_inited.compare_and_swap( 1, 0 )==0 ) {
2153             core_count = GetProcessorCount();
2154             core_count_inited = 2;
2155         } else {
2156             tbb::internal::spin_wait_until_eq( core_count_inited, 2 );
2157         }
2158     }
2159     return core_count;
2160 }
2161 
2162 template<typename Connection>
2163 scheduler<Connection>::scheduler( Connection& conn ) : uid(GetSchedulerId()), my_conn(conn) {}
2164 
2165 template<>
2166 scheduler<tbb_connection_v2>::scheduler( tbb_connection_v2& conn ) : uid(GetSchedulerId()), my_conn(conn)
2167 {
2168     rml::client& cl = my_conn.client();
2169     unsigned max_job_count = cl.max_job_count();
2170     unsigned count = get_processor_count();
2171     __TBB_ASSERT( max_job_count>0, "max job count must be positive" );
2172     __TBB_ASSERT( count>1, "The processor count must be greater than 1" );
2173     if( max_job_count>count-1) max_job_count = count-1;
2174     CREATE_SCHEDULER_POLICY( my_policy, 0, max_job_count, cl.min_stack_size() );
2175 }
2176 
2177 #if __RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
2178 template<>
2179 void scheduler<tbb_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot**, unsigned int)
2180 {
2181 }
2182 #else
2183 template<>
2184 void scheduler<tbb_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count )
2185 {
2186     if( !my_conn.is_closing() )
2187         my_conn.remove_virtual_processors( vproots, count );
2188 }
2189 #endif
2190 
2191 template<>
2192 void scheduler<tbb_connection_v2>::NotifyResourcesExternallyIdle( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/)
2193 {
2194     __TBB_ASSERT( false, "NotifyResourcesExternallyIdle() is not allowed for TBB" );
2195 }
2196 
2197 template<>
2198 void scheduler<tbb_connection_v2>::NotifyResourcesExternallyBusy( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/ )
2199 {
2200     __TBB_ASSERT( false, "NotifyResourcesExternallyBusy() is not allowed for TBB" );
2201 }
2202 
2203 template<>
2204 scheduler<omp_connection_v2>::scheduler( omp_connection_v2& conn ) : uid(GetSchedulerId()), my_conn(conn)
2205 {
2206     unsigned count = get_processor_count();
2207     rml::client& cl = my_conn.client();
2208     __TBB_ASSERT( count>1, "The processor count must be greater than 1" );
2209     CREATE_SCHEDULER_POLICY( my_policy, count-1, count-1, cl.min_stack_size() );
2210 }
2211 
2212 template<>
2213 void scheduler<omp_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/ ) {
2214     __TBB_ASSERT( false, "RemoveVirtualProcessors() is not allowed for OMP" );
2215 }
2216 
2217 template<>
2218 void scheduler<omp_connection_v2>::NotifyResourcesExternallyIdle( IVirtualProcessorRoot** vproots, unsigned int count ){
2219     if( !my_conn.is_closing() )
2220         my_conn.notify_resources_externally_idle( vproots, count );
2221 }
2222 
2223 template<>
2224 void scheduler<omp_connection_v2>::NotifyResourcesExternallyBusy( IVirtualProcessorRoot** vproots, unsigned int count ){
2225     if( !my_conn.is_closing() )
2226         my_conn.notify_resources_externally_busy( vproots, count );
2227 }
2228 
2229 /* ts_idle, ts_asleep, ts_busy */
2230 void tbb_server_thread::Dispatch( DispatchState* ) {
2231     // Activate() will resume a thread right after Deactivate() as if it returns from the call
2232     tbb_connection_v2* tbb_conn = static_cast<tbb_connection_v2*>(my_conn);
2233     make_job( *tbb_conn, *this );
2234 
2235     for( ;; ) {
2236         // Try to wake some tbb threads if the balance is positive.
2237         // When a thread is added by ConcRT and enter here for the first time,
2238         // the thread may wake itself up (i.e., atomically change its state to ts_busy.
2239         if( the_balance>0 )
2240              wakeup_some_tbb_threads();
2241         if( read_state()!=ts_busy )
2242             if( sleep_perhaps() )
2243                 return;
2244         if( terminate )
2245             if( initiate_termination() )
2246                 return;
2247         if( read_state()==ts_busy ) {
2248             // this thread has a coin (i.e., state=ts_busy; it should trip to the scheduler at least once
2249             if ( tbb_conn->has_slack() ) {
2250                 do {
2251                     tbb_conn->try_process( *wait_for_job() );
2252                 } while( tbb_conn->has_slack() && the_balance>=0 && !is_removed() );
2253             }
2254             __TBB_ASSERT( read_state()==ts_busy, "thread is not in busy state after returning from process()" );
2255             // see remove_virtual_processors()
2256             if( my_state.compare_and_swap( ts_idle, ts_busy )==ts_busy ) {
2257                 int bal = ++the_balance;
2258                 if( tbb_conn->has_slack() ) {
2259                     // slack is positive, volunteer to help
2260                     bal = --the_balance;  // try to grab the coin back
2261                     if( bal>=0 ) { // got the coin back
2262                         if( my_state.compare_and_swap( ts_busy, ts_idle )!=ts_idle )
2263                             ++the_balance; // someone else enlisted me.
2264                         // else my_state is ts_busy, I will come back to tbb_conn->try_process().
2265                     } else {
2266                         // overdraft. return the coin
2267                         ++the_balance;
2268                     }
2269                 } // else the new request will see my changes to state & the_balance.
2270             } else {
2271                 __TBB_ASSERT( false, "someone tampered with my state" );
2272             }
2273         } // someone else might set the state to something other than ts_idle
2274     }
2275 }
2276 
2277 void omp_server_thread::Dispatch( DispatchState* ) {
2278     // Activate() will resume a thread right after Deactivate() as if it returns from the call
2279     make_job( *static_cast<omp_connection_v2*>(my_conn), *this );
2280 
2281     for( ;; ) {
2282         if( read_state()!=ts_busy )
2283             sleep_perhaps();
2284         if( terminate ) {
2285             if( initiate_termination() )
2286                 return;
2287         }
2288         if( read_state()==ts_busy ) {
2289             omp_data.consume();
2290             __TBB_ASSERT( read_state()==ts_busy, "thread is not in busy state after returning from process()" );
2291             my_thread_map.adjust_balance( 1 );
2292             set_state( ts_idle );
2293         }
2294         // someone else might set the state to something other than ts_idle
2295     }
2296 }
2297 
2298 //! Attempt to change a thread's state to ts_omp_busy, and waking it up if necessary.
2299 thread_grab_t server_thread_rep::try_grab_for() {
2300     thread_grab_t res = wk_failed;
2301     thread_state_t s = read_state();
2302     switch( s ) {
2303     case ts_asleep:
2304         if( wakeup( ts_busy, ts_asleep ) )
2305             res = wk_from_asleep;
2306         __TBB_ASSERT( res==wk_failed||read_state()==ts_busy, NULL );
2307         break;
2308     case ts_idle:
2309         if( my_state.compare_and_swap( ts_busy, ts_idle )==ts_idle )
2310             res = wk_from_idle;
2311         // At this point a thread is grabbed (i.e., its state has  changed to ts_busy.
2312         // It is possible that the thread 1) processes the job, returns from process() and
2313         // sets its state ts_idle again.  In some cases, it even sets its state to ts_asleep.
2314         break;
2315     default:
2316         break;
2317     }
2318     return res;
2319 }
2320 
2321 bool tbb_server_thread::switch_out() {
2322     thread_state_t s = read_state();
2323     __TBB_ASSERT( s==ts_asleep||s==ts_busy, NULL );
2324     // This thread comes back from the TBB scheduler, and changed its state to ts_asleep successfully.
2325     // The master enlisted it and woke it up by Activate()'ing it; now it is emerging from Deactivated().
2326     // ConcRT requested for removal of the vp associated with the thread, and RML marks it removed.
2327     // Now, it has ts_busy, and removed. -- we should remove it.
2328     IExecutionResource* old_vp = my_execution_resource;
2329     if( s==ts_busy ) {
2330         ++the_balance;
2331         my_state = ts_asleep;
2332     }
2333     IThreadProxy* proxy = my_proxy;
2334     __TBB_ASSERT( proxy, NULL );
2335     my_execution_resource = (IExecutionResource*) c_remove_prepare;
2336     old_vp->Remove( my_scheduler );
2337     my_execution_resource = (IExecutionResource*) c_remove_returned;
2338     int cnt = --activation_count;
2339     __TBB_ASSERT_EX( cnt==0||cnt==1, "too many activations?" );
2340     proxy->SwitchOut();
2341     if( terminate ) {
2342         bool activated = activation_count==1;
2343 #if TBB_USE_ASSERT
2344         /* In a rare sequence of events, a thread comes out of SwitchOut with activation_count==1.
2345          * 1) The thread is SwitchOut'ed.
2346          * 2) AddVirtualProcessors() arrived and the thread is Activated.
2347          * 3) The thread is coming out of SwitchOut().
2348          * 4) request_close_connection arrives and inform the thread that it is time to terminate.
2349          * 5) The thread hits the check and falls into the path with 'activated==true'.
2350          * In that case, do the clean-up but do not switch to the thread scavenger; rather simply return to RM.
2351          */
2352         if( activated ) {
2353             // thread is 'revived' in add_virtual_processors after being Activated().
2354             // so, if the thread extra state is still marked 'removed', it will shortly change to 'none'
2355             // i.e., !is_remove().  The thread state is changed to ts_idle before the extra state, so
2356             // the thread's state should be either ts_idle or ts_done.
2357             while( is_removed() )
2358                 __TBB_Yield();
2359             thread_state_t s = read_state();
2360             __TBB_ASSERT( s==ts_idle || s==ts_done, NULL );
2361         }
2362 #endif
2363         __TBB_ASSERT( my_state==ts_asleep||my_state==ts_idle, NULL );
2364         // it is possible that in make_job() the thread may not have a chance to create a job.
2365         // my_job may not be set if the thread did not get a chance to process client's job (i.e., call try_process())
2366         rml::job* j;
2367         if( my_job_automaton.try_plug(j) ) {
2368             __TBB_ASSERT( j, NULL );
2369             my_client.cleanup(*j);
2370             my_conn->remove_client_ref();
2371         }
2372         // Must do remove client reference first, because execution of
2373         // c.remove_ref() can cause *this to be destroyed.
2374         if( !activated )
2375             proxy->SwitchTo( my_thread_map.get_thread_scavenger(), Idle );
2376         my_conn->remove_server_ref();
2377         return true;
2378     }
2379     // We revive a thread in add_virtual_processors() after we Activate the thread on a new virtual processor.
2380     // So briefly wait until the thread's my_execution_resource gets set.
2381     while( get_virtual_processor()==c_remove_returned )
2382         __TBB_Yield();
2383     return false;
2384 }
2385 
2386 bool tbb_server_thread::sleep_perhaps () {
2387     if( terminate ) return false;
2388     thread_state_t s = read_state();
2389     if( s==ts_idle ) {
2390         if( my_state.compare_and_swap( ts_asleep, ts_idle )==ts_idle ) {
2391             // If a thread is between read_state() and compare_and_swap(), and the master tries to terminate,
2392             // the master's compare_and_swap() will fail because the thread's state is ts_idle.
2393             // We need to check if terminate is true or not before letting the thread go to sleep,
2394             // otherwise we will miss the terminate signal.
2395             if( !terminate ) {
2396                 if( !is_removed() ) {
2397                     --activation_count;
2398                     get_virtual_processor()->Deactivate( this );
2399                 }
2400                 if( is_removed() ) {
2401                     if( switch_out() )
2402                         return true;
2403                     __TBB_ASSERT( my_execution_resource>c_remove_returned, NULL );
2404                 }
2405                 // in add_virtual_processors(), when we revive a thread, we change its state after Activate the thread
2406                 // in that case the state may be ts_asleep for a short period
2407                 while( read_state()==ts_asleep )
2408                     __TBB_Yield();
2409             } else {
2410                 if( my_state.compare_and_swap( ts_done, ts_asleep )!=ts_asleep ) {
2411                     --activation_count;
2412                     // unbind() changed my state. It will call Activate(). So issue a matching Deactivate()
2413                     get_virtual_processor()->Deactivate( this );
2414                 }
2415             }
2416         }
2417     } else {
2418         __TBB_ASSERT( s==ts_busy, NULL );
2419     }
2420     return false;
2421 }
2422 
2423 void omp_server_thread::sleep_perhaps () {
2424     if( terminate ) return;
2425     thread_state_t s = read_state();
2426     if( s==ts_idle ) {
2427         if( my_state.compare_and_swap( ts_asleep, ts_idle )==ts_idle ) {
2428             // If a thread is between read_state() and compare_and_swap(), and the master tries to terminate,
2429             // the master's compare_and_swap() will fail because the thread's state is ts_idle.
2430             // We need to check if terminate is true or not before letting the thread go to sleep,
2431             // otherwise we will miss the terminate signal.
2432             if( !terminate ) {
2433                 get_virtual_processor()->Deactivate( this );
2434                 __TBB_ASSERT( !is_removed(), "OMP threads should not be deprived of a virtual processor" );
2435                 __TBB_ASSERT( read_state()!=ts_asleep, NULL );
2436             } else {
2437                 if( my_state.compare_and_swap( ts_done, ts_asleep )!=ts_asleep )
2438                     // unbind() changed my state. It will call Activate(). So issue a matching Deactivate()
2439                     get_virtual_processor()->Deactivate( this );
2440             }
2441         }
2442     } else {
2443         __TBB_ASSERT( s==ts_busy, NULL );
2444     }
2445 }
2446 
2447 bool tbb_server_thread::initiate_termination() {
2448     if( read_state()==ts_busy ) {
2449         int bal = ++the_balance;
2450         if( bal>0 ) wakeup_some_tbb_threads();
2451     }
2452     return destroy_job( (tbb_connection_v2*) my_conn );
2453 }
2454 
2455 template<typename Connection>
2456 bool server_thread_rep::destroy_job( Connection* c ) {
2457     __TBB_ASSERT( my_state!=ts_asleep, NULL );
2458     rml::job* j;
2459     if( my_job_automaton.try_plug(j) ) {
2460         __TBB_ASSERT( j, NULL );
2461         my_client.cleanup(*j);
2462         c->remove_client_ref();
2463     }
2464     // Must do remove client reference first, because execution of
2465     // c.remove_ref() can cause *this to be destroyed.
2466     c->remove_server_ref();
2467     return true;
2468 }
2469 
2470 void thread_map::assist_cleanup( bool assist_null_only ) {
2471     // To avoid deadlock, the current thread *must* help out with cleanups that have not started,
2472     // because the thread that created the job may be busy for a long time.
2473     for( iterator i = begin(); i!=end(); ++i ) {
2474         rml::job* j=0;
2475         server_thread* thr = (*i).second;
2476         job_automaton& ja = thr->my_job_automaton;
2477         if( assist_null_only ? ja.try_plug_null() : ja.try_plug(j) ) {
2478             if( j ) {
2479                 my_client.cleanup(*j);
2480             } else {
2481                 // server thread did not get a chance to create a job.
2482             }
2483             remove_client_ref();
2484         }
2485     }
2486 }
2487 
2488 void thread_map::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count, tbb_connection_v2& conn, ::tbb::spin_mutex& mtx )
2489 {
2490 #if TBB_USE_ASSERT
2491     int req_cnt = ++n_add_vp_requests;
2492     __TBB_ASSERT( req_cnt==1, NULL );
2493 #endif
2494     std::vector<thread_map::iterator> vec(count);
2495     std::vector<tbb_server_thread*> tvec(count);
2496     iterator end;
2497 
2498     {
2499         tbb::spin_mutex::scoped_lock lck( mtx );
2500         __TBB_ASSERT( my_map.size()==0||count==1, NULL );
2501         end = my_map.end(); //remember 'end' at the time of 'find'
2502         // find entries in the map for those VPs that were previously added and then removed.
2503         for( size_t i=0; i<count; ++i ) {
2504             vec[i] = my_map.find( (key_type) vproots[i] );
2505 #if TBB_USE_DEBUG
2506             if( vec[i]!=end ) {
2507                 tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
2508                 IVirtualProcessorRoot* v = t->get_virtual_processor();
2509                 __TBB_ASSERT( v==c_remove_prepare||v==c_remove_returned, NULL );
2510             }
2511 #endif
2512         }
2513 
2514         iterator nxt = my_map.begin();
2515         for( size_t i=0; i<count; ++i ) {
2516             if( vec[i]!=end ) {
2517 #if TBB_USE_ASSERT
2518                 tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
2519                 __TBB_ASSERT( t->read_state()==ts_asleep, NULL );
2520                 IVirtualProcessorRoot* r = t->get_virtual_processor();
2521                 __TBB_ASSERT( r==c_remove_prepare||r==c_remove_returned, NULL );
2522 #endif
2523                 continue;
2524             }
2525 
2526             if( my_unrealized_threads>0 ) {
2527                 --my_unrealized_threads;
2528             } else {
2529                 __TBB_ASSERT( nxt!=end, "nxt should not be thread_map::iterator::end" );
2530                 // find a removed thread context for i
2531                 for( ; nxt!=end; ++nxt ) {
2532                     tbb_server_thread* t = (tbb_server_thread*) (*nxt).second;
2533                     if( t->is_removed() && t->read_state()==ts_asleep && t->get_virtual_processor()==c_remove_returned ) {
2534                         vec[i] = nxt++;
2535                         break;
2536                     }
2537                 }
2538                 // break target
2539                 if( vec[i]==end ) // ignore excessive VP.
2540                     vproots[i] = NULL;
2541             }
2542         }
2543     }
2544 
2545     for( size_t i=0; i<count; ++i ) {
2546         __TBB_ASSERT( !tvec[i], NULL );
2547         if( vec[i]==end ) {
2548             if( vproots[i] ) {
2549                 tvec[i] = my_tbb_allocator.allocate(1);
2550                 new ( tvec[i] ) tbb_server_thread( false, my_scheduler, (IExecutionResource*)vproots[i], &conn, *this, my_client );
2551             }
2552 #if TBB_USE_ASSERT
2553         } else {
2554             tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
2555             __TBB_ASSERT( t->GetProxy(), "Proxy is cleared?" );
2556 #endif
2557         }
2558     }
2559 
2560     {
2561         tbb::spin_mutex::scoped_lock lck( mtx );
2562 
2563         bool closing = is_closing();
2564 
2565         for( size_t i=0; i<count; ++i ) {
2566             if( vec[i]==end ) {
2567                 if( vproots[i] ) {
2568                     thread_map::key_type key = (thread_map::key_type) vproots[i];
2569                     vec[i] = insert( key, (server_thread*) tvec[i] );
2570                     my_client_ref_count.add_ref();
2571                     my_server_ref_count.add_ref();
2572                 }
2573             } else if( !closing ) {
2574                 tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
2575 
2576                 if( (*vec[i]).first!=(thread_map::key_type)vproots[i] ) {
2577                     my_map.erase( vec[i] );
2578                     thread_map::key_type key = (thread_map::key_type) vproots[i];
2579                     __TBB_ASSERT( key, NULL );
2580                     vec[i] = insert( key, t );
2581                 }
2582                 __TBB_ASSERT( t->read_state()==ts_asleep, NULL );
2583                 // We did not decrement server/client ref count when a thread is removed.
2584                 // So, don't increment server/client ref count here.
2585             }
2586         }
2587 
2588         // we could check is_closing() earlier.  That requires marking the newly allocated server_thread objects
2589         // that are not inserted into the thread_map, and deallocate them.  Doing so seems more cumbersome
2590         // than simply adding these to the thread_map and let thread_map's destructor take care of reclamation.
2591         __TBB_ASSERT( closing==is_closing(), NULL );
2592         if( closing ) return;
2593     }
2594 
2595     for( size_t i=0; i<count; ++i ) {
2596         if( vproots[i] ) {
2597             tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
2598             __TBB_ASSERT( tvec[i]!=NULL||t->GetProxy(), "Proxy is cleared?" );
2599             if( t->is_removed() )
2600                 __TBB_ASSERT( t->get_virtual_processor()==c_remove_returned, NULL );
2601             int cnt = ++t->activation_count;
2602             __TBB_ASSERT_EX( cnt==0||cnt==1, NULL );
2603             vproots[i]->Activate( t );
2604             if( t->is_removed() )
2605                 t->revive( my_scheduler, vproots[i], my_client );
2606         }
2607     }
2608 #if TBB_USE_ASSERT
2609     req_cnt = --n_add_vp_requests;
2610     __TBB_ASSERT( req_cnt==0, NULL );
2611 #endif
2612 }
2613 
2614 void thread_map::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ) {
2615     if( my_map.size()==0 )
2616         return;
2617     tbb::spin_mutex::scoped_lock lck( mtx );
2618 
2619     if( is_closing() ) return;
2620 
2621     for( unsigned int c=0; c<count; ++c ) {
2622         iterator i = my_map.find( (key_type) vproots[c] );
2623         if( i==my_map.end() ) {
2624             thread_scavenger_thread* tst = my_thread_scavenger_thread;
2625             if( !tst ) {
2626                 // Remove unknown vp from my scheduler;
2627                 vproots[c]->Remove( my_scheduler );
2628             } else {
2629                 while( (tst=my_thread_scavenger_thread)==c_claimed )
2630                     __TBB_Yield();
2631                 if( vproots[c]!=tst->get_virtual_processor() )
2632                     vproots[c]->Remove( my_scheduler );
2633             }
2634             continue;
2635         }
2636         tbb_server_thread* thr = (tbb_server_thread*) (*i).second;
2637         __TBB_ASSERT( thr->tbb_thread, "incorrect type of server_thread" );
2638         thr->set_removed();
2639         if( thr->read_state()==ts_asleep ) {
2640             while( thr->activation_count>0 ) {
2641                 if( thr->get_virtual_processor()<=c_remove_returned )
2642                     break;
2643                 __TBB_Yield();
2644             }
2645             if( thr->get_virtual_processor()>c_remove_returned ) {
2646                 // the thread is in Deactivated state
2647                 ++thr->activation_count;
2648                 // wake the thread up so that it Switches Out itself.
2649                 thr->get_virtual_processor()->Activate( thr );
2650             } // else, it is Switched Out
2651         } // else the thread will see that it is removed and proceed to switch itself out without Deactivation
2652     }
2653 }
2654 
2655 void thread_map::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count, omp_connection_v2& conn, ::tbb::spin_mutex& mtx )
2656 {
2657     std::vector<thread_map::iterator> vec(count);
2658     std::vector<server_thread*> tvec(count);
2659     iterator end;
2660 
2661     {
2662         tbb::spin_mutex::scoped_lock lck( mtx );
2663         // read the map
2664         end = my_map.end(); //remember 'end' at the time of 'find'
2665         for( size_t i=0; i<count; ++i )
2666             vec[i] = my_map.find( (key_type) vproots[i] );
2667     }
2668 
2669     for( size_t i=0; i<count; ++i ) {
2670         __TBB_ASSERT( !tvec[i], NULL );
2671         if( vec[i]==end ) {
2672             tvec[i] = my_omp_allocator.allocate(1);
2673             new ( tvec[i] ) omp_server_thread( false, my_scheduler, (IExecutionResource*)vproots[i], &conn, *this, my_client );
2674         }
2675     }
2676 
2677     {
2678         tbb::spin_mutex::scoped_lock lck( mtx );
2679 
2680         for( size_t i=0; i<count; ++i ) {
2681             if( vec[i]==my_map.end() ) {
2682                 thread_map::key_type key = (thread_map::key_type) vproots[i];
2683                 vec[i] = insert( key, tvec[i] );
2684                 my_client_ref_count.add_ref();
2685                 my_server_ref_count.add_ref();
2686             }
2687         }
2688 
2689         // we could check is_closing() earlier.  That requires marking the newly allocated server_thread objects
2690         // that are not inserted into the thread_map, and deallocate them.  Doing so seems more cumbersome
2691         // than simply adding these to the thread_map and let thread_map's destructor take care of reclamation.
2692         if( is_closing() ) return;
2693     }
2694 
2695     for( size_t i=0; i<count; ++i )
2696         vproots[i]->Activate( (*vec[i]).second );
2697 
2698     {
2699         tbb::spin_mutex::scoped_lock lck( mtx );
2700         for( size_t i=0; i<count; ++i )
2701             original_exec_resources.push_back( vproots[i] );
2702     }
2703 }
2704 
2705 void thread_map::mark_virtual_processors_as_lent( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ) {
2706     tbb::spin_mutex::scoped_lock lck( mtx );
2707 
2708     if( is_closing() ) return;
2709 
2710     iterator end = my_map.end();
2711     for( unsigned int c=0; c<count; ++c ) {
2712         iterator i = my_map.find( (key_type) vproots[c] );
2713         if( i==end ) {
2714             // The vproc has not been added to the map in create_oversubscribers()
2715             my_map.insert( hash_map_type::value_type( (key_type) vproots[c], (server_thread*)1 ) );
2716         } else {
2717             server_thread* thr = (*i).second;
2718             if( ((uintptr_t)thr)&~(uintptr_t)1 ) {
2719                 __TBB_ASSERT( !thr->is_removed(), "incorrectly removed" );
2720                 ((omp_server_thread*)thr)->set_lent();
2721             }
2722         }
2723     }
2724 }
2725 
2726 void thread_map::create_oversubscribers( unsigned n, std::vector<server_thread*>& thr_vec, omp_connection_v2& conn, ::tbb::spin_mutex& mtx ) {
2727     std::vector<IExecutionResource*> curr_exec_rsc;
2728     {
2729         tbb::spin_mutex::scoped_lock lck( mtx );
2730         curr_exec_rsc = original_exec_resources; // copy construct
2731     }
2732     typedef std::vector<IExecutionResource*>::iterator iterator_er;
2733     typedef ::std::vector<std::pair<hash_map_type::key_type, hash_map_type::mapped_type> > hash_val_vector_t;
2734     hash_val_vector_t v_vec(n);
2735     iterator_er begin = curr_exec_rsc.begin();
2736     iterator_er end   = curr_exec_rsc.end();
2737     iterator_er i = begin;
2738     for( unsigned c=0; c<n; ++c ) {
2739         IVirtualProcessorRoot* vpr = my_scheduler_proxy->CreateOversubscriber( *i );
2740         omp_server_thread* t = new ( my_omp_allocator.allocate(1) ) omp_server_thread( true, my_scheduler, (IExecutionResource*)vpr, &conn, *this, my_client );
2741         thr_vec[c] = t;
2742         v_vec[c] = hash_map_type::value_type( (key_type) vpr, t );
2743         if( ++i==end ) i = begin;
2744     }
2745 
2746     {
2747         tbb::spin_mutex::scoped_lock lck( mtx );
2748 
2749         if( is_closing() ) return;
2750 
2751         iterator end = my_map.end();
2752         unsigned c = 0;
2753         for( hash_val_vector_t::iterator vi=v_vec.begin(); vi!=v_vec.end(); ++vi, ++c ) {
2754             iterator i = my_map.find( (key_type) (*vi).first );
2755             if( i==end ) {
2756                 my_map.insert( *vi );
2757             } else {
2758                 // the vproc has not been added to the map in mark_virtual_processors_as_returned();
2759                 uintptr_t lent = (uintptr_t) (*i).second;
2760                 __TBB_ASSERT( lent<=1, "vproc map entry added incorrectly?");
2761                 (*i).second = thr_vec[c];
2762                 if( lent )
2763                     ((omp_server_thread*)thr_vec[c])->set_lent();
2764                 else
2765                     ((omp_server_thread*)thr_vec[c])->set_returned();
2766             }
2767             my_client_ref_count.add_ref();
2768             my_server_ref_count.add_ref();
2769         }
2770     }
2771 }
2772 
2773 void thread_map::wakeup_tbb_threads( int c, ::tbb::spin_mutex& mtx ) {
2774     std::vector<tbb_server_thread*> vec(c);
2775 
2776     size_t idx = 0;
2777     {
2778         tbb::spin_mutex::scoped_lock lck( mtx );
2779 
2780         if( is_closing() ) return;
2781         // only one RML thread is in here to wake worker threads up.
2782 
2783         int bal = the_balance;
2784         int cnt = c<bal ? c : bal;
2785 
2786         if( cnt<=0 ) { return; }
2787 
2788         for( iterator i=begin(); i!=end(); ++i ) {
2789             tbb_server_thread* thr = (tbb_server_thread*) (*i).second;
2790             // ConcRT RM should take threads away from TBB scheduler instead of lending them to another scheduler
2791             if( thr->is_removed() )
2792                 continue;
2793 
2794             if( --the_balance>=0 ) {
2795                 thread_grab_t res;
2796                 while( (res=thr->try_grab_for())!=wk_from_idle ) {
2797                     if( res==wk_from_asleep ) {
2798                         vec[idx++] = thr;
2799                         break;
2800                     } else {
2801                         thread_state_t s = thr->read_state();
2802                         if( s==ts_busy ) {// failed because already assigned. move on.
2803                             ++the_balance;
2804                             goto skip;
2805                         }
2806                     }
2807                 }
2808                 thread_state_t s = thr->read_state();
2809                 __TBB_ASSERT_EX( s==ts_busy, "should have set the state to ts_busy" );
2810                 if( --cnt==0 )
2811                     break;
2812             } else {
2813                 // overdraft
2814                 ++the_balance;
2815                 break;
2816             }
2817 skip:
2818             ;
2819         }
2820     }
2821 
2822     for( size_t i=0; i<idx; ++i ) {
2823         tbb_server_thread* thr = vec[i];
2824         __TBB_ASSERT( thr, NULL );
2825         thread_state_t s = thr->read_state();
2826         __TBB_ASSERT_EX( s==ts_busy, "should have set the state to ts_busy" );
2827         ++thr->activation_count;
2828         thr->get_virtual_processor()->Activate( thr );
2829     }
2830 
2831 }
2832 
2833 void thread_map::mark_virtual_processors_as_returned( IVirtualProcessorRoot** vprocs, unsigned int count, tbb::spin_mutex& mtx ) {
2834     {
2835         tbb::spin_mutex::scoped_lock lck( mtx );
2836 
2837         if( is_closing() ) return;
2838 
2839         iterator end = my_map.end();
2840         for(unsigned c=0; c<count; ++c ) {
2841             iterator i = my_map.find( (key_type) vprocs[c] );
2842             if( i==end ) {
2843                 // the vproc has not been added to the map in create_oversubscribers()
2844                 my_map.insert( hash_map_type::value_type( (key_type) vprocs[c], static_cast<server_thread*>(0) ) );
2845             } else {
2846                 omp_server_thread* thr = (omp_server_thread*) (*i).second;
2847                 if( ((uintptr_t)thr)&~(uintptr_t)1 ) {
2848                     __TBB_ASSERT( !thr->is_removed(), "incorrectly removed" );
2849                     // we should not make any assumption on the initial state of an added vproc.
2850                     thr->set_returned();
2851                 }
2852             }
2853         }
2854     }
2855 }
2856 
2857 
2858 void thread_map::unbind( rml::server& /*server*/, tbb::spin_mutex& mtx ) {
2859     {
2860         tbb::spin_mutex::scoped_lock lck( mtx );
2861         shutdown_in_progress = true;  // ignore any callbacks from ConcRT RM
2862 
2863         // Ask each server_thread to cleanup its job for this server.
2864         for( iterator i = begin(); i!=end(); ++i ) {
2865             server_thread* t = (*i).second;
2866             t->terminate = true;
2867             if( t->is_removed() ) {
2868                 // This is for TBB only as ConcRT RM does not request OMP schedulers to remove virtual processors
2869                 if( t->read_state()==ts_asleep ) {
2870                     __TBB_ASSERT( my_thread_scavenger_thread, "this is TBB connection; thread_scavenger_thread must be allocated" );
2871                     // thread is on its way to switch_out; see remove_virtual_processors() where
2872                     // the thread is Activated() to bring it back from 'Deactivated' in sleep_perhaps()
2873                     // now assume that the thread will go to SwitchOut()
2874 #if TBB_USE_ASSERT
2875                     while( t->get_virtual_processor()>c_remove_returned )
2876                         __TBB_Yield();
2877 #endif
2878                     // A removed thread is supposed to proceed to SwithcOut.
2879                     // There, we remove client&server references.
2880                 }
2881             } else {
2882                 if( t->wakeup( ts_done, ts_asleep ) ) {
2883                     if( t->tbb_thread )
2884                         ++((tbb_server_thread*)t)->activation_count;
2885                     t->get_virtual_processor()->Activate( t );
2886                     // We mark in the thread_map such that when termination sequence started, we ignore
2887                     // all notification from ConcRT RM.
2888                 }
2889             }
2890         }
2891     }
2892     // Remove extra ref to client.
2893     remove_client_ref();
2894 
2895     if( my_thread_scavenger_thread ) {
2896         thread_scavenger_thread* tst;
2897         while( (tst=my_thread_scavenger_thread)==c_claimed )
2898             __TBB_Yield();
2899 #if TBB_USE_ASSERT
2900         ++my_thread_scavenger_thread->activation_count;
2901 #endif
2902         tst->get_virtual_processor()->Activate( tst );
2903     }
2904 }
2905 
2906 #if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
2907 void thread_map::allocate_thread_scavenger( IExecutionResource* v )
2908 {
2909     if( my_thread_scavenger_thread>c_claimed ) return;
2910     thread_scavenger_thread* c = my_thread_scavenger_thread.fetch_and_store((thread_scavenger_thread*)c_claimed);
2911     if( c==NULL ) { // successfully claimed
2912         add_server_ref();
2913 #if TBB_USE_ASSERT
2914         ++n_thread_scavengers_created;
2915 #endif
2916         __TBB_ASSERT( v, NULL );
2917         IVirtualProcessorRoot* vpr = my_scheduler_proxy->CreateOversubscriber( v );
2918         my_thread_scavenger_thread = c = new ( my_scavenger_allocator.allocate(1) ) thread_scavenger_thread( my_scheduler, vpr, *this );
2919 #if TBB_USE_ASSERT
2920         ++c->activation_count;
2921 #endif
2922         vpr->Activate( c );
2923     } else if( c>c_claimed ) {
2924         my_thread_scavenger_thread = c;
2925     }
2926 }
2927 #endif
2928 
2929 void thread_scavenger_thread::Dispatch( DispatchState* )
2930 {
2931     __TBB_ASSERT( my_proxy, NULL );
2932 #if TBB_USE_ASSERT
2933     --activation_count;
2934 #endif
2935     get_virtual_processor()->Deactivate( this );
2936     for( thread_map::iterator i=my_thread_map.begin(); i!=my_thread_map.end(); ++i ) {
2937         tbb_server_thread* t = (tbb_server_thread*) (*i).second;
2938         if( t->read_state()==ts_asleep && t->is_removed() ) {
2939             while( t->get_execution_resource()!=c_remove_returned )
2940                 __TBB_Yield();
2941             my_proxy->SwitchTo( t, Blocking );
2942         }
2943     }
2944     get_virtual_processor()->Remove( my_scheduler );
2945     my_thread_map.remove_server_ref();
2946     // signal to the connection scavenger that i am done with the map.
2947     __TBB_ASSERT( activation_count==1, NULL );
2948     set_state( ts_done );
2949 }
2950 
2951 //! Windows "DllMain" that handles startup and shutdown of dynamic library.
2952 extern "C" bool WINAPI DllMain( HINSTANCE /*hinstDLL*/, DWORD fwdReason, LPVOID lpvReserved ) {
2953     void assist_cleanup_connections();
2954     if( fwdReason==DLL_PROCESS_DETACH ) {
2955         // dll is being unloaded
2956         if( !lpvReserved ) // if FreeLibrary has been called
2957             assist_cleanup_connections();
2958     }
2959     return true;
2960 }
2961 
2962 void free_all_connections( uintptr_t conn_ex ) {
2963     while( conn_ex ) {
2964         bool is_tbb = (conn_ex&2)>0;
2965         //clear extra bits
2966         uintptr_t curr_conn = conn_ex & ~(uintptr_t)3;
2967         __TBB_ASSERT( curr_conn, NULL );
2968 
2969         // Wait for worker threads to return
2970         if( is_tbb ) {
2971             tbb_connection_v2* tbb_conn = reinterpret_cast<tbb_connection_v2*>(curr_conn);
2972             conn_ex = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
2973             while( tbb_conn->my_thread_map.remove_server_ref()>0 )
2974                 __TBB_Yield();
2975             delete tbb_conn;
2976         } else {
2977             omp_connection_v2* omp_conn = reinterpret_cast<omp_connection_v2*>(curr_conn);
2978             conn_ex = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
2979             while( omp_conn->my_thread_map.remove_server_ref()>0 )
2980                 __TBB_Yield();
2981             delete omp_conn;
2982         }
2983     }
2984 }
2985 
2986 void assist_cleanup_connections()
2987 {
2988     //signal to connection_scavenger_thread to terminate
2989     uintptr_t tail = connections_to_reclaim.tail;
2990     while( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::plugged, tail )!=tail ) {
2991         __TBB_Yield();
2992         tail = connections_to_reclaim.tail;
2993     }
2994 
2995     __TBB_ASSERT( connection_scavenger.state==ts_busy || connection_scavenger.state==ts_asleep, NULL );
2996     // Scavenger thread may be busy freeing connections
2997     DWORD thr_exit_code = STILL_ACTIVE;
2998     while( connection_scavenger.state==ts_busy ) {
2999         if( GetExitCodeThread( connection_scavenger.thr_handle, &thr_exit_code )>0 )
3000             if( thr_exit_code!=STILL_ACTIVE )
3001                 break;
3002         __TBB_Yield();
3003         thr_exit_code = STILL_ACTIVE;
3004     }
3005     if( connection_scavenger.state==ts_asleep && thr_exit_code==STILL_ACTIVE )
3006         connection_scavenger.wakeup(); // wake the connection scavenger thread up
3007 
3008     // it is possible that the connection scavenger thread already exited.  Take over its responsibility.
3009     if( tail && connections_to_reclaim.tail!=garbage_connection_queue::plugged_acked ) {
3010         // atomically claim the head of the list.
3011         uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
3012         if( head==garbage_connection_queue::empty )
3013             head = tail;
3014         connection_scavenger.process_requests( head );
3015     }
3016     __TBB_ASSERT( connections_to_reclaim.tail==garbage_connection_queue::plugged||connections_to_reclaim.tail==garbage_connection_queue::plugged_acked, "someone else added a request after termination has initiated" );
3017     __TBB_ASSERT( (unsigned)the_balance==the_default_concurrency, NULL );
3018 }
3019 
3020 void connection_scavenger_thread::sleep_perhaps() {
3021     uintptr_t tail = connections_to_reclaim.tail;
3022     // connections_to_reclaim.tail==garbage_connection_queue::plugged --> terminate,
3023     // connections_to_reclaim.tail>garbage_connection_queue::plugged : we got work to do
3024     if( tail>=garbage_connection_queue::plugged ) return;
3025     __TBB_ASSERT( !tail, NULL );
3026     thread_monitor::cookie c;
3027     monitor.prepare_wait(c);
3028     if( state.compare_and_swap( ts_asleep, ts_busy )==ts_busy ) {
3029         if( connections_to_reclaim.tail!=garbage_connection_queue::plugged ) {
3030             monitor.commit_wait(c);
3031             // Someone else woke me up.  The compare_and_swap further below deals with spurious wakeups.
3032         } else {
3033             monitor.cancel_wait();
3034         }
3035         thread_state_t s = state;
3036         if( s==ts_asleep ) // if spurious wakeup.
3037             state.compare_and_swap( ts_busy, ts_asleep );
3038             // I woke myself up, either because I cancelled the wait or suffered a spurious wakeup.
3039     } else {
3040         __TBB_ASSERT( false, "someone else tampered with my state" );
3041     }
3042     __TBB_ASSERT( state==ts_busy, "a thread can only put itself to sleep" );
3043 }
3044 
3045 void connection_scavenger_thread::process_requests( uintptr_t conn_ex )
3046 {
3047     __TBB_ASSERT( conn_ex>1, NULL );
3048     __TBB_ASSERT( n_scavenger_threads==1||connections_to_reclaim.tail==garbage_connection_queue::plugged, "more than one connection_scavenger_thread being active?" );
3049 
3050     bool done = false;
3051     while( !done ) {
3052         bool is_tbb = (conn_ex&2)>0;
3053         //clear extra bits
3054         uintptr_t curr_conn = conn_ex & ~(uintptr_t)3;
3055 
3056         // no contention. there is only one connection_scavenger_thread!!
3057         uintptr_t next_conn;
3058         tbb_connection_v2* tbb_conn = NULL;
3059         omp_connection_v2* omp_conn = NULL;
3060         // Wait for worker threads to return
3061         if( is_tbb ) {
3062             tbb_conn = reinterpret_cast<tbb_connection_v2*>(curr_conn);
3063             next_conn = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
3064             while( tbb_conn->my_thread_map.get_server_ref_count()>1 )
3065                 __TBB_Yield();
3066         } else {
3067             omp_conn = reinterpret_cast<omp_connection_v2*>(curr_conn);
3068             next_conn = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
3069             while( omp_conn->my_thread_map.get_server_ref_count()>1 )
3070                 __TBB_Yield();
3071         }
3072 
3073         //someone else may try to write into this connection object.
3074         //So access next_conn field first before remove the extra server ref count.
3075 
3076         if( next_conn==0 ) {
3077             uintptr_t tail = connections_to_reclaim.tail;
3078             if( tail==garbage_connection_queue::plugged ) {
3079                 tail = garbage_connection_queue::plugged_acked; // connection scavenger saw the flag, and it freed all connections.
3080                 done = true;
3081             } else if( tail==conn_ex ) {
3082                 if( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::empty, tail )==tail ) {
3083                     __TBB_ASSERT( !connections_to_reclaim.head, NULL );
3084                     done = true;
3085                 }
3086             }
3087 
3088             if( !done ) {
3089                 // A new connection to close is added to connections_to_reclaim.tail;
3090                 // Wait for curr_conn->next_conn to be set.
3091                 if( is_tbb ) {
3092                     while( !tbb_conn->next_conn )
3093                         __TBB_Yield();
3094                     conn_ex = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
3095                 } else {
3096                     while( !omp_conn->next_conn )
3097                         __TBB_Yield();
3098                     conn_ex = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
3099                 }
3100             }
3101         } else {
3102             conn_ex = next_conn;
3103         }
3104         __TBB_ASSERT( conn_ex, NULL );
3105         if( is_tbb )
3106             // remove extra server ref count; this will trigger Shutdown/Release of ConcRT RM
3107             tbb_conn->remove_server_ref();
3108         else
3109             // remove extra server ref count; this will trigger Shutdown/Release of ConcRT RM
3110             omp_conn->remove_server_ref();
3111     }
3112 }
3113 
3114 __RML_DECL_THREAD_ROUTINE connection_scavenger_thread::thread_routine( void* arg ) {
3115     connection_scavenger_thread* thr = (connection_scavenger_thread*) arg;
3116     thr->state = ts_busy;
3117     thr->thr_handle = GetCurrentThread();
3118 #if TBB_USE_ASSERT
3119     ++thr->n_scavenger_threads;
3120 #endif
3121     for(;;) {
3122         __TBB_Yield();
3123         thr->sleep_perhaps();
3124         if( connections_to_reclaim.tail==garbage_connection_queue::plugged || connections_to_reclaim.tail==garbage_connection_queue::plugged_acked ) {
3125             thr->state = ts_asleep;
3126             return 0;
3127         }
3128 
3129         __TBB_ASSERT( connections_to_reclaim.tail!=garbage_connection_queue::plugged_acked, NULL );
3130         __TBB_ASSERT( connections_to_reclaim.tail>garbage_connection_queue::plugged && (connections_to_reclaim.tail&garbage_connection_queue::plugged)==0 , NULL );
3131         while( connections_to_reclaim.head==garbage_connection_queue::empty )
3132             __TBB_Yield();
3133         uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
3134         thr->process_requests( head );
3135         wakeup_some_tbb_threads();
3136     }
3137 }
3138 
3139 template<typename Server, typename Client>
3140 void connection_scavenger_thread::add_request( generic_connection<Server,Client>* conn_to_close )
3141 {
3142     uintptr_t conn_ex = (uintptr_t)conn_to_close | (connection_traits<Server,Client>::is_tbb<<1);
3143     __TBB_ASSERT( !conn_to_close->next_conn, NULL );
3144     const uintptr_t old_tail_ex = connections_to_reclaim.tail.fetch_and_store(conn_ex);
3145     __TBB_ASSERT( old_tail_ex==0||old_tail_ex>garbage_connection_queue::plugged_acked, "Unloading DLL called while this connection is being closed?" );
3146 
3147     if( old_tail_ex==garbage_connection_queue::empty )
3148         connections_to_reclaim.head = conn_ex;
3149     else {
3150         bool is_tbb = (old_tail_ex&2)>0;
3151         uintptr_t old_tail = old_tail_ex & ~(uintptr_t)3;
3152         if( is_tbb )
3153             reinterpret_cast<tbb_connection_v2*>(old_tail)->next_conn = reinterpret_cast<tbb_connection_v2*>(conn_ex);
3154         else
3155             reinterpret_cast<omp_connection_v2*>(old_tail)->next_conn = reinterpret_cast<omp_connection_v2*>(conn_ex);
3156     }
3157 
3158     if( state==ts_asleep )
3159         wakeup();
3160 }
3161 
3162 template<>
3163 uintptr_t connection_scavenger_thread::grab_and_prepend( generic_connection<tbb_server,tbb_client>* /*last_conn_to_close*/ ) { return 0;}
3164 
3165 template<>
3166 uintptr_t connection_scavenger_thread::grab_and_prepend( generic_connection<omp_server,omp_client>* last_conn_to_close )
3167 {
3168     uintptr_t conn_ex = (uintptr_t)last_conn_to_close;
3169     uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
3170     reinterpret_cast<omp_connection_v2*>(last_conn_to_close)->next_conn = reinterpret_cast<omp_connection_v2*>(head);
3171     return conn_ex;
3172 }
3173 
3174 extern "C" ULONGLONG NTAPI VerSetConditionMask( ULONGLONG, DWORD, BYTE);
3175 
3176 bool is_windows7_or_later ()
3177 {
3178     try {
3179         return GetOSVersion()>=IResourceManager::Win7OrLater;
3180     } catch( ... ) {
3181         return false;
3182     }
3183 }
3184 
3185 #endif /* RML_USE_WCRM */
3186 
3187 template<typename Connection, typename Server, typename Client>
3188 static factory::status_type connect( factory& f, Server*& server, Client& client ) {
3189     server = new Connection(*static_cast<wait_counter*>(f.scratch_ptr),client);
3190     return factory::st_success;
3191 }
3192 
3193 void init_rml_module () {
3194     the_balance = the_default_concurrency = tbb::internal::AvailableHwConcurrency() - 1;
3195 #if RML_USE_WCRM
3196     connection_scavenger.launch();
3197 #endif
3198 }
3199 
3200 extern "C" factory::status_type __RML_open_factory( factory& f, version_type& server_version, version_type client_version ) {
3201     // Hack to keep this library from being closed by causing the first client's dlopen to not have a corresponding dlclose.
3202     // This code will be removed once we figure out how to do shutdown of the RML perfectly.
3203     static tbb::atomic<bool> one_time_flag;
3204     if( one_time_flag.compare_and_swap(true,false)==false) {
3205         __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, NULL );
3206 #if _WIN32||_WIN64
3207         f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
3208 #else
3209         f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
3210 #endif
3211     }
3212     // End of hack
3213 
3214     // Initialize the_balance only once
3215     tbb::internal::atomic_do_once ( &init_rml_module, rml_module_state );
3216 
3217     server_version = SERVER_VERSION;
3218     f.scratch_ptr = 0;
3219     if( client_version==0 ) {
3220         return factory::st_incompatible;
3221 #if RML_USE_WCRM
3222     } else if ( !is_windows7_or_later() ) {
3223 #if TBB_USE_DEBUG
3224         fprintf(stderr, "This version of the RML library requires Windows 7 to run on.\nConnection request denied.\n");
3225 #endif
3226         return factory::st_incompatible;
3227 #endif
3228     } else {
3229 #if TBB_USE_DEBUG
3230         if( client_version<EARLIEST_COMPATIBLE_CLIENT_VERSION )
3231             fprintf(stderr, "This client library is too old for the current RML server.\nThe connection request is granted but oversubscription/undersubscription may occur.\n");
3232 #endif
3233         f.scratch_ptr = new wait_counter;
3234         return factory::st_success;
3235     }
3236 }
3237 
3238 extern "C" void __RML_close_factory( factory& f ) {
3239     if( wait_counter* fc = static_cast<wait_counter*>(f.scratch_ptr) ) {
3240         f.scratch_ptr = 0;
3241         fc->wait();
3242         size_t bal = the_balance;
3243         f.scratch_ptr = (void*)bal;
3244         delete fc;
3245     }
3246 }
3247 
3248 void call_with_build_date_str( ::rml::server_info_callback_t cb, void* arg );
3249 
3250 }} // rml::internal
3251 
3252 namespace tbb {
3253 namespace internal {
3254 namespace rml {
3255 
__TBB_make_rml_server(tbb_factory & f,tbb_server * & server,tbb_client & client)3256 extern "C" tbb_factory::status_type __TBB_make_rml_server( tbb_factory& f, tbb_server*& server, tbb_client& client ) {
3257     return ::rml::internal::connect< ::rml::internal::tbb_connection_v2>(f,server,client);
3258 }
3259 
__TBB_call_with_my_server_info(::rml::server_info_callback_t cb,void * arg)3260 extern "C" void __TBB_call_with_my_server_info( ::rml::server_info_callback_t cb, void* arg ) {
3261     return ::rml::internal::call_with_build_date_str( cb, arg );
3262 }
3263 
3264 }}}
3265 
3266 namespace __kmp {
3267 namespace rml {
3268 
__KMP_make_rml_server(omp_factory & f,omp_server * & server,omp_client & client)3269 extern "C" omp_factory::status_type __KMP_make_rml_server( omp_factory& f, omp_server*& server, omp_client& client ) {
3270     return ::rml::internal::connect< ::rml::internal::omp_connection_v2>(f,server,client);
3271 }
3272 
__KMP_call_with_my_server_info(::rml::server_info_callback_t cb,void * arg)3273 extern "C" void __KMP_call_with_my_server_info( ::rml::server_info_callback_t cb, void* arg ) {
3274     return ::rml::internal::call_with_build_date_str( cb, arg );
3275 }
3276 
3277 }}
3278 
3279 /*
3280  * RML server info
3281  */
3282 #include "version_string.ver"
3283 
3284 #ifndef __TBB_VERSION_STRINGS
3285 #pragma message("Warning: version_string.ver isn't generated properly by version_info.sh script!")
3286 #endif
3287 
3288 // We use the build time as the RML server info. TBB is required to build RML, so we make it the same as the TBB build time.
3289 #ifndef __TBB_DATETIME
3290 #define __TBB_DATETIME __DATE__ " " __TIME__
3291 #endif
3292 
3293 #if !RML_USE_WCRM
3294 #define RML_SERVER_BUILD_TIME "Intel(R) RML library built: " __TBB_DATETIME
3295 #define RML_SERVER_VERSION_ST "Intel(R) RML library version: v" TOSTRING(SERVER_VERSION)
3296 #else
3297 #define RML_SERVER_BUILD_TIME "Intel(R) RML library built: " __TBB_DATETIME
3298 #define RML_SERVER_VERSION_ST "Intel(R) RML library version: v" TOSTRING(SERVER_VERSION) " on ConcRT RM with " RML_THREAD_KIND_STRING
3299 #endif
3300 
3301 namespace rml {
3302 namespace internal {
3303 
call_with_build_date_str(::rml::server_info_callback_t cb,void * arg)3304 void call_with_build_date_str( ::rml::server_info_callback_t cb, void* arg )
3305 {
3306     (*cb)( arg, RML_SERVER_BUILD_TIME );
3307     (*cb)( arg, RML_SERVER_VERSION_ST );
3308 }
3309 }} // rml::internal
3310