1 //  Copyright (c) 2007-2017 Hartmut Kaiser
2 //  Copyright (c) 2007-2009 Chirag Dekate, Anshul Tandon
3 //  Copyright (c)      2011 Bryce Lelbach, Katelyn Kufahl
4 //  Copyright (c)      2017 Shoshana Jakobovits
5 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
6 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 
8 #if !defined(HPX_THREADMANAGER_HPP)
9 #define HPX_THREADMANAGER_HPP
10 
11 #include <hpx/config.hpp>
12 #include <hpx/compat/barrier.hpp>
13 #include <hpx/compat/mutex.hpp>
14 #include <hpx/compat/thread.hpp>
15 #include <hpx/exception_fwd.hpp>
16 #include <hpx/performance_counters/counters_fwd.hpp>
17 #include <hpx/runtime/naming/name.hpp>
18 #include <hpx/runtime/resource/detail/partitioner.hpp>
19 #include <hpx/runtime/threads/detail/thread_num_tss.hpp>
20 #include <hpx/runtime/threads/policies/scheduler_mode.hpp>
21 #include <hpx/runtime/threads/thread_init_data.hpp>
22 #include <hpx/runtime/threads/thread_pool_base.hpp>
23 #include <hpx/state.hpp>
24 #include <hpx/util/block_profiler.hpp>
25 #include <hpx/util/io_service_pool.hpp>
26 #include <hpx/util/spinlock.hpp>
27 
28 #include <atomic>
29 #include <cstddef>
30 #include <cstdint>
31 #include <exception>
32 #include <iosfwd>
33 #include <memory>
34 #include <mutex>
35 #include <numeric>
36 #include <string>
37 #include <type_traits>
38 #include <vector>
39 #include <utility>
40 
41 #include <hpx/config/warnings_prefix.hpp>
42 
43 namespace hpx { namespace threads
44 {
45     ///////////////////////////////////////////////////////////////////////////
46     /// The \a thread-manager class is the central instance of management for
47     /// all (non-depleted) threads
48     class HPX_EXPORT threadmanager
49     {
50     private:
51         // we use a simple mutex to protect the data members of the
52         // thread manager for now
53         typedef compat::mutex mutex_type;
54 
55     public:
56         typedef threads::policies::callback_notifier notification_policy_type;
57         typedef std::unique_ptr<thread_pool_base> pool_type;
58         typedef threads::policies::scheduler_base scheduler_type;
59         typedef std::vector<pool_type> pool_vector;
60 
61 #ifdef HPX_HAVE_TIMER_POOL
62         threadmanager(util::io_service_pool& timer_pool,
63                 notification_policy_type& notifier);
64 #else
65         threadmanager(notification_policy_type& notifier);
66 #endif
67         ~threadmanager();
68 
69         void init();
70         void create_pools();
71 
72         //! FIXME move to private and add --hpx:printpools cmd line option
73         void print_pools(std::ostream&);
74 
75         // Get functions
76         thread_pool_base& default_pool() const;
77 
78         scheduler_type& default_scheduler() const;
79 
80         thread_pool_base& get_pool(std::string const& pool_name) const;
81         thread_pool_base& get_pool(pool_id_type pool_id) const;
82         thread_pool_base& get_pool(std::size_t thread_index) const;
83 
84         /// The function \a register_work adds a new work item to the thread
85         /// manager. It doesn't immediately create a new \a thread, it just adds
86         /// the task parameters (function, initial state and description) to
87         /// the internal management data structures. The thread itself will be
88         /// created when the number of existing threads drops below the number
89         /// of threads specified by the constructors max_count parameter.
90         ///
91         /// \param func   [in] The function or function object to execute as
92         ///               the thread's function. This must have a signature as
93         ///               defined by \a thread_function_type.
94         /// \param description [in] The value of this parameter allows to
95         ///               specify a description of the thread to create. This
96         ///               information is used for logging purposes mainly, but
97         ///               might be useful for debugging as well. This parameter
98         ///               is optional and defaults to an empty string.
99         /// \param initial_state
100         ///               [in] The value of this parameter defines the initial
101         ///               state of the newly created \a thread. This must be
102         ///               one of the values as defined by the \a thread_state
103         ///               enumeration (thread_state#pending, or \a
104         ///               thread_state#suspended, any other value will throw a
105         ///               hpx#bad_parameter exception).
106         void register_work(thread_init_data& data,
107             thread_state_enum initial_state = pending,
108             error_code& ec = throws);
109 
110         /// The function \a register_thread adds a new work item to the thread
111         /// manager. It creates a new \a thread, adds it to the internal
112         /// management data structures, and schedules the new thread, if
113         /// appropriate.
114         ///
115         /// \param func   [in] The function or function object to execute as
116         ///               the thread's function. This must have a signature as
117         ///               defined by \a thread_function_type.
118         /// \param id     [out] This parameter will hold the id of the created
119         ///               thread. This id is guaranteed to be validly
120         ///               initialized before the thread function is executed.
121         /// \param description [in] The value of this parameter allows to
122         ///               specify a description of the thread to create. This
123         ///               information is used for logging purposes mainly, but
124         ///               might be useful for debugging as well. This parameter
125         ///               is optional and defaults to an empty string.
126         /// \param initial_state
127         ///               [in] The value of this parameter defines the initial
128         ///               state of the newly created \a thread. This must be
129         ///               one of the values as defined by the \a thread_state
130         ///               enumeration (thread_state#pending, or \a
131         ///               thread_state#suspended, any other value will throw a
132         ///               hpx#bad_parameter exception).
133         /// \param run_now [in] If this parameter is \a true and the initial
134         ///               state is given as \a thread_state#pending the thread
135         ///               will be run immediately, otherwise it will be
136         ///               scheduled to run later (either this function is
137         ///               called for another thread using \a true for the
138         ///               parameter \a run_now or the function \a
139         ///               threadmanager#do_some_work is called). This parameter
140         ///               is optional and defaults to \a true.
141         void register_thread(thread_init_data& data, thread_id_type& id,
142             thread_state_enum initial_state = pending,
143             bool run_now = true, error_code& ec = throws);
144 
145         /// \brief  Run the thread manager's work queue. This function
146         ///         instantiates the specified number of OS threads in each
147         ///         pool. All OS threads are started to execute the function
148         ///         \a tfunc.
149         ///
150         /// \returns      The function returns \a true if the thread manager
151         ///               has been started successfully, otherwise it returns
152         ///               \a false.
153         bool run();
154 
155         /// \brief Forcefully stop the thread-manager
156         ///
157         /// \param blocking
158         ///
159         void stop (bool blocking = true);
160 
161         // \brief Suspend all thread pools.
162         void suspend();
163 
164         // \brief Resume all thread pools.
165         void resume();
166 
167         /// \brief Return whether the thread manager is still running
168         //! This returns the "minimal state", i.e. the state of the
169         //! least advanced thread pool
status() const170         state status() const
171         {
172             hpx::state result(last_valid_runtime_state);
173 
174             for (auto& pool_iter : pools_)
175             {
176                 hpx::state s = pool_iter->get_state();
177                 result = (std::min)(result, s);
178             }
179 
180             return result;
181         }
182 
183         /// \brief return the number of HPX-threads with the given state
184         ///
185         /// \note This function lock the internal OS lock in the thread manager
186         std::int64_t get_thread_count(thread_state_enum state = unknown,
187             thread_priority priority = thread_priority_default,
188             std::size_t num_thread = std::size_t(-1), bool reset = false);
189 
190         std::int64_t get_background_thread_count();
191 
192         // Enumerate all matching threads
193         bool enumerate_threads(
194             util::function_nonser<bool(thread_id_type)> const& f,
195             thread_state_enum state = unknown) const;
196 
197         // \brief Abort all threads which are in suspended state. This will set
198         //        the state of all suspended threads to \a pending while
199         //        supplying the wait_abort extended state flag
200         void abort_all_suspended_threads();
201 
202         // \brief Clean up terminated threads. This deletes all threads which
203         //        have been terminated but which are still held in the queue
204         //        of terminated threads. Some schedulers might not do anything
205         //        here.
206         bool cleanup_terminated(bool delete_all);
207 
208         /// \brief Return the number of OS threads running in this thread-manager
209         ///
210         /// This function will return correct results only if the thread-manager
211         /// is running.
get_os_thread_count() const212         std::size_t get_os_thread_count() const
213         {
214             std::lock_guard<mutex_type> lk(mtx_);
215             std::size_t total = 0;
216             for (auto& pool_iter : pools_)
217             {
218                 total += pool_iter->get_os_thread_count();
219             }
220             return total;
221         }
222 
get_os_thread_handle(std::size_t num_thread) const223         compat::thread& get_os_thread_handle(std::size_t num_thread) const
224         {
225             std::lock_guard<mutex_type> lk(mtx_);
226             pool_id_type id = threads_lookup_[num_thread];
227             thread_pool_base& pool = get_pool(id);
228             return pool.get_os_thread_handle(num_thread);
229         }
230 
231     public:
232         /// API functions forwarding to notification policy
233 
234         /// This notifies the thread manager that the passed exception has been
235         /// raised. The exception will be routed through the notifier and the
236         /// scheduler (which will result in it being passed to the runtime
237         /// object, which in turn will report it to the console, etc.).
report_error(std::size_t num_thread,std::exception_ptr const & e)238         void report_error(std::size_t num_thread, std::exception_ptr const& e)
239         {
240             // propagate the error reporting to all pools, which in turn
241             // will propagate to schedulers
242             for (auto& pool_iter : pools_)
243             {
244                 pool_iter->report_error(num_thread, e);
245             }
246         }
247 
248         // Return the (global) sequence number of the current thread
get_worker_thread_num(bool * =nullptr)249         std::size_t get_worker_thread_num(bool* /*numa_sensitive*/ = nullptr)
250         {
251             if (get_self_ptr() == nullptr)
252                 return std::size_t(-1);
253             return detail::thread_num_tss_.get_worker_thread_num();
254         }
255 
256     public:
257         /// The function register_counter_types() is called during startup to
258         /// allow the registration of all performance counter types for this
259         /// thread-manager instance.
260         void register_counter_types();
261 
262         /// Returns the mask identifying all processing units used by this
263         /// thread manager.
get_used_processing_units() const264         mask_type get_used_processing_units() const
265         {
266             mask_type total_used_processing_punits = mask_type();
267             threads::resize(
268                 total_used_processing_punits, hardware_concurrency());
269 
270             for (auto& pool_iter : pools_)
271             {
272                 total_used_processing_punits |=
273                     pool_iter->get_used_processing_units();
274             }
275 
276             return total_used_processing_punits;
277         }
278 
get_pool_numa_bitmap(const std::string & pool_name) const279         hwloc_bitmap_ptr get_pool_numa_bitmap(
280             const std::string &pool_name) const
281         {
282             return get_pool(pool_name).get_numa_domain_bitmap();
283         }
284 
set_scheduler_mode(threads::policies::scheduler_mode mode)285         void set_scheduler_mode(threads::policies::scheduler_mode mode)
286         {
287             for (auto& pool_iter : pools_)
288             {
289                 pool_iter->set_scheduler_mode(mode);
290             }
291         }
292 
reset_thread_distribution()293         void reset_thread_distribution()
294         {
295             for (auto& pool_iter : pools_)
296             {
297                 pool_iter->reset_thread_distribution();
298             }
299         }
300 
init_tss(std::size_t num)301         void init_tss(std::size_t num)
302         {
303             detail::thread_num_tss_.init_tss(num);
304         }
305 
deinit_tss()306         void deinit_tss()
307         {
308             detail::thread_num_tss_.deinit_tss();
309         }
310 
311     public:
312         std::size_t shrink_pool(std::string const& pool_name);
313         std::size_t expand_pool(std::string const& pool_name);
314 
315     private:
316         // counter creator functions
317         naming::gid_type thread_counts_counter_creator(
318             performance_counters::counter_info const& info, error_code& ec);
319         naming::gid_type scheduler_utilization_counter_creator(
320             performance_counters::counter_info const& info, error_code& ec);
321 
322         typedef std::int64_t (threadmanager::*threadmanager_counter_func)(
323             bool reset);
324         typedef std::int64_t (thread_pool_base::*threadpool_counter_func)(
325             std::size_t num_thread, bool reset);
326 
327         naming::gid_type locality_pool_thread_counter_creator(
328             threadmanager_counter_func total_func,
329             threadpool_counter_func pool_func,
330             performance_counters::counter_info const& info, error_code& ec);
331 
332 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
333         naming::gid_type queue_wait_time_counter_creator(
334             threadmanager_counter_func total_func,
335             threadpool_counter_func pool_func,
336             performance_counters::counter_info const& info, error_code& ec);
337 #endif
338 
339         naming::gid_type locality_pool_thread_no_total_counter_creator(
340             threadpool_counter_func pool_func,
341             performance_counters::counter_info const& info, error_code& ec);
342 
343         // performance counters
344         std::int64_t get_queue_length(bool reset);
345 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
346         std::int64_t get_average_thread_wait_time(bool reset);
347         std::int64_t get_average_task_wait_time(bool reset);
348 #endif
349 #if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) && defined(HPX_HAVE_THREAD_IDLE_RATES)
350         std::int64_t get_background_work_duration(bool reset);
351         std::int64_t get_background_overhead(bool reset);
352 #endif    //HPX_HAVE_BACKGROUND_THREAD_COUNTERS
353 
354         std::int64_t get_cumulative_duration(bool reset);
355 
get_thread_count_unknown(bool reset)356         std::int64_t get_thread_count_unknown(bool reset)
357         {
358             return get_thread_count(
359                 unknown, thread_priority_default, std::size_t(-1), reset);
360         }
get_thread_count_active(bool reset)361         std::int64_t get_thread_count_active(bool reset)
362         {
363             return get_thread_count(
364                 active, thread_priority_default, std::size_t(-1), reset);
365         }
get_thread_count_pending(bool reset)366         std::int64_t get_thread_count_pending(bool reset)
367         {
368             return get_thread_count(
369                 pending, thread_priority_default, std::size_t(-1), reset);
370         }
get_thread_count_suspended(bool reset)371         std::int64_t get_thread_count_suspended(bool reset)
372         {
373             return get_thread_count(
374                 suspended, thread_priority_default, std::size_t(-1), reset);
375         }
get_thread_count_terminated(bool reset)376         std::int64_t get_thread_count_terminated(bool reset)
377         {
378             return get_thread_count(
379                 terminated, thread_priority_default, std::size_t(-1), reset);
380         }
get_thread_count_staged(bool reset)381         std::int64_t get_thread_count_staged(bool reset)
382         {
383             return get_thread_count(
384                 staged, thread_priority_default, std::size_t(-1), reset);
385         }
386 
387 #ifdef HPX_HAVE_THREAD_IDLE_RATES
388         std::int64_t avg_idle_rate(bool reset);
389 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
390         std::int64_t avg_creation_idle_rate(bool reset);
391         std::int64_t avg_cleanup_idle_rate(bool reset);
392 #endif
393 #endif
394 
395 #ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
396         std::int64_t get_executed_threads(bool reset);
397         std::int64_t get_executed_thread_phases(bool reset);
398 #ifdef HPX_HAVE_THREAD_IDLE_RATES
399         std::int64_t get_thread_duration(bool reset);
400         std::int64_t get_thread_phase_duration(bool reset);
401         std::int64_t get_thread_overhead(bool reset);
402         std::int64_t get_thread_phase_overhead(bool reset);
403         std::int64_t get_cumulative_thread_duration(bool reset);
404         std::int64_t get_cumulative_thread_overhead(bool reset);
405 #endif
406 #endif
407 
408 #ifdef HPX_HAVE_THREAD_STEALING_COUNTS
409         std::int64_t get_num_pending_misses(bool reset);
410         std::int64_t get_num_pending_accesses(bool reset);
411         std::int64_t get_num_stolen_from_pending(bool reset);
412         std::int64_t get_num_stolen_from_staged(bool reset);
413         std::int64_t get_num_stolen_to_pending(bool reset);
414         std::int64_t get_num_stolen_to_staged(bool reset);
415 #endif
416 
417 private:
418         mutable mutex_type mtx_; // mutex protecting the members
419 
420         // specified by the user in command line, or all cores by default
421         // represents the total number of OS threads, irrespective of how many
422         // are in which pool.
423         std::size_t num_threads_;
424 
425         std::vector<pool_id_type> threads_lookup_;
426 
427 #ifdef HPX_HAVE_TIMER_POOL
428         util::io_service_pool& timer_pool_;     // used for timed set_state
429 #endif
430         pool_vector pools_;
431 
432         notification_policy_type& notifier_;
433     };
434 }}
435 
436 #include <hpx/config/warnings_suffix.hpp>
437 
438 #endif
439