1 // Copyright (c) 2007-2017 Hartmut Kaiser 2 // Copyright (c) 2007-2009 Chirag Dekate, Anshul Tandon 3 // Copyright (c) 2011 Bryce Lelbach, Katelyn Kufahl 4 // Copyright (c) 2017 Shoshana Jakobovits 5 // Distributed under the Boost Software License, Version 1.0. (See accompanying 6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 7 8 #if !defined(HPX_THREADMANAGER_HPP) 9 #define HPX_THREADMANAGER_HPP 10 11 #include <hpx/config.hpp> 12 #include <hpx/compat/barrier.hpp> 13 #include <hpx/compat/mutex.hpp> 14 #include <hpx/compat/thread.hpp> 15 #include <hpx/exception_fwd.hpp> 16 #include <hpx/performance_counters/counters_fwd.hpp> 17 #include <hpx/runtime/naming/name.hpp> 18 #include <hpx/runtime/resource/detail/partitioner.hpp> 19 #include <hpx/runtime/threads/detail/thread_num_tss.hpp> 20 #include <hpx/runtime/threads/policies/scheduler_mode.hpp> 21 #include <hpx/runtime/threads/thread_init_data.hpp> 22 #include <hpx/runtime/threads/thread_pool_base.hpp> 23 #include <hpx/state.hpp> 24 #include <hpx/util/block_profiler.hpp> 25 #include <hpx/util/io_service_pool.hpp> 26 #include <hpx/util/spinlock.hpp> 27 28 #include <atomic> 29 #include <cstddef> 30 #include <cstdint> 31 #include <exception> 32 #include <iosfwd> 33 #include <memory> 34 #include <mutex> 35 #include <numeric> 36 #include <string> 37 #include <type_traits> 38 #include <vector> 39 #include <utility> 40 41 #include <hpx/config/warnings_prefix.hpp> 42 43 namespace hpx { namespace threads 44 { 45 /////////////////////////////////////////////////////////////////////////// 46 /// The \a thread-manager class is the central instance of management for 47 /// all (non-depleted) threads 48 class HPX_EXPORT threadmanager 49 { 50 private: 51 // we use a simple mutex to protect the data members of the 52 // thread manager for now 53 typedef compat::mutex mutex_type; 54 55 public: 56 typedef threads::policies::callback_notifier notification_policy_type; 57 typedef std::unique_ptr<thread_pool_base> pool_type; 58 typedef threads::policies::scheduler_base scheduler_type; 59 typedef std::vector<pool_type> pool_vector; 60 61 #ifdef HPX_HAVE_TIMER_POOL 62 threadmanager(util::io_service_pool& timer_pool, 63 notification_policy_type& notifier); 64 #else 65 threadmanager(notification_policy_type& notifier); 66 #endif 67 ~threadmanager(); 68 69 void init(); 70 void create_pools(); 71 72 //! FIXME move to private and add --hpx:printpools cmd line option 73 void print_pools(std::ostream&); 74 75 // Get functions 76 thread_pool_base& default_pool() const; 77 78 scheduler_type& default_scheduler() const; 79 80 thread_pool_base& get_pool(std::string const& pool_name) const; 81 thread_pool_base& get_pool(pool_id_type pool_id) const; 82 thread_pool_base& get_pool(std::size_t thread_index) const; 83 84 /// The function \a register_work adds a new work item to the thread 85 /// manager. It doesn't immediately create a new \a thread, it just adds 86 /// the task parameters (function, initial state and description) to 87 /// the internal management data structures. The thread itself will be 88 /// created when the number of existing threads drops below the number 89 /// of threads specified by the constructors max_count parameter. 90 /// 91 /// \param func [in] The function or function object to execute as 92 /// the thread's function. This must have a signature as 93 /// defined by \a thread_function_type. 94 /// \param description [in] The value of this parameter allows to 95 /// specify a description of the thread to create. This 96 /// information is used for logging purposes mainly, but 97 /// might be useful for debugging as well. This parameter 98 /// is optional and defaults to an empty string. 99 /// \param initial_state 100 /// [in] The value of this parameter defines the initial 101 /// state of the newly created \a thread. This must be 102 /// one of the values as defined by the \a thread_state 103 /// enumeration (thread_state#pending, or \a 104 /// thread_state#suspended, any other value will throw a 105 /// hpx#bad_parameter exception). 106 void register_work(thread_init_data& data, 107 thread_state_enum initial_state = pending, 108 error_code& ec = throws); 109 110 /// The function \a register_thread adds a new work item to the thread 111 /// manager. It creates a new \a thread, adds it to the internal 112 /// management data structures, and schedules the new thread, if 113 /// appropriate. 114 /// 115 /// \param func [in] The function or function object to execute as 116 /// the thread's function. This must have a signature as 117 /// defined by \a thread_function_type. 118 /// \param id [out] This parameter will hold the id of the created 119 /// thread. This id is guaranteed to be validly 120 /// initialized before the thread function is executed. 121 /// \param description [in] The value of this parameter allows to 122 /// specify a description of the thread to create. This 123 /// information is used for logging purposes mainly, but 124 /// might be useful for debugging as well. This parameter 125 /// is optional and defaults to an empty string. 126 /// \param initial_state 127 /// [in] The value of this parameter defines the initial 128 /// state of the newly created \a thread. This must be 129 /// one of the values as defined by the \a thread_state 130 /// enumeration (thread_state#pending, or \a 131 /// thread_state#suspended, any other value will throw a 132 /// hpx#bad_parameter exception). 133 /// \param run_now [in] If this parameter is \a true and the initial 134 /// state is given as \a thread_state#pending the thread 135 /// will be run immediately, otherwise it will be 136 /// scheduled to run later (either this function is 137 /// called for another thread using \a true for the 138 /// parameter \a run_now or the function \a 139 /// threadmanager#do_some_work is called). This parameter 140 /// is optional and defaults to \a true. 141 void register_thread(thread_init_data& data, thread_id_type& id, 142 thread_state_enum initial_state = pending, 143 bool run_now = true, error_code& ec = throws); 144 145 /// \brief Run the thread manager's work queue. This function 146 /// instantiates the specified number of OS threads in each 147 /// pool. All OS threads are started to execute the function 148 /// \a tfunc. 149 /// 150 /// \returns The function returns \a true if the thread manager 151 /// has been started successfully, otherwise it returns 152 /// \a false. 153 bool run(); 154 155 /// \brief Forcefully stop the thread-manager 156 /// 157 /// \param blocking 158 /// 159 void stop (bool blocking = true); 160 161 // \brief Suspend all thread pools. 162 void suspend(); 163 164 // \brief Resume all thread pools. 165 void resume(); 166 167 /// \brief Return whether the thread manager is still running 168 //! This returns the "minimal state", i.e. the state of the 169 //! least advanced thread pool status() const170 state status() const 171 { 172 hpx::state result(last_valid_runtime_state); 173 174 for (auto& pool_iter : pools_) 175 { 176 hpx::state s = pool_iter->get_state(); 177 result = (std::min)(result, s); 178 } 179 180 return result; 181 } 182 183 /// \brief return the number of HPX-threads with the given state 184 /// 185 /// \note This function lock the internal OS lock in the thread manager 186 std::int64_t get_thread_count(thread_state_enum state = unknown, 187 thread_priority priority = thread_priority_default, 188 std::size_t num_thread = std::size_t(-1), bool reset = false); 189 190 std::int64_t get_background_thread_count(); 191 192 // Enumerate all matching threads 193 bool enumerate_threads( 194 util::function_nonser<bool(thread_id_type)> const& f, 195 thread_state_enum state = unknown) const; 196 197 // \brief Abort all threads which are in suspended state. This will set 198 // the state of all suspended threads to \a pending while 199 // supplying the wait_abort extended state flag 200 void abort_all_suspended_threads(); 201 202 // \brief Clean up terminated threads. This deletes all threads which 203 // have been terminated but which are still held in the queue 204 // of terminated threads. Some schedulers might not do anything 205 // here. 206 bool cleanup_terminated(bool delete_all); 207 208 /// \brief Return the number of OS threads running in this thread-manager 209 /// 210 /// This function will return correct results only if the thread-manager 211 /// is running. get_os_thread_count() const212 std::size_t get_os_thread_count() const 213 { 214 std::lock_guard<mutex_type> lk(mtx_); 215 std::size_t total = 0; 216 for (auto& pool_iter : pools_) 217 { 218 total += pool_iter->get_os_thread_count(); 219 } 220 return total; 221 } 222 get_os_thread_handle(std::size_t num_thread) const223 compat::thread& get_os_thread_handle(std::size_t num_thread) const 224 { 225 std::lock_guard<mutex_type> lk(mtx_); 226 pool_id_type id = threads_lookup_[num_thread]; 227 thread_pool_base& pool = get_pool(id); 228 return pool.get_os_thread_handle(num_thread); 229 } 230 231 public: 232 /// API functions forwarding to notification policy 233 234 /// This notifies the thread manager that the passed exception has been 235 /// raised. The exception will be routed through the notifier and the 236 /// scheduler (which will result in it being passed to the runtime 237 /// object, which in turn will report it to the console, etc.). report_error(std::size_t num_thread,std::exception_ptr const & e)238 void report_error(std::size_t num_thread, std::exception_ptr const& e) 239 { 240 // propagate the error reporting to all pools, which in turn 241 // will propagate to schedulers 242 for (auto& pool_iter : pools_) 243 { 244 pool_iter->report_error(num_thread, e); 245 } 246 } 247 248 // Return the (global) sequence number of the current thread get_worker_thread_num(bool * =nullptr)249 std::size_t get_worker_thread_num(bool* /*numa_sensitive*/ = nullptr) 250 { 251 if (get_self_ptr() == nullptr) 252 return std::size_t(-1); 253 return detail::thread_num_tss_.get_worker_thread_num(); 254 } 255 256 public: 257 /// The function register_counter_types() is called during startup to 258 /// allow the registration of all performance counter types for this 259 /// thread-manager instance. 260 void register_counter_types(); 261 262 /// Returns the mask identifying all processing units used by this 263 /// thread manager. get_used_processing_units() const264 mask_type get_used_processing_units() const 265 { 266 mask_type total_used_processing_punits = mask_type(); 267 threads::resize( 268 total_used_processing_punits, hardware_concurrency()); 269 270 for (auto& pool_iter : pools_) 271 { 272 total_used_processing_punits |= 273 pool_iter->get_used_processing_units(); 274 } 275 276 return total_used_processing_punits; 277 } 278 get_pool_numa_bitmap(const std::string & pool_name) const279 hwloc_bitmap_ptr get_pool_numa_bitmap( 280 const std::string &pool_name) const 281 { 282 return get_pool(pool_name).get_numa_domain_bitmap(); 283 } 284 set_scheduler_mode(threads::policies::scheduler_mode mode)285 void set_scheduler_mode(threads::policies::scheduler_mode mode) 286 { 287 for (auto& pool_iter : pools_) 288 { 289 pool_iter->set_scheduler_mode(mode); 290 } 291 } 292 reset_thread_distribution()293 void reset_thread_distribution() 294 { 295 for (auto& pool_iter : pools_) 296 { 297 pool_iter->reset_thread_distribution(); 298 } 299 } 300 init_tss(std::size_t num)301 void init_tss(std::size_t num) 302 { 303 detail::thread_num_tss_.init_tss(num); 304 } 305 deinit_tss()306 void deinit_tss() 307 { 308 detail::thread_num_tss_.deinit_tss(); 309 } 310 311 public: 312 std::size_t shrink_pool(std::string const& pool_name); 313 std::size_t expand_pool(std::string const& pool_name); 314 315 private: 316 // counter creator functions 317 naming::gid_type thread_counts_counter_creator( 318 performance_counters::counter_info const& info, error_code& ec); 319 naming::gid_type scheduler_utilization_counter_creator( 320 performance_counters::counter_info const& info, error_code& ec); 321 322 typedef std::int64_t (threadmanager::*threadmanager_counter_func)( 323 bool reset); 324 typedef std::int64_t (thread_pool_base::*threadpool_counter_func)( 325 std::size_t num_thread, bool reset); 326 327 naming::gid_type locality_pool_thread_counter_creator( 328 threadmanager_counter_func total_func, 329 threadpool_counter_func pool_func, 330 performance_counters::counter_info const& info, error_code& ec); 331 332 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME 333 naming::gid_type queue_wait_time_counter_creator( 334 threadmanager_counter_func total_func, 335 threadpool_counter_func pool_func, 336 performance_counters::counter_info const& info, error_code& ec); 337 #endif 338 339 naming::gid_type locality_pool_thread_no_total_counter_creator( 340 threadpool_counter_func pool_func, 341 performance_counters::counter_info const& info, error_code& ec); 342 343 // performance counters 344 std::int64_t get_queue_length(bool reset); 345 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME 346 std::int64_t get_average_thread_wait_time(bool reset); 347 std::int64_t get_average_task_wait_time(bool reset); 348 #endif 349 #if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) && defined(HPX_HAVE_THREAD_IDLE_RATES) 350 std::int64_t get_background_work_duration(bool reset); 351 std::int64_t get_background_overhead(bool reset); 352 #endif //HPX_HAVE_BACKGROUND_THREAD_COUNTERS 353 354 std::int64_t get_cumulative_duration(bool reset); 355 get_thread_count_unknown(bool reset)356 std::int64_t get_thread_count_unknown(bool reset) 357 { 358 return get_thread_count( 359 unknown, thread_priority_default, std::size_t(-1), reset); 360 } get_thread_count_active(bool reset)361 std::int64_t get_thread_count_active(bool reset) 362 { 363 return get_thread_count( 364 active, thread_priority_default, std::size_t(-1), reset); 365 } get_thread_count_pending(bool reset)366 std::int64_t get_thread_count_pending(bool reset) 367 { 368 return get_thread_count( 369 pending, thread_priority_default, std::size_t(-1), reset); 370 } get_thread_count_suspended(bool reset)371 std::int64_t get_thread_count_suspended(bool reset) 372 { 373 return get_thread_count( 374 suspended, thread_priority_default, std::size_t(-1), reset); 375 } get_thread_count_terminated(bool reset)376 std::int64_t get_thread_count_terminated(bool reset) 377 { 378 return get_thread_count( 379 terminated, thread_priority_default, std::size_t(-1), reset); 380 } get_thread_count_staged(bool reset)381 std::int64_t get_thread_count_staged(bool reset) 382 { 383 return get_thread_count( 384 staged, thread_priority_default, std::size_t(-1), reset); 385 } 386 387 #ifdef HPX_HAVE_THREAD_IDLE_RATES 388 std::int64_t avg_idle_rate(bool reset); 389 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES 390 std::int64_t avg_creation_idle_rate(bool reset); 391 std::int64_t avg_cleanup_idle_rate(bool reset); 392 #endif 393 #endif 394 395 #ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS 396 std::int64_t get_executed_threads(bool reset); 397 std::int64_t get_executed_thread_phases(bool reset); 398 #ifdef HPX_HAVE_THREAD_IDLE_RATES 399 std::int64_t get_thread_duration(bool reset); 400 std::int64_t get_thread_phase_duration(bool reset); 401 std::int64_t get_thread_overhead(bool reset); 402 std::int64_t get_thread_phase_overhead(bool reset); 403 std::int64_t get_cumulative_thread_duration(bool reset); 404 std::int64_t get_cumulative_thread_overhead(bool reset); 405 #endif 406 #endif 407 408 #ifdef HPX_HAVE_THREAD_STEALING_COUNTS 409 std::int64_t get_num_pending_misses(bool reset); 410 std::int64_t get_num_pending_accesses(bool reset); 411 std::int64_t get_num_stolen_from_pending(bool reset); 412 std::int64_t get_num_stolen_from_staged(bool reset); 413 std::int64_t get_num_stolen_to_pending(bool reset); 414 std::int64_t get_num_stolen_to_staged(bool reset); 415 #endif 416 417 private: 418 mutable mutex_type mtx_; // mutex protecting the members 419 420 // specified by the user in command line, or all cores by default 421 // represents the total number of OS threads, irrespective of how many 422 // are in which pool. 423 std::size_t num_threads_; 424 425 std::vector<pool_id_type> threads_lookup_; 426 427 #ifdef HPX_HAVE_TIMER_POOL 428 util::io_service_pool& timer_pool_; // used for timed set_state 429 #endif 430 pool_vector pools_; 431 432 notification_policy_type& notifier_; 433 }; 434 }} 435 436 #include <hpx/config/warnings_suffix.hpp> 437 438 #endif 439