/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
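// Note: a lower index in my_arenas means a higher priority level; the scan
// below stops at the hint's own level, so the hint is replaced only when a
// strictly higher-priority arena exists.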
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // Keep the hint if no arena with higher priority is found.
    return hint;
}

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server starts initializing workers, which need
    // the global market instance to obtain the worker stack size.
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

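// Note: both app_parallelism_limit() and governor::default_num_threads()
// count the external thread as well, hence the "-1" when converting them to
// a number of workers; the result is also clamped to stay strictly below
// the hard limit.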
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if user set no limits (yet), use market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}

bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
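        // Release theMarketMutex before the possible set_active_num_workers()
        // call below, which acquires theMarketMutex on its own.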
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // Do not warn if the default number of workers is requested.
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // We are OK with that, as it merely produces multiple warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }
        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has already been set to %u. "
                             "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
        return true;
    }
    return false;
}

market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expecting that 4P is suitable for most applications.
        // Limit to 2P for large thread numbers.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in
        // the computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads can be created.
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
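        // Note: the market object is allocated with extra space appended so
        // that the trailing my_workers array provides one slot per possible
        // worker; sizeof(market) already accounts for its first element.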
        // Create the global market instance
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(std::atomic<thread_data*>) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(std::atomic<thread_data*>) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish the global market
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                             , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}

void market::destroy () {
    this->market::~market(); // qualified to suppress warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last external thread, we need to
                // wait until all references are released. Re-read my_public_ref_count to limit waiting if new
                // external threads are created. Theoretically, new private references to the market can be added
                // during the wait, making it potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), NULL );
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), NULL );
            do_release = true;
            theMarket = NULL;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
                      "No public references must remain if we remove the market." );
        // Inform RML that blocking termination is required.
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}

int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
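        // With a zero soft limit, mandatory concurrency keeps exactly one
        // worker alive to process enqueued tasks.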
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}

void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // We now hold a reference to the market, so it can be used safely.

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // Report only once after the new soft limit value is set.
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate must be called outside of any locks
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // Release the internal market reference to match ++m->my_ref_count above.
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, NULL );
    __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
    // Add a public market reference for an external thread/task_arena (which adds an internal reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), NULL );
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, NULL );
    // We hold a reference to the market, so it cannot be destroyed at any moment here.
    market::enforce([this] { return theMarket == this; }, NULL);
    __TBB_ASSERT( my_ref_count!=0, NULL );
    my_arenas_list_mutex.lock();
    arena_list_type::iterator it = my_arenas[priority_level].begin();
    for ( ; it != my_arenas[priority_level].end(); ++it ) {
        if ( a == &*it ) {
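            // The ABA epoch check guards against a different arena that was
            // later allocated at the same address.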
            if ( it->my_aba_epoch == aba_epoch ) {
                // Arena is alive.
                // Acquire my_references to sync with threads that just left the arena.
                if (!a->my_num_workers_requested && !a->my_references.load(std::memory_order_acquire)) {
                    __TBB_ASSERT(
                        !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                        (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                        "Inconsistent arena state"
                    );
                    // Arena is abandoned. Destroy it.
                    detach_arena( *a );
                    my_arenas_list_mutex.unlock();
                    locked = false;
                    a->free_arena();
                }
            }
            if (locked)
                my_arenas_list_mutex.unlock();
            return;
        }
    }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if it has available slots.
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
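    // Circular scan: traverse the hint's priority list first, then wrap
    // through the remaining non-empty priority levels until the hint is
    // reached again.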
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce a three-state response: alive, not_alive, no_market_arenas
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
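    // Within one priority level, workers are distributed proportionally to
    // the arenas' demand; the integer division remainder is carried over to
    // the next arena so that rounding losses do not accumulate.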
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
        unassigned_workers -= assigned_per_priority;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
            __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
                || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                    assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
                __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
        }
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected non-null pointer to arena." );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // Still cannot access internals of the arena since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a race window in advertise_new_work between enabling mandatory concurrency
        // and setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory
        // concurrency. Therefore, we double-check that there are no enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
    if (!delta) {
        return;
    }
    int target_epoch{};
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (mandatory) {
            __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
            // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
            a.my_local_concurrency_requests += delta;
            if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
                (delta < 0 && a.my_local_concurrency_requests != 0))
            {
                return;
            }
        }
#endif
        a.my_total_num_workers_requested += delta;
        int target_workers = 0;
        // Cap target_workers to the interval [0, a.my_max_num_workers].
        if (a.my_total_num_workers_requested > 0) {
            int max_num_workers = int(a.my_max_num_workers);
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            // At least one thread should be requested when mandatory concurrency is enabled.
            if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
                max_num_workers = 1;
            }
#endif
            target_workers = min(a.my_total_num_workers_requested, max_num_workers);
        }

        delta = target_workers - a.my_num_workers_requested;

        if (delta == 0) {
            return;
        }

        a.my_num_workers_requested += delta;
        if (a.my_num_workers_requested == 0) {
            a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
        }

        int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
        my_total_demand.store(total_demand, std::memory_order_relaxed);
        my_priority_level_demand[a.my_priority_level] += delta;
        unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
        if (my_mandatory_num_requested > 0) {
            __TBB_ASSERT(effective_soft_limit == 0, NULL);
            effective_soft_limit = 1;
        }

        update_allotment(effective_soft_limit);
        if (delta > 0) {
            // The request cannot exceed the soft limit, but the values requested by
            // arenas are remembered in my_total_demand so that workers are not
            // released to RML prematurely.
            if (my_num_workers_requested + delta > (int)effective_soft_limit)
                delta = effective_soft_limit - my_num_workers_requested;
        }
        else {
            // The number of workers should not be decreased below my_total_demand.
            if (my_num_workers_requested + delta < total_demand)
                delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
        }
        my_num_workers_requested += delta;
        __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, NULL);

        target_epoch = my_adjust_demand_target_epoch++;
    }

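    // Ticket-based serialization: each adjustment takes a ticket (target_epoch)
    // under the lock and applies its delta to RML strictly in ticket order,
    // without holding my_arenas_list_mutex.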
    my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_acquire);
    // Must be called outside of any locks.
    my_server->adjust_job_count_estimate( delta );
    my_adjust_demand_current_epoch.exchange(target_epoch + 1);
    my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
}

void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called.
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
        }
        // Workers leave the market because there is no arena in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack and RML puts this thread to sleep,
        // which might result in a busy loop that checks for my_slack<0 and calls this method
        // immediately. The yield below mitigates this spinning.
        if ( !i ) {
            yield();
        }
    }
}

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, NULL );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, NULL );
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint that decreases conflicts between workers when they migrate between arenas.
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
    __TBB_ASSERT( my_workers[index - 1].load(std::memory_order_relaxed) == nullptr, NULL );
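    // The release store publishes the fully constructed thread_data; it is
    // presumably paired with acquire loads by threads scanning my_workers.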
    my_workers[index - 1].store(td, std::memory_order_release);
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb