/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
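// Returns the first arena found at a priority level higher than the hint's level,
// or the hint itself if no such arena exists.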
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // Don't change the hint if no arena with a higher priority is found.
    return hint;
}

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server starts initializing workers, which need the
    // global market instance to obtain the worker stack size.
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

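// Computes the effective soft limit on the number of workers: an application-level
// parallelism limit (app_parallelism_limit()) takes precedence; otherwise the larger of
// default_num_threads()-1 and the requested value is used. The result is always kept
// below the hard limit.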
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if user set no limits (yet), use market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}

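// Adds a reference to the existing global market (under theMarketMutex) and returns true;
// returns false when no market has been created yet. Acquiring the first public reference
// re-applies the soft limit; requests exceeding the current soft limit or stack size only
// produce runtime warnings.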
bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // Do not warn if the default number of workers is requested.
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // We are OK with that, as it only results in multiple warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }
        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has already been set to %u. "
                             "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
        return true;
    }
    return false;
}

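// Returns the global market instance, creating and publishing it on first use. The
// allocation is sized so that my_workers can hold one slot per worker up to the computed
// hard limit.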
market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expecting that 4P is suitable for most applications.
        // Limit to 2P for large numbers of threads.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in
        // computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads might be created.
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(std::atomic<thread_data*>) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(std::atomic<thread_data*>) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish global market
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                    , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}

void market::destroy () {
    this->market::~market(); // qualified to suppress warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last external thread, we need to wait
                // until all references are released. Re-read my_public_ref_count to limit waiting if new external
                // threads are created. Theoretically, new private references to the market can be added during the
                // wait, making it potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), NULL );
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), NULL );
            do_release = true;
            theMarket = NULL;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
            "No public references remain if we remove the market." );
        // Inform RML that blocking termination is required.
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}

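// Recomputes the number of workers to request from RML as the minimum of the total demand
// and the soft limit (a single worker when only mandatory concurrency is requested while
// the soft limit is zero), updates the allotment, and returns the change relative to the
// previous request.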
int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}

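// Applies a new soft limit on the number of workers to the existing market (if any),
// switching mandatory concurrency modes as needed, recomputing the allotment, and
// adjusting the RML job count estimate outside of the arenas list lock.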
void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // We hold my_ref_count for the market, so it can be used safely.

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // Report only once after the new soft limit value is set.
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate must be called outside of any locks
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // Release the internal market reference to match ++m->my_ref_count above.
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

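// Allocates a new arena bound to the global market (taking a public market reference on
// behalf of the caller) and registers it in the market's arena list.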
arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, NULL );
    __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
    // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), NULL );
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

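// Destroys the arena only if it is still in the market's list under the given ABA epoch
// and has neither outstanding worker requests nor live references; otherwise the call has
// no effect.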
void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, NULL );
    // We hold a reference to the market, so it cannot be destroyed at any moment here.
    market::enforce([this] { return theMarket == this; }, NULL);
    __TBB_ASSERT( my_ref_count!=0, NULL );
    my_arenas_list_mutex.lock();
        arena_list_type::iterator it = my_arenas[priority_level].begin();
        for ( ; it != my_arenas[priority_level].end(); ++it ) {
            if ( a == &*it ) {
                if ( it->my_aba_epoch == aba_epoch ) {
                    // Arena is alive.
                    // Acquire my_references to sync with threads that just left the arena.
                    if (!a->my_num_workers_requested && !a->my_references.load(std::memory_order_acquire)) {
                        __TBB_ASSERT(
                            !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                            (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                            "Inconsistent arena state"
                        );
                        // Arena is abandoned. Destroy it.
                        detach_arena( *a );
                        my_arenas_list_mutex.unlock();
                        locked = false;
                        a->free_arena();
                    }
                }
                if (locked)
                    my_arenas_list_mutex.unlock();
                return;
            }
        }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
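// Round-robin scan over the arena lists starting from the hint: returns the first arena
// whose number of active workers is below its allotment (taking a worker reference on it),
// or nullptr if no arena needs another worker.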
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if there are available slots in it.
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce three state response: alive, not_alive, no_market_arenas
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

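// Distributes at most max_workers workers among the arenas: demand at higher priority
// levels is satisfied first, and within a level each arena receives a share proportional
// to its request (the carry keeps the integer division fair across arenas).
// Returns the total number of workers assigned.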
int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
        unassigned_workers -= assigned_per_priority;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
            __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
                || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                    assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
                __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
        }
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected non-null pointer to arena." );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // Still cannot access internals of the arena since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a racy window in advertise_new_work between enabling mandatory concurrency and
        // setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory concurrency.
        // Therefore, we double-check that there are no enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

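// Adjusts the arena's worker demand by delta and propagates the change to the market-wide
// demand and allotment under the arenas list lock; mandatory requests only take effect on
// 0->1 and 1->0 transitions. The epoch counters serialize the adjust_job_count_estimate()
// calls so that they reach RML in the order the demand changed.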
void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
    if (!delta) {
        return;
    }
    int target_epoch{};
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (mandatory) {
            __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
            // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
            a.my_local_concurrency_requests += delta;
            if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
                (delta < 0 && a.my_local_concurrency_requests != 0))
            {
                return;
            }
        }
#endif
        a.my_total_num_workers_requested += delta;
        int target_workers = 0;
        // Cap target_workers to the interval [0, a.my_max_num_workers].
        if (a.my_total_num_workers_requested > 0) {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            // At least one thread should be requested when mandatory concurrency is enabled.
            int max_num_workers = int(a.my_max_num_workers);
            if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
                max_num_workers = 1;
            }
#endif
            target_workers = min(a.my_total_num_workers_requested, max_num_workers);
        }

        delta = target_workers - a.my_num_workers_requested;

        if (delta == 0) {
            return;
        }

        a.my_num_workers_requested += delta;
        if (a.my_num_workers_requested == 0) {
            a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
        }

        int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
        my_total_demand.store(total_demand, std::memory_order_relaxed);
        my_priority_level_demand[a.my_priority_level] += delta;
        unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
        if (my_mandatory_num_requested > 0) {
            __TBB_ASSERT(effective_soft_limit == 0, NULL);
            effective_soft_limit = 1;
        }

        update_allotment(effective_soft_limit);
        if (delta > 0) {
            // The request cannot exceed the soft limit, but the values requested by arenas are
            // remembered in my_total_demand so that workers are not prematurely released to RML.
            if (my_num_workers_requested + delta > (int)effective_soft_limit)
                delta = effective_soft_limit - my_num_workers_requested;
        }
        else {
            // The number of workers should not be decreased below my_total_demand.
            if (my_num_workers_requested + delta < total_demand)
                delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
        }
        my_num_workers_requested += delta;
        __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, NULL);

        target_epoch = my_adjust_demand_target_epoch++;
    }

    my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_acquire);
    // Must be called outside of any locks.
    my_server->adjust_job_count_estimate( delta );
    my_adjust_demand_current_epoch.exchange(target_epoch + 1);
    my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
}

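// RML entry point for a worker thread: the worker repeatedly joins arenas that need more
// workers and processes tasks there until no arena is in need.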
void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called.
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
        }
        // Workers leave the market because there is no arena in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack and RML puts the thread to sleep, which
        // might result in a busy loop that checks for my_slack<0 and calls this method instantly.
        // The yield below mitigates this spinning.
        if ( !i ) {
            yield();
        }
    }
}

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, NULL );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

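// RML callback: allocates and registers the thread_data object for a new worker thread.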
::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, NULL );
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // index serves as a hint decreasing conflicts between workers when they migrate between arenas
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
    __TBB_ASSERT( my_workers[index - 1].load(std::memory_order_relaxed) == nullptr, NULL );
    my_workers[index - 1].store(td, std::memory_order_release);
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb