//  Copyright (c) 2007-2018 Hartmut Kaiser
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_THREADMANAGER_SCHEDULING_SCHEDULER_BASE_JUL_14_2013_1132AM)
#define HPX_THREADMANAGER_SCHEDULING_SCHEDULER_BASE_JUL_14_2013_1132AM

#include <hpx/config.hpp>
#include <hpx/compat/condition_variable.hpp>
#include <hpx/compat/mutex.hpp>
#include <hpx/runtime/agas/interface.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/parcelset_fwd.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/policies/scheduler_mode.hpp>
#include <hpx/runtime/threads/thread_init_data.hpp>
#include <hpx/runtime/threads/thread_pool_base.hpp>
#include <hpx/state.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/safe_lexical_cast.hpp>
#include <hpx/util/yield_while.hpp>
#include <hpx/util_fwd.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/runtime/threads/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

#include <hpx/config/warnings_prefix.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace threads { namespace policies
{
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
    namespace detail
    {
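        // RAII helper that increments the given counter for the duration of
        // its lifetime and decrements it again on destruction.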
        struct reset_on_exit
        {
            reset_on_exit(std::atomic<std::int32_t>& counter)
              : counter_(counter)
            {
                ++counter_;
                HPX_ASSERT(counter_ > 0);
            }
            ~reset_on_exit()
            {
                HPX_ASSERT(counter_ > 0);
                --counter_;
            }
            std::atomic<std::int32_t>& counter_;
        };
    }
#endif

    ///////////////////////////////////////////////////////////////////////////
    /// The scheduler_base defines the interface to be implemented by all
    /// scheduler policies
    struct scheduler_base
    {
    public:
        HPX_NON_COPYABLE(scheduler_base);

    public:
        typedef compat::mutex pu_mutex_type;

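        // Construct a scheduler for the given number of worker threads. All
        // per-thread states initially start out as state_initialized.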
        scheduler_base(std::size_t num_threads,
                char const* description = "",
                scheduler_mode mode = nothing_special)
          : mode_(mode)
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
          , wait_count_(0)
          , max_idle_backoff_time_(
                hpx::util::safe_lexical_cast<double>(
                    hpx::get_config_entry("hpx.max_idle_backoff_time",
                        HPX_IDLE_BACKOFF_TIME_MAX)))
#endif
          , suspend_mtxs_(num_threads)
          , suspend_conds_(num_threads)
          , pu_mtxs_(num_threads)
          , states_(num_threads)
          , description_(description)
          , parent_pool_(nullptr)
          , background_thread_count_(0)
        {
            for (std::size_t i = 0; i != num_threads; ++i)
                states_[i].store(state_initialized);
        }

        virtual ~scheduler_base()
        {
        }

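        // Access the thread pool this scheduler is attached to. The parent
        // pool is set exactly once (by the owning pool) before it may be
        // queried.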
        threads::thread_pool_base *get_parent_pool()
        {
            HPX_ASSERT(parent_pool_ != nullptr);
            return parent_pool_;
        }

        void set_parent_pool(threads::thread_pool_base *p)
        {
            HPX_ASSERT(parent_pool_ == nullptr);
            parent_pool_ = p;
        }

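        // Convert between pool-local and global (process-wide) thread indices
        // by applying the parent pool's thread offset.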
        inline std::size_t global_to_local_thread_index(std::size_t n)
        {
            return n - parent_pool_->get_thread_offset();
        }

        inline std::size_t local_to_global_thread_index(std::size_t n)
        {
            return n + parent_pool_->get_thread_offset();
        }

        char const* get_description() const { return description_; }

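        // Called by a worker thread whenever it could not find any work. With
        // idle backoff enabled the thread sleeps for an exponentially growing
        // period of time (capped by hpx.max_idle_backoff_time) or until new
        // work arrives.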
        void idle_callback(std::size_t /*num_thread*/)
        {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
            // Put this thread to sleep for some time; additionally, it gets
            // woken up whenever new work becomes available.

            // Exponential backoff with a maximum sleep time.
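            // For example, after four unsuccessful attempts (wait_count_ == 4)
            // the thread sleeps for min(max_idle_backoff_time_, 2^4) == 16
            // milliseconds (assuming the configured maximum is larger).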
            double exponent = (std::min)(double(wait_count_),
                double(std::numeric_limits<double>::max_exponent - 1));

            std::chrono::milliseconds period(std::lround(
                (std::min)(max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            cond_.wait_for(l, period);
#endif
        }

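        // Perform background work on behalf of the given worker thread:
        // parcel (network) handling and, on thread 0, non-blocking AGAS
        // garbage collection. Returns true if any parcel related work was
        // performed.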
        bool background_callback(std::size_t num_thread)
        {
            bool result = false;
            if (hpx::parcelset::do_background_work(num_thread))
                result = true;

            if (0 == num_thread)
                hpx::agas::garbage_collect_non_blocking();
            return result;
        }

        /// This function gets called by the thread-manager whenever new work
        /// has been added, allowing the scheduler to reactivate one or more
        /// of the possibly idling OS threads.
        void do_some_work(std::size_t num_thread)
        {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
            wait_count_.store(0, std::memory_order_release);

            if (num_thread == std::size_t(-1))
                cond_.notify_all();
            else
                cond_.notify_one();
#endif
        }

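        // Block the calling worker thread until it is resumed again via
        // resume(). The corresponding thread state is set to state_sleeping
        // for the duration of the suspension.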
        virtual void suspend(std::size_t num_thread)
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());

            states_[num_thread].store(state_sleeping);
            std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
            suspend_conds_[num_thread].wait(l);

            // Only set the state to running if it is still state_sleeping.
            // Other (non-blocking) functions may have set it to stopping or
            // terminating in the meantime, in which case the state is left
            // untouched.
            hpx::state expected = state_sleeping;
            states_[num_thread].compare_exchange_strong(expected, state_running);

            HPX_ASSERT(expected == state_sleeping ||
                expected == state_stopping || expected == state_terminating);
        }

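        // Wake up the given suspended worker thread; passing std::size_t(-1)
        // wakes up all suspended threads of this scheduler.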
        virtual void resume(std::size_t num_thread)
        {
            if (num_thread == std::size_t(-1))
            {
                for (compat::condition_variable& c : suspend_conds_)
                {
                    c.notify_one();
                }
            }
            else
            {
                HPX_ASSERT(num_thread < suspend_conds_.size());
                suspend_conds_[num_thread].notify_one();
            }
        }

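        // Select a processing unit that is currently allowed to run work,
        // starting the search at num_thread. If elasticity is enabled this
        // tries to acquire the corresponding PU mutex (handing the acquired
        // lock back through l); otherwise num_thread is returned unchanged.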
        std::size_t select_active_pu(std::unique_lock<pu_mutex_type>& l,
            std::size_t num_thread, bool allow_fallback = false)
        {
            if (mode_ & threads::policies::enable_elasticity)
            {
                std::size_t states_size = states_.size();

                if (!allow_fallback)
                {
                    // Try indefinitely as long as at least one thread is
                    // available for scheduling. Increase allowed state if no
                    // threads are available for scheduling.
                    auto max_allowed_state = state_suspended;

                    hpx::util::yield_while([this, states_size, &l, &num_thread,
                                               &max_allowed_state]() {
                        int num_allowed_threads = 0;

                        for (std::size_t offset = 0; offset < states_size;
                             ++offset)
                        {
                            std::size_t num_thread_local =
                                (num_thread + offset) % states_size;

                            l = std::unique_lock<pu_mutex_type>(
                                pu_mtxs_[num_thread_local], std::try_to_lock);

                            if (l.owns_lock())
                            {
                                if (states_[num_thread_local] <=
                                    max_allowed_state)
                                {
                                    num_thread = num_thread_local;
                                    return false;
                                }

                                l.unlock();
                            }

                            if (states_[num_thread_local] <= max_allowed_state)
                            {
                                ++num_allowed_threads;
                            }
                        }

                        if (0 == num_allowed_threads)
                        {
                            if (max_allowed_state <= state_suspended)
                            {
                                max_allowed_state = state_sleeping;
                            }
                            else if (max_allowed_state <= state_sleeping)
                            {
                                max_allowed_state = state_stopping;
                            }
                            else
                            {
                                // All threads are terminating or stopped.
                                // Just return num_thread to avoid infinite
                                // loop.
                                return false;
                            }
                        }

                        // Yield after trying all pus, then try again
                        return true;
                    });

                    return num_thread;
                }

                // Try all pus only once if fallback is allowed
                HPX_ASSERT(num_thread != std::size_t(-1));
                for (std::size_t offset = 0; offset < states_size; ++offset)
                {
                    std::size_t num_thread_local =
                        (num_thread + offset) % states_size;

                    l = std::unique_lock<pu_mutex_type>(
                        pu_mtxs_[num_thread_local], std::try_to_lock);

                    if (l.owns_lock() &&
                        states_[num_thread_local] <= state_suspended)
                    {
                        return num_thread_local;
                    }
                }
            }

            return num_thread;
        }

        // allow accessing/manipulating the per-thread states
        std::atomic<hpx::state>& get_state(std::size_t num_thread)
        {
            HPX_ASSERT(num_thread < states_.size());
            return states_[num_thread];
        }
        std::atomic<hpx::state> const& get_state(std::size_t num_thread) const
        {
            HPX_ASSERT(num_thread < states_.size());
            return states_[num_thread];
        }

        void set_all_states(hpx::state s)
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type& state : states_)
            {
                state.store(s);
            }
        }

        void set_all_states_at_least(hpx::state s)
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type& state : states_)
            {
                if (state < s)
                {
                    state.store(s);
                }
            }
        }


        // return whether all states are at least at the given one
        bool has_reached_state(hpx::state s) const
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type const& state : states_)
            {
                if (state.load() < s)
                    return false;
            }
            return true;
        }

        bool is_state(hpx::state s) const
        {
            typedef std::atomic<hpx::state> state_type;
            for (state_type const& state : states_)
            {
                if (state.load() != s)
                    return false;
            }
            return true;
        }

        std::pair<hpx::state, hpx::state> get_minmax_state() const
        {
            std::pair<hpx::state, hpx::state> result(
                last_valid_runtime_state, first_valid_runtime_state);

            typedef std::atomic<hpx::state> state_type;
            for (state_type const& state_iter : states_)
            {
                hpx::state s = state_iter.load();
                result.first = (std::min)(result.first, s);
                result.second = (std::max)(result.second, s);
            }

            return result;
        }

        // get/set scheduler mode
        scheduler_mode get_scheduler_mode() const
        {
            return mode_.load(std::memory_order_acquire);
        }

        void set_scheduler_mode(scheduler_mode mode)
        {
            mode_.store(mode);
        }

        pu_mutex_type& get_pu_mutex(std::size_t num_thread)
        {
            HPX_ASSERT(num_thread < pu_mtxs_.size());
            return pu_mtxs_[num_thread];
        }

        ///////////////////////////////////////////////////////////////////////
        virtual bool numa_sensitive() const { return false; }
        virtual bool has_thread_stealing() const { return false; }

        inline std::size_t domain_from_local_thread_index(std::size_t n)
        {
            auto &rp = resource::get_partitioner();
            auto const& topo = rp.get_topology();
            std::size_t global_id = local_to_global_thread_index(n);
            std::size_t pu_num = rp.get_pu_num(global_id);

            return topo.get_numa_node_number(pu_num);
        }

        // assumes queues use index 0..N-1 and correspond to the pool cores
        std::size_t num_domains(const std::size_t workers)
        {
            auto &rp = resource::get_partitioner();
            auto const& topo = rp.get_topology();

            std::set<std::size_t> domains;
            for (std::size_t local_id = 0; local_id != workers; ++local_id)
            {
                std::size_t global_id = local_to_global_thread_index(local_id);
                std::size_t pu_num = rp.get_pu_num(global_id);
                std::size_t dom = topo.get_numa_node_number(pu_num);
                domains.insert(dom);
            }
            return domains.size();
        }

        // return the subset of threads from ts that are (or are not) in the
        // same NUMA domain as local_id, depending on the predicate
        std::vector<std::size_t> domain_threads(
            std::size_t local_id, const std::vector<std::size_t> &ts,
            std::function<bool(std::size_t, std::size_t)> pred)
        {
            std::vector<std::size_t> result;
            auto &rp = resource::get_partitioner();
            auto const& topo = rp.get_topology();
            std::size_t global_id = local_to_global_thread_index(local_id);
            std::size_t pu_num = rp.get_pu_num(global_id);
            std::size_t numa = topo.get_numa_node_number(pu_num);
            for (auto local_id : ts)
            {
                global_id = local_to_global_thread_index(local_id);
                pu_num = rp.get_pu_num(global_id);
                if (pred(numa, topo.get_numa_node_number(pu_num)))
                {
                    result.push_back(local_id);
                }
            }
            return result;
        }
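
        // Usage sketch (hypothetical names): collect all worker threads of
        // `pool_threads` that share the NUMA domain of worker 0:
        //   auto same_domain = domain_threads(0, pool_threads,
        //       [](std::size_t a, std::size_t b) { return a == b; });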

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
        virtual std::uint64_t get_creation_time(bool reset) = 0;
        virtual std::uint64_t get_cleanup_time(bool reset) = 0;
#endif

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
        virtual std::int64_t get_num_pending_misses(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_pending_accesses(std::size_t num_thread,
            bool reset) = 0;

        virtual std::int64_t get_num_stolen_from_pending(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_stolen_to_pending(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_stolen_from_staged(std::size_t num_thread,
            bool reset) = 0;
        virtual std::int64_t get_num_stolen_to_staged(std::size_t num_thread,
            bool reset) = 0;
#endif

        virtual std::int64_t get_queue_length(
            std::size_t num_thread = std::size_t(-1)) const = 0;

        virtual std::int64_t get_thread_count(
            thread_state_enum state = unknown,
            thread_priority priority = thread_priority_default,
            std::size_t num_thread = std::size_t(-1),
            bool reset = false) const = 0;

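        // Keep track of the number of background threads currently handled by
        // this scheduler.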
        std::int64_t get_background_thread_count()
        {
            return background_thread_count_;
        }

        void increment_background_thread_count()
        {
            ++background_thread_count_;
        }

        void decrement_background_thread_count()
        {
            --background_thread_count_;
        }

        // Enumerate all matching threads
        virtual bool enumerate_threads(
            util::function_nonser<bool(thread_id_type)> const& f,
            thread_state_enum state = unknown) const = 0;

        virtual void abort_all_suspended_threads() = 0;

        virtual bool cleanup_terminated(bool delete_all) = 0;
        virtual bool cleanup_terminated(std::size_t num_thread, bool delete_all) = 0;

        virtual void create_thread(thread_init_data& data, thread_id_type* id,
            thread_state_enum initial_state, bool run_now, error_code& ec) = 0;

        virtual bool get_next_thread(std::size_t num_thread, bool running,
            std::int64_t& idle_loop_count, threads::thread_data*& thrd) = 0;

        virtual void schedule_thread(threads::thread_data* thrd,
            threads::thread_schedule_hint schedulehint,
            bool allow_fallback = false,
            thread_priority priority = thread_priority_normal) = 0;

        virtual void schedule_thread_last(threads::thread_data* thrd,
            threads::thread_schedule_hint schedulehint,
            bool allow_fallback = false,
            thread_priority priority = thread_priority_normal) = 0;

        virtual void destroy_thread(threads::thread_data* thrd,
            std::int64_t& busy_count) = 0;

        virtual bool wait_or_add_new(std::size_t num_thread, bool running,
            std::int64_t& idle_loop_count) = 0;

        virtual void on_start_thread(std::size_t num_thread) = 0;
        virtual void on_stop_thread(std::size_t num_thread) = 0;
        virtual void on_error(std::size_t num_thread,
            std::exception_ptr const& e) = 0;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
        virtual std::int64_t get_average_thread_wait_time(
            std::size_t num_thread = std::size_t(-1)) const = 0;
        virtual std::int64_t get_average_task_wait_time(
            std::size_t num_thread = std::size_t(-1)) const = 0;
#endif

        virtual void start_periodic_maintenance(
            std::atomic<hpx::state>& /*global_state*/)
        {}

        virtual void reset_thread_distribution() {}

    protected:
        std::atomic<scheduler_mode> mode_;

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        // support for suspension on idle queues
        pu_mutex_type mtx_;
        compat::condition_variable cond_;
        std::atomic<std::uint32_t> wait_count_;
        double max_idle_backoff_time_;
#endif

        // support for suspension of pus
        std::vector<pu_mutex_type> suspend_mtxs_;
        std::vector<compat::condition_variable> suspend_conds_;

        std::vector<pu_mutex_type> pu_mtxs_;

        std::vector<std::atomic<hpx::state> > states_;
        char const* description_;

        // the pool that owns this scheduler
        threads::thread_pool_base *parent_pool_;

        std::atomic<std::int64_t> background_thread_count_;

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    public:
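        // Scheduler-local storage: a small TSS-like facility keyed by opaque
        // void pointers, backed by the coroutines TSS implementation.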
        coroutines::detail::tss_data_node* find_tss_data(void const* key)
        {
            if (!thread_data_)
                return nullptr;
            return thread_data_->find(key);
        }

        void add_new_tss_node(void const* key,
            std::shared_ptr<coroutines::detail::tss_cleanup_function>
                const& func, void* tss_data)
        {
            if (!thread_data_)
            {
                thread_data_ =
                    std::make_shared<coroutines::detail::tss_storage>();
            }
            thread_data_->insert(key, func, tss_data);
        }

        void erase_tss_node(void const* key, bool cleanup_existing)
        {
            if (thread_data_)
                thread_data_->erase(key, cleanup_existing);
        }

        void* get_tss_data(void const* key)
        {
            if (coroutines::detail::tss_data_node* const current_node =
                    find_tss_data(key))
            {
                return current_node->get_value();
            }
            return nullptr;
        }

        void set_tss_data(void const* key,
            std::shared_ptr<coroutines::detail::tss_cleanup_function>
                const& func, void* tss_data, bool cleanup_existing)
        {
            if (coroutines::detail::tss_data_node* const current_node =
                    find_tss_data(key))
            {
                if (func || (tss_data != 0))
                    current_node->reinit(func, tss_data, cleanup_existing);
                else
                    erase_tss_node(key, cleanup_existing);
            }
            else if (func || (tss_data != 0))
            {
                add_new_tss_node(key, func, tss_data);
            }
        }

    protected:
        std::shared_ptr<coroutines::detail::tss_storage> thread_data_;
#endif
    };
}}}

#include <hpx/config/warnings_suffix.hpp>

#endif