//  Copyright (c) 2007-2018 Hartmut Kaiser
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_THREADMANAGER_SCHEDULING_SCHEDULER_BASE_JUL_14_2013_1132AM)
#define HPX_THREADMANAGER_SCHEDULING_SCHEDULER_BASE_JUL_14_2013_1132AM

#include <hpx/config.hpp>
#include <hpx/compat/condition_variable.hpp>
#include <hpx/compat/mutex.hpp>
#include <hpx/runtime/agas/interface.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/parcelset_fwd.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/policies/scheduler_mode.hpp>
#include <hpx/runtime/threads/thread_init_data.hpp>
#include <hpx/runtime/threads/thread_pool_base.hpp>
#include <hpx/state.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/safe_lexical_cast.hpp>
#include <hpx/util/yield_while.hpp>
#include <hpx/util_fwd.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/runtime/threads/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>    // for std::function (used by domain_threads below)
#include <limits>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

#include <hpx/config/warnings_prefix.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace threads { namespace policies
{
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
    namespace detail
    {
        // RAII helper: increments the given counter on construction and
        // decrements it again on destruction.
        struct reset_on_exit
        {
            reset_on_exit(std::atomic<std::int32_t>& counter)
              : counter_(counter)
            {
                ++counter_;
                HPX_ASSERT(counter_ > 0);
            }
            ~reset_on_exit()
            {
                HPX_ASSERT(counter_ > 0);
                --counter_;
            }
            std::atomic<std::int32_t>& counter_;
        };
    }
#endif

    ///////////////////////////////////////////////////////////////////////////
    /// The scheduler_base defines the interface to be implemented by all
    /// scheduler policies.
    struct scheduler_base
    {
    public:
        HPX_NON_COPYABLE(scheduler_base);

    public:
        typedef compat::mutex pu_mutex_type;

        scheduler_base(std::size_t num_threads,
                char const* description = "",
                scheduler_mode mode = nothing_special)
          : mode_(mode)
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
          , wait_count_(0)
          , max_idle_backoff_time_(
                hpx::util::safe_lexical_cast<double>(
                    hpx::get_config_entry("hpx.max_idle_backoff_time",
                        HPX_IDLE_BACKOFF_TIME_MAX)))
#endif
          , suspend_mtxs_(num_threads)
          , suspend_conds_(num_threads)
          , pu_mtxs_(num_threads)
          , states_(num_threads)
          , description_(description)
          , parent_pool_(nullptr)
          , background_thread_count_(0)
        {
            for (std::size_t i = 0; i != num_threads; ++i)
                states_[i].store(state_initialized);
        }

        virtual ~scheduler_base()
        {
        }

        threads::thread_pool_base *get_parent_pool()
        {
            HPX_ASSERT(parent_pool_ != nullptr);
            return parent_pool_;
        }

        void set_parent_pool(threads::thread_pool_base *p)
        {
            HPX_ASSERT(parent_pool_ == nullptr);
            parent_pool_ = p;
        }
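        // The parent pool owns a contiguous range of the process-global OS
        // worker threads; get_thread_offset() is the global index of the
        // pool's first worker. Illustrative example (numbers assumed): a
        // pool whose workers are global threads 4..7 has offset 4, so local
        // index 2 corresponds to global index 6, and vice versa.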
        inline std::size_t global_to_local_thread_index(std::size_t n)
        {
            return n - parent_pool_->get_thread_offset();
        }

        inline std::size_t local_to_global_thread_index(std::size_t n)
        {
            return n + parent_pool_->get_thread_offset();
        }

        char const* get_description() const { return description_; }

        void idle_callback(std::size_t /*num_thread*/)
        {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
            // Put this thread to sleep for some time; it is additionally
            // woken up whenever new work arrives.

            // Exponential backoff with a maximum sleep time.
            double exponent = (std::min)(double(wait_count_),
                double(std::numeric_limits<double>::max_exponent - 1));

            std::chrono::milliseconds period(std::lround(
                (std::min)(max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            cond_.wait_for(l, period);
#endif
        }

        bool background_callback(std::size_t num_thread)
        {
            bool result = false;
            if (hpx::parcelset::do_background_work(num_thread))
                result = true;

            if (0 == num_thread)
                hpx::agas::garbage_collect_non_blocking();
            return result;
        }

        /// This function gets called by the thread-manager whenever new work
        /// has been added, allowing the scheduler to reactivate one or more
        /// of the possibly idling OS threads.
        void do_some_work(std::size_t num_thread)
        {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
            wait_count_.store(0, std::memory_order_release);

            if (num_thread == std::size_t(-1))
                cond_.notify_all();
            else
                cond_.notify_one();
#endif
        }
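        // Sketch of the resulting backoff behavior (assuming
        // HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF is enabled): each consecutive
        // idle_callback() sleeps for
        // min(2^wait_count_, max_idle_backoff_time_) milliseconds, i.e.
        // 1ms, 2ms, 4ms, 8ms, ... until the configured cap is reached;
        // do_some_work() resets wait_count_ to zero, so the next idle period
        // starts over at 1ms.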
        virtual void suspend(std::size_t num_thread)
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());

            states_[num_thread].store(state_sleeping);
            std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
            suspend_conds_[num_thread].wait(l);

            // Set the state back to running only if it is still
            // state_sleeping. Non-blocking/locking functions may have set it
            // to state_stopping or state_terminating in the meantime, in
            // which case the state is left untouched.
            hpx::state expected = state_sleeping;
            states_[num_thread].compare_exchange_strong(expected, state_running);

            HPX_ASSERT(expected == state_sleeping ||
                expected == state_stopping || expected == state_terminating);
        }

        virtual void resume(std::size_t num_thread)
        {
            if (num_thread == std::size_t(-1))
            {
                for (compat::condition_variable& c : suspend_conds_)
                {
                    c.notify_one();
                }
            }
            else
            {
                HPX_ASSERT(num_thread < suspend_conds_.size());
                suspend_conds_[num_thread].notify_one();
            }
        }

        std::size_t select_active_pu(std::unique_lock<pu_mutex_type>& l,
            std::size_t num_thread, bool allow_fallback = false)
        {
            if (mode_ & threads::policies::enable_elasticity)
            {
                std::size_t states_size = states_.size();

                if (!allow_fallback)
                {
                    // Try indefinitely as long as at least one thread is
                    // available for scheduling. Increase the allowed state
                    // if no threads are available for scheduling.
                    auto max_allowed_state = state_suspended;

                    hpx::util::yield_while([this, states_size, &l, &num_thread,
                        &max_allowed_state]() {
                        int num_allowed_threads = 0;

                        for (std::size_t offset = 0; offset < states_size;
                            ++offset)
                        {
                            std::size_t num_thread_local =
                                (num_thread + offset) % states_size;

                            l = std::unique_lock<pu_mutex_type>(
                                pu_mtxs_[num_thread_local], std::try_to_lock);

                            if (l.owns_lock())
                            {
                                if (states_[num_thread_local] <=
                                    max_allowed_state)
                                {
                                    num_thread = num_thread_local;
                                    return false;
                                }

                                l.unlock();
                            }

                            if (states_[num_thread_local] <= max_allowed_state)
                            {
                                ++num_allowed_threads;
                            }
                        }

                        if (0 == num_allowed_threads)
                        {
                            if (max_allowed_state <= state_suspended)
                            {
                                max_allowed_state = state_sleeping;
                            }
                            else if (max_allowed_state <= state_sleeping)
                            {
                                max_allowed_state = state_stopping;
                            }
                            else
                            {
                                // All threads are terminating or stopped.
                                // Just return num_thread to avoid an
                                // infinite loop.
                                return false;
                            }
                        }

                        // Yield after trying all pus, then try again.
                        return true;
                    });

                    return num_thread;
                }

                // Try all pus only once if fallback is allowed.
                HPX_ASSERT(num_thread != std::size_t(-1));
                for (std::size_t offset = 0; offset < states_size; ++offset)
                {
                    std::size_t num_thread_local =
                        (num_thread + offset) % states_size;

                    l = std::unique_lock<pu_mutex_type>(
                        pu_mtxs_[num_thread_local], std::try_to_lock);

                    if (l.owns_lock() &&
                        states_[num_thread_local] <= state_suspended)
                    {
                        return num_thread_local;
                    }
                }
            }

            return num_thread;
        }
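        // On success, select_active_pu (above) returns the index of the
        // chosen pu and leaves l owning that pu's mutex. Without fallback it
        // keeps retrying while progressively relaxing the set of eligible
        // states (<= state_suspended, then <= state_sleeping, then
        // <= state_stopping), so it only gives up once all pus are
        // terminating or stopped.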
        // allow accessing/manipulating the per-thread states
        std::atomic<hpx::state>& get_state(std::size_t num_thread)
        {
            HPX_ASSERT(num_thread < states_.size());
            return states_[num_thread];
        }

        std::atomic<hpx::state> const& get_state(std::size_t num_thread) const
        {
            HPX_ASSERT(num_thread < states_.size());
            return states_[num_thread];
        }

        void set_all_states(hpx::state s)
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type& state : states_)
            {
                state.store(s);
            }
        }

        void set_all_states_at_least(hpx::state s)
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type& state : states_)
            {
                if (state < s)
                {
                    state.store(s);
                }
            }
        }

        // return whether all states are at least at the given one
        bool has_reached_state(hpx::state s) const
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type const& state : states_)
            {
                if (state.load() < s)
                    return false;
            }
            return true;
        }

        bool is_state(hpx::state s) const
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type const& state : states_)
            {
                if (state.load() != s)
                    return false;
            }
            return true;
        }

        std::pair<hpx::state, hpx::state> get_minmax_state() const
        {
            std::pair<hpx::state, hpx::state> result(
                last_valid_runtime_state, first_valid_runtime_state);

            typedef std::atomic<hpx::state> state_type;
            for (state_type const& state_iter : states_)
            {
                hpx::state s = state_iter.load();
                result.first = (std::min)(result.first, s);
                result.second = (std::max)(result.second, s);
            }

            return result;
        }

        // get/set scheduler mode
        scheduler_mode get_scheduler_mode() const
        {
            return mode_.load(std::memory_order_acquire);
        }

        void set_scheduler_mode(scheduler_mode mode)
        {
            mode_.store(mode);
        }

        pu_mutex_type& get_pu_mutex(std::size_t num_thread)
        {
            HPX_ASSERT(num_thread < pu_mtxs_.size());
            return pu_mtxs_[num_thread];
        }

        ///////////////////////////////////////////////////////////////////////
        virtual bool numa_sensitive() const { return false; }
        virtual bool has_thread_stealing() const { return false; }
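        // The following helpers map pool-local worker threads to NUMA
        // domains via the resource partitioner and the topology.
        // Illustrative use of domain_threads (the predicate and the id list
        // are assumptions for the example): collect the pool-local ids from
        // ts that live on the same NUMA domain as local_id:
        //
        //     std::vector<std::size_t> same_domain = domain_threads(
        //         local_id, ts,
        //         [](std::size_t lhs, std::size_t rhs) { return lhs == rhs; });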
        inline std::size_t domain_from_local_thread_index(std::size_t n)
        {
            auto &rp = resource::get_partitioner();
            auto const& topo = rp.get_topology();
            std::size_t global_id = local_to_global_thread_index(n);
            std::size_t pu_num = rp.get_pu_num(global_id);

            return topo.get_numa_node_number(pu_num);
        }

        // assumes queues use index 0..N-1 and correspond to the pool cores
        std::size_t num_domains(const std::size_t workers)
        {
            auto &rp = resource::get_partitioner();
            auto const& topo = rp.get_topology();

            std::set<std::size_t> domains;
            for (std::size_t local_id = 0; local_id != workers; ++local_id)
            {
                std::size_t global_id = local_to_global_thread_index(local_id);
                std::size_t pu_num = rp.get_pu_num(global_id);
                std::size_t dom = topo.get_numa_node_number(pu_num);
                domains.insert(dom);
            }
            return domains.size();
        }

        // returns the subset of ts whose threads are in the same NUMA
        // domain as local_id, or in a different one, depending on the
        // given predicate
        std::vector<std::size_t> domain_threads(
            std::size_t local_id, const std::vector<std::size_t> &ts,
            std::function<bool(std::size_t, std::size_t)> pred)
        {
            std::vector<std::size_t> result;
            auto &rp = resource::get_partitioner();
            auto const& topo = rp.get_topology();
            std::size_t global_id = local_to_global_thread_index(local_id);
            std::size_t pu_num = rp.get_pu_num(global_id);
            std::size_t numa = topo.get_numa_node_number(pu_num);
            for (auto id : ts)
            {
                global_id = local_to_global_thread_index(id);
                pu_num = rp.get_pu_num(global_id);
                if (pred(numa, topo.get_numa_node_number(pu_num)))
                {
                    result.push_back(id);
                }
            }
            return result;
        }

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
        virtual std::uint64_t get_creation_time(bool reset) = 0;
        virtual std::uint64_t get_cleanup_time(bool reset) = 0;
#endif

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
        virtual std::int64_t get_num_pending_misses(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_pending_accesses(std::size_t num_thread,
            bool reset) = 0;

        virtual std::int64_t get_num_stolen_from_pending(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_stolen_to_pending(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_stolen_from_staged(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_stolen_to_staged(std::size_t num_thread,
            bool reset) = 0;
#endif

        virtual std::int64_t get_queue_length(
            std::size_t num_thread = std::size_t(-1)) const = 0;

        virtual std::int64_t get_thread_count(
            thread_state_enum state = unknown,
            thread_priority priority = thread_priority_default,
            std::size_t num_thread = std::size_t(-1),
            bool reset = false) const = 0;

        std::int64_t get_background_thread_count()
        {
            return background_thread_count_;
        }

        void increment_background_thread_count()
        {
            ++background_thread_count_;
        }

        void decrement_background_thread_count()
        {
            --background_thread_count_;
        }
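        // The pure virtual functions below constitute the scheduling
        // interface proper. A worker thread's main loop (a sketch of the
        // expected call pattern, not a definitive description) repeatedly
        // calls get_next_thread() to obtain work, executes it, and falls
        // back to wait_or_add_new() followed by idle_callback() when no
        // work is available; new tasks enter the scheduler through
        // create_thread() and schedule_thread().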
        // Enumerate all matching threads
        virtual bool enumerate_threads(
            util::function_nonser<bool(thread_id_type)> const& f,
            thread_state_enum state = unknown) const = 0;

        virtual void abort_all_suspended_threads() = 0;

        virtual bool cleanup_terminated(bool delete_all) = 0;
        virtual bool cleanup_terminated(
            std::size_t num_thread, bool delete_all) = 0;

        virtual void create_thread(thread_init_data& data, thread_id_type* id,
            thread_state_enum initial_state, bool run_now, error_code& ec) = 0;

        virtual bool get_next_thread(std::size_t num_thread, bool running,
            std::int64_t& idle_loop_count, threads::thread_data*& thrd) = 0;

        virtual void schedule_thread(threads::thread_data* thrd,
            threads::thread_schedule_hint schedulehint,
            bool allow_fallback = false,
            thread_priority priority = thread_priority_normal) = 0;

        virtual void schedule_thread_last(threads::thread_data* thrd,
            threads::thread_schedule_hint schedulehint,
            bool allow_fallback = false,
            thread_priority priority = thread_priority_normal) = 0;

        virtual void destroy_thread(threads::thread_data* thrd,
            std::int64_t& busy_count) = 0;

        virtual bool wait_or_add_new(std::size_t num_thread, bool running,
            std::int64_t& idle_loop_count) = 0;

        virtual void on_start_thread(std::size_t num_thread) = 0;
        virtual void on_stop_thread(std::size_t num_thread) = 0;
        virtual void on_error(std::size_t num_thread,
            std::exception_ptr const& e) = 0;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        virtual std::int64_t get_average_thread_wait_time(
            std::size_t num_thread = std::size_t(-1)) const = 0;
        virtual std::int64_t get_average_task_wait_time(
            std::size_t num_thread = std::size_t(-1)) const = 0;
#endif

        virtual void start_periodic_maintenance(
            std::atomic<hpx::state>& /*global_state*/)
        {}

        virtual void reset_thread_distribution() {}

    protected:
        std::atomic<scheduler_mode> mode_;

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        // support for suspension on idle queues
        pu_mutex_type mtx_;
        compat::condition_variable cond_;
        std::atomic<std::uint32_t> wait_count_;
        double max_idle_backoff_time_;
#endif

        // support for suspension of pus
        std::vector<pu_mutex_type> suspend_mtxs_;
        std::vector<compat::condition_variable> suspend_conds_;

        std::vector<pu_mutex_type> pu_mtxs_;

        std::vector<std::atomic<hpx::state> > states_;
        char const* description_;

        // the pool that owns this scheduler
        threads::thread_pool_base *parent_pool_;

        std::atomic<std::int64_t> background_thread_count_;

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    public:
        coroutines::detail::tss_data_node* find_tss_data(void const* key)
        {
            if (!thread_data_)
                return nullptr;
            return thread_data_->find(key);
        }

        void add_new_tss_node(void const* key,
            std::shared_ptr<coroutines::detail::tss_cleanup_function>
                const& func, void* tss_data)
        {
            if (!thread_data_)
            {
                thread_data_ =
                    std::make_shared<coroutines::detail::tss_storage>();
            }
            thread_data_->insert(key, func, tss_data);
        }
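        // The scheduler-local storage functions here mirror the usual
        // thread-local storage interface: each key maps to a data pointer
        // plus an optional cleanup function, which is applied to the stored
        // value when the node is erased with cleanup_existing set to true.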
        void erase_tss_node(void const* key, bool cleanup_existing)
        {
            if (thread_data_)
                thread_data_->erase(key, cleanup_existing);
        }

        void* get_tss_data(void const* key)
        {
            if (coroutines::detail::tss_data_node* const current_node =
                    find_tss_data(key))
            {
                return current_node->get_value();
            }
            return nullptr;
        }

        void set_tss_data(void const* key,
            std::shared_ptr<coroutines::detail::tss_cleanup_function>
                const& func, void* tss_data, bool cleanup_existing)
        {
            if (coroutines::detail::tss_data_node* const current_node =
                    find_tss_data(key))
            {
                if (func || (tss_data != nullptr))
                    current_node->reinit(func, tss_data, cleanup_existing);
                else
                    erase_tss_node(key, cleanup_existing);
            }
            else if (func || (tss_data != nullptr))
            {
                add_new_tss_node(key, func, tss_data);
            }
        }

    protected:
        std::shared_ptr<coroutines::detail::tss_storage> thread_data_;
#endif
    };
}}}

#include <hpx/config/warnings_suffix.hpp>

#endif