1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef _TBB_market_H 18 #define _TBB_market_H 19 20 #include "scheduler_common.h" 21 #include "market_concurrent_monitor.h" 22 #include "intrusive_list.h" 23 #include "rml_tbb.h" 24 #include "oneapi/tbb/rw_mutex.h" 25 26 #include "oneapi/tbb/spin_rw_mutex.h" 27 #include "oneapi/tbb/task_group.h" 28 29 #include <atomic> 30 31 #if defined(_MSC_VER) && defined(_Wp64) 32 // Workaround for overzealous compiler warnings in /Wp64 mode 33 #pragma warning (push) 34 #pragma warning (disable: 4244) 35 #endif 36 37 namespace tbb { 38 namespace detail { 39 40 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 41 namespace d1 { 42 class task_scheduler_handle; 43 } 44 #endif 45 46 namespace r1 { 47 48 class task_arena_base; 49 class task_group_context; 50 51 //------------------------------------------------------------------------ 52 // Class market 53 //------------------------------------------------------------------------ 54 55 class market : no_copy, rml::tbb_client { 56 friend class arena; 57 friend class task_arena_base; 58 template<typename SchedulerTraits> friend class custom_scheduler; 59 friend class task_group_context; 60 friend class governor; 61 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 62 friend class lifetime_control; 63 #endif 64 65 public: 66 //! Keys for the arena map array. The lower the value the higher priority of the arena list. 67 static constexpr unsigned num_priority_levels = 3; 68 69 private: 70 friend void ITT_DoUnsafeOneTimeInitialization (); 71 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 72 friend bool finalize_impl(d1::task_scheduler_handle& handle); 73 #endif 74 75 typedef intrusive_list<arena> arena_list_type; 76 typedef intrusive_list<thread_data> thread_data_list_type; 77 78 //! Currently active global market 79 static market* theMarket; 80 81 typedef scheduler_mutex_type global_market_mutex_type; 82 83 //! Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas, and cancellation propagation 84 static global_market_mutex_type theMarketMutex; 85 86 //! Lightweight mutex guarding accounting operations with arenas list 87 typedef rw_mutex arenas_list_mutex_type; 88 // TODO: introduce fine-grained (per priority list) locking of arenas. 89 arenas_list_mutex_type my_arenas_list_mutex; 90 91 //! Pointer to the RML server object that services this TBB instance. 92 rml::tbb_server* my_server; 93 94 //! Waiting object for external and coroutine waiters. 95 market_concurrent_monitor my_sleep_monitor; 96 97 //! Maximal number of workers allowed for use by the underlying resource manager 98 /** It can't be changed after market creation. **/ 99 unsigned my_num_workers_hard_limit; 100 101 //! Current application-imposed limit on the number of workers (see set_active_num_workers()) 102 /** It can't be more than my_num_workers_hard_limit. **/ 103 std::atomic<unsigned> my_num_workers_soft_limit; 104 105 //! Number of workers currently requested from RML 106 int my_num_workers_requested; 107 108 //! The target serialization epoch for callers of adjust_job_count_estimate 109 int my_adjust_demand_target_epoch; 110 111 //! The current serialization epoch for callers of adjust_job_count_estimate 112 d1::waitable_atomic<int> my_adjust_demand_current_epoch; 113 114 //! First unused index of worker 115 /** Used to assign indices to the new workers coming from RML, and busy part 116 of my_workers array. **/ 117 std::atomic<unsigned> my_first_unused_worker_idx; 118 119 //! Number of workers that were requested by all arenas on all priority levels 120 std::atomic<int> my_total_demand; 121 122 //! Number of workers that were requested by arenas per single priority list item 123 int my_priority_level_demand[num_priority_levels]; 124 125 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 126 //! How many times mandatory concurrency was requested from the market 127 int my_mandatory_num_requested; 128 #endif 129 130 //! Per priority list of registered arenas 131 arena_list_type my_arenas[num_priority_levels]; 132 133 //! The first arena to be checked when idle worker seeks for an arena to enter 134 /** The check happens in round-robin fashion. **/ 135 arena *my_next_arena; 136 137 //! ABA prevention marker to assign to newly created arenas 138 std::atomic<uintptr_t> my_arenas_aba_epoch; 139 140 //! Reference count controlling market object lifetime 141 std::atomic<unsigned> my_ref_count; 142 143 //! Count of external threads attached 144 std::atomic<unsigned> my_public_ref_count; 145 146 //! Stack size of worker threads 147 std::size_t my_stack_size; 148 149 //! Shutdown mode 150 bool my_join_workers; 151 152 //! The value indicating that the soft limit warning is unnecessary 153 static const unsigned skip_soft_limit_warning = ~0U; 154 155 //! Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning 156 std::atomic<unsigned> my_workers_soft_limit_to_report; 157 158 //! Constructor 159 market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size ); 160 161 //! Destroys and deallocates market object created by market::create() 162 void destroy (); 163 164 //! Recalculates the number of workers requested from RML and updates the allotment. 165 int update_workers_request(); 166 167 //! Recalculates the number of workers assigned to each arena in the list. 168 /** The actual number of workers servicing a particular arena may temporarily 169 deviate from the calculated value. **/ update_allotment(unsigned effective_soft_limit)170 void update_allotment (unsigned effective_soft_limit) { 171 int total_demand = my_total_demand.load(std::memory_order_relaxed); 172 if (total_demand) { 173 update_allotment(my_arenas, total_demand, (int)effective_soft_limit); 174 } 175 } 176 177 //! Returns next arena that needs more workers, or NULL. 178 arena* arena_in_need(arena* prev); 179 180 template <typename Pred> enforce(Pred pred,const char * msg)181 static void enforce (Pred pred, const char* msg) { 182 suppress_unused_warning(pred, msg); 183 #if TBB_USE_ASSERT 184 global_market_mutex_type::scoped_lock lock(theMarketMutex); 185 __TBB_ASSERT(pred(), msg); 186 #endif 187 } 188 189 //////////////////////////////////////////////////////////////////////////////// 190 // Helpers to unify code branches dependent on priority feature presence 191 192 arena* select_next_arena( arena* hint ); 193 194 void insert_arena_into_list ( arena& a ); 195 196 void remove_arena_from_list ( arena& a ); 197 198 arena* arena_in_need ( arena_list_type* arenas, arena* hint ); 199 200 int update_allotment ( arena_list_type* arenas, int total_demand, int max_workers ); 201 202 bool is_arena_in_list( arena_list_type& arenas, arena* a ); 203 204 bool is_arena_alive( arena* a ); 205 206 //////////////////////////////////////////////////////////////////////////////// 207 // Implementation of rml::tbb_client interface methods 208 version()209 version_type version () const override { return 0; } 210 max_job_count()211 unsigned max_job_count () const override { return my_num_workers_hard_limit; } 212 min_stack_size()213 std::size_t min_stack_size () const override { return worker_stack_size(); } 214 215 job* create_one_job () override; 216 217 void cleanup( job& j ) override; 218 219 void acknowledge_close_connection () override; 220 221 void process( job& j ) override; 222 223 public: 224 //! Factory method creating new market object 225 static market& global_market( bool is_public, unsigned max_num_workers = 0, std::size_t stack_size = 0 ); 226 227 //! Add reference to market if theMarket exists 228 static bool add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned max_num_workers = 0, std::size_t stack_size = 0 ); 229 230 //! Creates an arena object 231 /** If necessary, also creates global market instance, and boosts its ref count. 232 Each call to create_arena() must be matched by the call to arena::free_arena(). **/ 233 static arena* create_arena ( int num_slots, int num_reserved_slots, 234 unsigned arena_index, std::size_t stack_size ); 235 236 //! Removes the arena from the market's list 237 void try_destroy_arena ( arena*, uintptr_t aba_epoch, unsigned pririty_level ); 238 239 //! Removes the arena from the market's list 240 void detach_arena ( arena& ); 241 242 //! Decrements market's refcount and destroys it in the end 243 bool release ( bool is_public, bool blocking_terminate ); 244 245 //! Return wait list get_wait_list()246 market_concurrent_monitor& get_wait_list() { return my_sleep_monitor; } 247 248 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 249 //! Imlpementation of mandatory concurrency enabling 250 void enable_mandatory_concurrency_impl ( arena *a ); 251 252 //! Inform the external thread that there is an arena with mandatory concurrency 253 void enable_mandatory_concurrency ( arena *a ); 254 255 //! Inform the external thread that the arena is no more interested in mandatory concurrency 256 void disable_mandatory_concurrency_impl(arena* a); 257 258 //! Inform the external thread that the arena is no more interested in mandatory concurrency 259 void mandatory_concurrency_disable ( arena *a ); 260 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */ 261 262 //! Request that arena's need in workers should be adjusted. 263 /** Concurrent invocations are possible only on behalf of different arenas. **/ 264 void adjust_demand ( arena&, int delta, bool mandatory ); 265 266 //! Used when RML asks for join mode during workers termination. must_join_workers()267 bool must_join_workers () const { return my_join_workers; } 268 269 //! Returns the requested stack size of worker threads. worker_stack_size()270 std::size_t worker_stack_size () const { return my_stack_size; } 271 272 //! Set number of active workers 273 static void set_active_num_workers( unsigned w ); 274 275 //! Reports active parallelism level according to user's settings 276 static unsigned app_parallelism_limit(); 277 278 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 279 //! Reports if any active global lifetime references are present 280 static unsigned is_lifetime_control_present(); 281 #endif 282 283 //! Finds all contexts affected by the state change and propagates the new state to them. 284 /** The propagation is relayed to the market because tasks created by one 285 external thread can be passed to and executed by other external threads. This means 286 that context trees can span several arenas at once and thus state change 287 propagation cannot be generally localized to one arena only. **/ 288 template <typename T> 289 bool propagate_task_group_state (std::atomic<T> d1::task_group_context::*mptr_state, d1::task_group_context& src, T new_state ); 290 291 //! List of registered external threads 292 thread_data_list_type my_masters; 293 294 //! Array of pointers to the registered workers 295 /** Used by cancellation propagation mechanism. 296 Must be the last data member of the class market. **/ 297 std::atomic<thread_data*> my_workers[1]; 298 max_num_workers()299 static unsigned max_num_workers() { 300 global_market_mutex_type::scoped_lock lock( theMarketMutex ); 301 return theMarket? theMarket->my_num_workers_hard_limit : 0; 302 } 303 304 void add_external_thread(thread_data& td); 305 306 void remove_external_thread(thread_data& td); 307 }; // class market 308 309 } // namespace r1 310 } // namespace detail 311 } // namespace tbb 312 313 #if defined(_MSC_VER) && defined(_Wp64) 314 // Workaround for overzealous compiler warnings in /Wp64 mode 315 #pragma warning (pop) 316 #endif // warning 4244 is back 317 318 #endif /* _TBB_market_H */ 319