1 // Copyright (c) 2018 Mikael Simberg 2 // Copyright (c) 2007-2017 Hartmut Kaiser 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 #if !defined(HPX_RUNTIME_THREADS_DETAIL_THREAD_POOL_JUN_11_2015_1137AM) 8 #define HPX_RUNTIME_THREADS_DETAIL_THREAD_POOL_JUN_11_2015_1137AM 9 10 #include <hpx/config.hpp> 11 #include <hpx/compat/barrier.hpp> 12 #include <hpx/compat/mutex.hpp> 13 #include <hpx/compat/thread.hpp> 14 #include <hpx/error_code.hpp> 15 #include <hpx/exception_fwd.hpp> 16 #include <hpx/lcos/future.hpp> 17 #include <hpx/lcos/local/no_mutex.hpp> 18 #include <hpx/lcos/local/spinlock.hpp> 19 #include <hpx/runtime/thread_pool_helpers.hpp> 20 #include <hpx/runtime/threads/cpu_mask.hpp> 21 #include <hpx/runtime/threads/policies/affinity_data.hpp> 22 #include <hpx/runtime/threads/policies/callback_notifier.hpp> 23 #include <hpx/runtime/threads/policies/scheduler_mode.hpp> 24 #include <hpx/runtime/threads/thread_executor.hpp> 25 #include <hpx/runtime/threads/thread_init_data.hpp> 26 #include <hpx/runtime/threads/topology.hpp> 27 #include <hpx/state.hpp> 28 #include <hpx/util/steady_clock.hpp> 29 #include <hpx/util_fwd.hpp> 30 31 #include <cstddef> 32 #include <cstdint> 33 #include <exception> 34 #include <functional> 35 #include <iosfwd> 36 #include <memory> 37 #include <mutex> 38 #include <string> 39 #include <vector> 40 41 #include <hpx/config/warnings_prefix.hpp> 42 43 namespace hpx { namespace threads 44 { 45 /////////////////////////////////////////////////////////////////////////// 46 /// \cond NOINTERNAL 47 struct pool_id_type 48 { pool_id_typehpx::threads::pool_id_type49 pool_id_type(std::size_t index, std::string const& name) 50 : index_(index), name_(name) 51 {} 52 indexhpx::threads::pool_id_type53 std::size_t index() const { return index_; }; namehpx::threads::pool_id_type54 std::string const& name() const { return name_; } 55 56 private: 57 std::size_t const index_; 58 std::string const name_; 59 }; 60 /// \endcond 61 62 /////////////////////////////////////////////////////////////////////////// 63 // note: this data structure has to be protected from races from the outside 64 65 /// \brief The base class used to manage a pool of OS threads. 66 class thread_pool_base : public detail::manage_executor 67 { 68 public: 69 /// \cond NOINTERNAL 70 thread_pool_base(threads::policies::callback_notifier& notifier, 71 std::size_t index, std::string const& pool_name, 72 policies::scheduler_mode m, std::size_t thread_offset); 73 74 virtual ~thread_pool_base() = default; 75 76 virtual void init(std::size_t num_threads, std::size_t threads_offset); 77 78 virtual bool run(std::unique_lock<compat::mutex>& l, 79 std::size_t num_threads) = 0; 80 81 virtual void stop( 82 std::unique_lock<compat::mutex>& l, bool blocking = true) = 0; 83 84 virtual void print_pool(std::ostream&) = 0; 85 get_pool_id()86 pool_id_type get_pool_id() 87 { 88 return id_; 89 } 90 /// \endcond 91 92 /// Resumes the thread pool. When the all OS threads on the thread pool 93 /// have been resumed the returned future will be ready. 94 /// 95 /// \note Can only be called from an HPX thread. Use resume_cb or 96 /// resume_direct to suspend the pool from outside HPX. 97 /// 98 /// \returns A `future<void>` which is ready when the thread pool has 99 /// been resumed. 100 /// 101 /// \throws hpx::exception if called from outside the HPX runtime. 102 virtual hpx::future<void> resume() = 0; 103 104 /// Resumes the thread pool. Takes a callback as a parameter which will 105 /// be called when all OS threads on the thread pool have been resumed. 106 /// 107 /// \param callback [in] called when the thread pool has been resumed. 108 /// \param ec [in,out] this represents the error status on exit, if this 109 /// is pre-initialized to \a hpx#throws the function will throw 110 /// on error instead. 111 virtual void resume_cb( 112 std::function<void(void)> callback, error_code& ec = throws) = 0; 113 114 /// Resumes the thread pool. Blocks until all OS threads on the thread pool 115 /// have been resumed. 116 /// 117 /// \param ec [in,out] this represents the error status on exit, if this 118 /// is pre-initialized to \a hpx#throws the function will 119 /// throw on error instead. 120 virtual void resume_direct(error_code& ec = throws) = 0; 121 122 /// Suspends the thread pool. When the all OS threads on the thread pool 123 /// have been suspended the returned future will be ready. 124 /// 125 /// \note Can only be called from an HPX thread. Use suspend_cb or 126 /// suspend_direct to suspend the pool from outside HPX. A thread 127 /// pool cannot be suspended from an HPX thread running on the 128 /// pool itself. 129 /// 130 /// \returns A `future<void>` which is ready when the thread pool has 131 /// been suspended. 132 /// 133 /// \throws hpx::exception if called from outside the HPX runtime. 134 virtual hpx::future<void> suspend() = 0; 135 136 /// Suspends the thread pool. Takes a callback as a parameter which will 137 /// be called when all OS threads on the thread pool have been suspended. 138 /// 139 /// \note A thread pool cannot be suspended from an HPX thread running 140 /// on the pool itself. 141 /// 142 /// \param callback [in] called when the thread pool has been suspended. 143 /// \param ec [in,out] this represents the error status on exit, if this 144 /// is pre-initialized to \a hpx#throws the function will throw 145 /// on error instead. 146 /// 147 /// \throws hpx::exception if called from an HPX thread which is running 148 /// on the pool itself. 149 virtual void suspend_cb( 150 std::function<void(void)> callback, error_code& ec = throws) = 0; 151 152 /// Suspends the thread pool. Blocks until all OS threads on the thread pool 153 /// have been suspended. 154 /// 155 /// \note A thread pool cannot be suspended from an HPX thread running 156 /// on the pool itself. 157 /// 158 /// \param ec [in,out] this represents the error status on exit, if this 159 /// is pre-initialized to \a hpx#throws the function will 160 /// throw on error instead. 161 /// 162 /// \throws hpx::exception if called from an HPX thread which is running 163 /// on the pool itself. 164 virtual void suspend_direct(error_code& ec = throws) = 0; 165 166 public: 167 /// \cond NOINTERNAL 168 std::size_t get_worker_thread_num() const; 169 virtual std::size_t get_os_thread_count() const = 0; 170 171 virtual compat::thread& get_os_thread_handle( 172 std::size_t num_thread) = 0; 173 174 virtual std::size_t get_active_os_thread_count() const; 175 176 virtual void create_thread(thread_init_data& data, thread_id_type& id, 177 thread_state_enum initial_state, bool run_now, error_code& ec) = 0; 178 virtual void create_work(thread_init_data& data, 179 thread_state_enum initial_state, error_code& ec) = 0; 180 181 virtual thread_state set_state(thread_id_type const& id, 182 thread_state_enum new_state, thread_state_ex_enum new_state_ex, 183 thread_priority priority, error_code& ec) = 0; 184 185 virtual thread_id_type set_state(util::steady_time_point const& abs_time, 186 thread_id_type const& id, thread_state_enum newstate, 187 thread_state_ex_enum newstate_ex, thread_priority priority, 188 error_code& ec) = 0; 189 get_pool_index() const190 std::size_t get_pool_index() const 191 { 192 return id_.index(); 193 } get_pool_name() const194 std::string const& get_pool_name() const 195 { 196 return id_.name(); 197 } get_thread_offset() const198 std::size_t get_thread_offset() const 199 { 200 return thread_offset_; 201 } 202 get_scheduler() const203 virtual policies::scheduler_base* get_scheduler() const 204 { 205 return nullptr; 206 } 207 208 mask_type get_used_processing_units() const; 209 hwloc_bitmap_ptr get_numa_domain_bitmap() const; 210 211 // performance counters 212 #if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS) get_executed_threads(std::size_t,bool)213 virtual std::int64_t get_executed_threads( 214 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_executed_thread_phases(std::size_t,bool)215 virtual std::int64_t get_executed_thread_phases( 216 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 217 #if defined(HPX_HAVE_THREAD_IDLE_RATES) get_thread_phase_duration(std::size_t,bool)218 virtual std::int64_t get_thread_phase_duration( 219 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_thread_duration(std::size_t,bool)220 virtual std::int64_t get_thread_duration( 221 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_thread_phase_overhead(std::size_t,bool)222 virtual std::int64_t get_thread_phase_overhead( 223 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_thread_overhead(std::size_t,bool)224 virtual std::int64_t get_thread_overhead( 225 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_cumulative_thread_duration(std::size_t,bool)226 virtual std::int64_t get_cumulative_thread_duration( 227 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_cumulative_thread_overhead(std::size_t,bool)228 virtual std::int64_t get_cumulative_thread_overhead( 229 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 230 #endif 231 #endif 232 get_cumulative_duration(std::size_t,bool)233 virtual std::int64_t get_cumulative_duration( 234 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 235 236 #if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) && defined(HPX_HAVE_THREAD_IDLE_RATES) get_background_work_duration(std::size_t,bool)237 virtual std::int64_t get_background_work_duration( 238 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_background_overhead(std::size_t,bool)239 virtual std::int64_t get_background_overhead( 240 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 241 #endif // HPX_HAVE_BACKGROUND_THREAD_COUNTERS 242 243 #if defined(HPX_HAVE_THREAD_IDLE_RATES) avg_idle_rate_all(bool)244 virtual std::int64_t avg_idle_rate_all(bool /*reset*/) { return 0; } avg_idle_rate(std::size_t,bool)245 virtual std::int64_t avg_idle_rate(std::size_t, bool) { return 0; } 246 247 #if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES) avg_creation_idle_rate(std::size_t,bool)248 virtual std::int64_t avg_creation_idle_rate( 249 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } avg_cleanup_idle_rate(std::size_t,bool)250 virtual std::int64_t avg_cleanup_idle_rate( 251 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 252 #endif 253 #endif 254 get_queue_length(std::size_t,bool)255 virtual std::int64_t get_queue_length(std::size_t, bool) { return 0; } 256 257 #if defined(HPX_HAVE_THREAD_QUEUE_WAITTIME) get_average_thread_wait_time(std::size_t,bool)258 virtual std::int64_t get_average_thread_wait_time( 259 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_average_task_wait_time(std::size_t,bool)260 virtual std::int64_t get_average_task_wait_time( 261 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 262 #endif 263 264 #if defined(HPX_HAVE_THREAD_STEALING_COUNTS) get_num_pending_misses(std::size_t,bool)265 virtual std::int64_t get_num_pending_misses( 266 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_num_pending_accesses(std::size_t,bool)267 virtual std::int64_t get_num_pending_accesses( 268 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 269 get_num_stolen_from_pending(std::size_t,bool)270 virtual std::int64_t get_num_stolen_from_pending( 271 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_num_stolen_to_pending(std::size_t,bool)272 virtual std::int64_t get_num_stolen_to_pending( 273 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_num_stolen_from_staged(std::size_t,bool)274 virtual std::int64_t get_num_stolen_from_staged( 275 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } get_num_stolen_to_staged(std::size_t,bool)276 virtual std::int64_t get_num_stolen_to_staged( 277 std::size_t /*thread_num*/, bool /*reset*/) { return 0; } 278 #endif 279 get_thread_count(thread_state_enum,thread_priority,std::size_t,bool)280 virtual std::int64_t get_thread_count(thread_state_enum /*state*/, 281 thread_priority /*priority*/, std::size_t /*num_thread*/, 282 bool /*reset*/) { return 0; } 283 get_background_thread_count()284 virtual std::int64_t get_background_thread_count() { return 0; } 285 get_thread_count_unknown(std::size_t num_thread,bool reset)286 std::int64_t get_thread_count_unknown( 287 std::size_t num_thread, bool reset) 288 { 289 return get_thread_count( 290 unknown, thread_priority_default, num_thread, reset); 291 } get_thread_count_active(std::size_t num_thread,bool reset)292 std::int64_t get_thread_count_active(std::size_t num_thread, bool reset) 293 { 294 return get_thread_count( 295 active, thread_priority_default, num_thread, reset); 296 } get_thread_count_pending(std::size_t num_thread,bool reset)297 std::int64_t get_thread_count_pending( 298 std::size_t num_thread, bool reset) 299 { 300 return get_thread_count( 301 pending, thread_priority_default, num_thread, reset); 302 } get_thread_count_suspended(std::size_t num_thread,bool reset)303 std::int64_t get_thread_count_suspended( 304 std::size_t num_thread, bool reset) 305 { 306 return get_thread_count( 307 suspended, thread_priority_default, num_thread, reset); 308 } get_thread_count_terminated(std::size_t num_thread,bool reset)309 std::int64_t get_thread_count_terminated( 310 std::size_t num_thread, bool reset) 311 { 312 return get_thread_count( 313 terminated, thread_priority_default, num_thread, reset); 314 } get_thread_count_staged(std::size_t num_thread,bool reset)315 std::int64_t get_thread_count_staged(std::size_t num_thread, bool reset) 316 { 317 return get_thread_count( 318 staged, thread_priority_default, num_thread, reset); 319 } 320 321 virtual std::int64_t get_scheduler_utilization() const = 0; 322 323 virtual std::int64_t get_idle_loop_count( 324 std::size_t num, bool reset) = 0; 325 virtual std::int64_t get_busy_loop_count( 326 std::size_t num, bool reset) = 0; 327 328 /////////////////////////////////////////////////////////////////////// enumerate_threads(util::function_nonser<bool (thread_id_type)> const &,thread_state_enum=unknown) const329 virtual bool enumerate_threads( 330 util::function_nonser<bool(thread_id_type)> const& /*f*/, 331 thread_state_enum /*state*/ = unknown) const 332 { 333 return false; 334 } 335 reset_thread_distribution()336 virtual void reset_thread_distribution() {} 337 set_scheduler_mode(threads::policies::scheduler_mode)338 virtual void set_scheduler_mode(threads::policies::scheduler_mode) {} 339 340 // abort_all_suspended_threads()341 virtual void abort_all_suspended_threads() {} cleanup_terminated(bool)342 virtual bool cleanup_terminated(bool /*delete_all*/) { return false; } 343 344 virtual hpx::state get_state() const = 0; 345 virtual hpx::state get_state(std::size_t num_thread) const = 0; 346 347 virtual bool has_reached_state(hpx::state s) const = 0; 348 do_some_work(std::size_t)349 virtual void do_some_work(std::size_t /*num_thread*/) {} 350 report_error(std::size_t num,std::exception_ptr const & e)351 virtual void report_error(std::size_t num, std::exception_ptr const& e) 352 { 353 notifier_.on_error(num, e); 354 } 355 356 /////////////////////////////////////////////////////////////////////// 357 // detail::manage_executor implementation 358 359 /// \brief Return the requested policy element. 360 virtual std::size_t get_policy_element(detail::executor_parameter p, 361 error_code& ec = throws) const = 0; 362 363 // \brief Return statistics collected by this scheduler. 364 virtual void get_statistics(executor_statistics& stats, 365 error_code& ec = throws) const = 0; 366 367 // \brief Provide the given processing unit to the scheduler. 368 virtual void add_processing_unit(std::size_t virt_core, 369 std::size_t thread_num, error_code& ec = throws) = 0; 370 371 // \brief Remove the given processing unit from the scheduler. 372 virtual void remove_processing_unit(std::size_t thread_num, 373 error_code& ec = throws) = 0; 374 375 // \brief Return the description string of the underlying scheduler. 376 char const* get_description() const; 377 378 /// \endcond 379 380 /// Suspends the given processing unit. When the processing unit has 381 /// been suspended the returned future will be ready. 382 /// 383 /// \note Can only be called from an HPX thread. Use 384 /// suspend_processing_unit_cb or to suspend the processing unit 385 /// from outside HPX. Requires that the pool has 386 /// threads::policies::enable_elasticity set. 387 /// 388 /// \param virt_core [in] The processing unit on the the pool to be 389 /// suspended. The processing units are indexed 390 /// starting from 0. 391 /// 392 /// \returns A `future<void>` which is ready when the given processing 393 /// unit has been suspended. 394 /// 395 /// \throws hpx::exception if called from outside the HPX runtime. 396 virtual hpx::future<void> suspend_processing_unit(std::size_t virt_core) = 0; 397 398 /// Suspends the given processing unit. Takes a callback as a parameter 399 /// which will be called when the processing unit has been suspended. 400 /// 401 /// \note Requires that the pool has 402 /// threads::policies::enable_elasticity set. 403 /// 404 /// \param callback [in] Callback which is called when the processing 405 /// unit has been suspended. 406 /// \param virt_core [in] The processing unit to suspend. 407 /// \param ec [in,out] this represents the error status on exit, if this 408 /// is pre-initialized to \a hpx#throws the function will throw 409 /// on error instead. 410 virtual void suspend_processing_unit_cb( 411 std::function<void(void)> callback, std::size_t virt_core, 412 error_code& ec = throws) = 0; 413 414 /// Resumes the given processing unit. When the processing unit has been 415 /// resumed the returned future will be ready. 416 /// 417 /// \note Can only be called from an HPX thread. Use 418 /// resume_processing_unit_cb or to resume the processing unit 419 /// from outside HPX. Requires that the pool has 420 /// threads::policies::enable_elasticity set. 421 /// 422 /// \param virt_core [in] The processing unit on the the pool to be 423 /// resumed. The processing units are indexed starting 424 /// from 0. 425 /// 426 /// \returns A `future<void>` which is ready when the given processing 427 /// unit has been resumed. 428 virtual hpx::future<void> resume_processing_unit(std::size_t virt_core) = 0; 429 430 /// Resumes the given processing unit. Takes a callback as a parameter 431 /// which will be called when the processing unit has been resumed. 432 /// 433 /// \note Requires that the pool has 434 /// threads::policies::enable_elasticity set. 435 /// 436 /// \param callback [in] Callback which is called when the processing 437 /// unit has been suspended. 438 /// \param virt_core [in] The processing unit to resume. 439 /// \param ec [in,out] this represents the error status on exit, if this 440 /// is pre-initialized to \a hpx#throws the function will throw 441 /// on error instead. 442 virtual void resume_processing_unit_cb( 443 std::function<void(void)> callback, std::size_t virt_core, 444 error_code& ec = throws) = 0; 445 446 /// \cond NOINTERNAL get_scheduler_mode() const447 policies::scheduler_mode get_scheduler_mode() const 448 { 449 return mode_; 450 } 451 /// \endcond 452 453 protected: 454 /// \cond NOINTERNAL 455 void init_pool_time_scale(); 456 /// \endcond 457 458 protected: 459 /// \cond NOINTERNAL 460 pool_id_type id_; 461 462 // Mode of operation of the pool 463 policies::scheduler_mode mode_; 464 465 // The thread_offset is equal to the accumulated number of 466 // threads in all pools preceding this pool 467 // in the thread indexation. That means, that in order to know 468 // the global index of a thread it owns, the pool has to compute: 469 // global index = thread_offset_ + local index. 470 std::size_t thread_offset_; 471 472 // scale timestamps to nanoseconds 473 double timestamp_scale_; 474 475 // callback functions to invoke at start, stop, and error 476 threads::policies::callback_notifier& notifier_; 477 /// \endcond 478 }; 479 }} 480 481 #include <hpx/config/warnings_suffix.hpp> 482 483 #endif 484