1 //  Copyright (c)      2018 Mikael Simberg
2 //  Copyright (c) 2007-2017 Hartmut Kaiser
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(HPX_RUNTIME_THREADS_DETAIL_THREAD_POOL_JUN_11_2015_1137AM)
8 #define HPX_RUNTIME_THREADS_DETAIL_THREAD_POOL_JUN_11_2015_1137AM
9 
10 #include <hpx/config.hpp>
11 #include <hpx/compat/barrier.hpp>
12 #include <hpx/compat/mutex.hpp>
13 #include <hpx/compat/thread.hpp>
14 #include <hpx/error_code.hpp>
15 #include <hpx/exception_fwd.hpp>
16 #include <hpx/lcos/future.hpp>
17 #include <hpx/lcos/local/no_mutex.hpp>
18 #include <hpx/lcos/local/spinlock.hpp>
19 #include <hpx/runtime/thread_pool_helpers.hpp>
20 #include <hpx/runtime/threads/cpu_mask.hpp>
21 #include <hpx/runtime/threads/policies/affinity_data.hpp>
22 #include <hpx/runtime/threads/policies/callback_notifier.hpp>
23 #include <hpx/runtime/threads/policies/scheduler_mode.hpp>
24 #include <hpx/runtime/threads/thread_executor.hpp>
25 #include <hpx/runtime/threads/thread_init_data.hpp>
26 #include <hpx/runtime/threads/topology.hpp>
27 #include <hpx/state.hpp>
28 #include <hpx/util/steady_clock.hpp>
29 #include <hpx/util_fwd.hpp>
30 
31 #include <cstddef>
32 #include <cstdint>
33 #include <exception>
34 #include <functional>
35 #include <iosfwd>
36 #include <memory>
37 #include <mutex>
38 #include <string>
39 #include <vector>
40 
41 #include <hpx/config/warnings_prefix.hpp>
42 
43 namespace hpx { namespace threads
44 {
45     ///////////////////////////////////////////////////////////////////////////
46     /// \cond NOINTERNAL
47     struct pool_id_type
48     {
pool_id_typehpx::threads::pool_id_type49         pool_id_type(std::size_t index, std::string const& name)
50           : index_(index), name_(name)
51         {}
52 
indexhpx::threads::pool_id_type53         std::size_t index() const { return index_; };
namehpx::threads::pool_id_type54         std::string const& name() const { return name_; }
55 
56     private:
57         std::size_t const index_;
58         std::string const name_;
59     };
60     /// \endcond
61 
62     ///////////////////////////////////////////////////////////////////////////
63     // note: this data structure has to be protected from races from the outside
64 
65     /// \brief The base class used to manage a pool of OS threads.
66     class thread_pool_base : public detail::manage_executor
67     {
68     public:
69         /// \cond NOINTERNAL
70         thread_pool_base(threads::policies::callback_notifier& notifier,
71             std::size_t index, std::string const& pool_name,
72             policies::scheduler_mode m, std::size_t thread_offset);
73 
74         virtual ~thread_pool_base() = default;
75 
76         virtual void init(std::size_t num_threads, std::size_t threads_offset);
77 
78         virtual bool run(std::unique_lock<compat::mutex>& l,
79             std::size_t num_threads) = 0;
80 
81         virtual void stop(
82             std::unique_lock<compat::mutex>& l, bool blocking = true) = 0;
83 
84         virtual void print_pool(std::ostream&) = 0;
85 
get_pool_id()86         pool_id_type get_pool_id()
87         {
88             return id_;
89         }
90         /// \endcond
91 
92         /// Resumes the thread pool. When the all OS threads on the thread pool
93         /// have been resumed the returned future will be ready.
94         ///
95         /// \note Can only be called from an HPX thread. Use resume_cb or
96         ///       resume_direct to suspend the pool from outside HPX.
97         ///
98         /// \returns A `future<void>` which is ready when the thread pool has
99         ///          been resumed.
100         ///
101         /// \throws hpx::exception if called from outside the HPX runtime.
102         virtual hpx::future<void> resume() = 0;
103 
104         /// Resumes the thread pool. Takes a callback as a parameter which will
105         /// be called when all OS threads on the thread pool have been resumed.
106         ///
107         /// \param callback [in] called when the thread pool has been resumed.
108         /// \param ec       [in,out] this represents the error status on exit, if this
109         ///                 is pre-initialized to \a hpx#throws the function will throw
110         ///                 on error instead.
111         virtual void resume_cb(
112             std::function<void(void)> callback, error_code& ec = throws) = 0;
113 
114         /// Resumes the thread pool. Blocks until all OS threads on the thread pool
115         /// have been resumed.
116         ///
117         /// \param ec [in,out] this represents the error status on exit, if this
118         ///           is pre-initialized to \a hpx#throws the function will
119         ///           throw on error instead.
120         virtual void resume_direct(error_code& ec = throws) = 0;
121 
122         /// Suspends the thread pool. When the all OS threads on the thread pool
123         /// have been suspended the returned future will be ready.
124         ///
125         /// \note Can only be called from an HPX thread. Use suspend_cb or
126         ///       suspend_direct to suspend the pool from outside HPX. A thread
127         ///       pool cannot be suspended from an HPX thread running on the
128         ///       pool itself.
129         ///
130         /// \returns A `future<void>` which is ready when the thread pool has
131         ///          been suspended.
132         ///
133         /// \throws hpx::exception if called from outside the HPX runtime.
134         virtual hpx::future<void> suspend() = 0;
135 
136         /// Suspends the thread pool. Takes a callback as a parameter which will
137         /// be called when all OS threads on the thread pool have been suspended.
138         ///
139         /// \note A thread pool cannot be suspended from an HPX thread running
140         ///       on the pool itself.
141         ///
142         /// \param callback [in] called when the thread pool has been suspended.
143         /// \param ec       [in,out] this represents the error status on exit, if this
144         ///                 is pre-initialized to \a hpx#throws the function will throw
145         ///                 on error instead.
146         ///
147         /// \throws hpx::exception if called from an HPX thread which is running
148         ///         on the pool itself.
149         virtual void suspend_cb(
150             std::function<void(void)> callback, error_code& ec = throws) = 0;
151 
152         /// Suspends the thread pool. Blocks until all OS threads on the thread pool
153         /// have been suspended.
154         ///
155         /// \note A thread pool cannot be suspended from an HPX thread running
156         ///       on the pool itself.
157         ///
158         /// \param ec [in,out] this represents the error status on exit, if this
159         ///           is pre-initialized to \a hpx#throws the function will
160         ///           throw on error instead.
161         ///
162         /// \throws hpx::exception if called from an HPX thread which is running
163         ///         on the pool itself.
164         virtual void suspend_direct(error_code& ec = throws) = 0;
165 
166     public:
167         /// \cond NOINTERNAL
168         std::size_t get_worker_thread_num() const;
169         virtual std::size_t get_os_thread_count() const = 0;
170 
171         virtual compat::thread& get_os_thread_handle(
172             std::size_t num_thread) = 0;
173 
174         virtual std::size_t get_active_os_thread_count() const;
175 
176         virtual void create_thread(thread_init_data& data, thread_id_type& id,
177             thread_state_enum initial_state, bool run_now, error_code& ec) = 0;
178         virtual void create_work(thread_init_data& data,
179             thread_state_enum initial_state, error_code& ec) = 0;
180 
181         virtual thread_state set_state(thread_id_type const& id,
182             thread_state_enum new_state, thread_state_ex_enum new_state_ex,
183             thread_priority priority, error_code& ec) = 0;
184 
185         virtual thread_id_type set_state(util::steady_time_point const& abs_time,
186             thread_id_type const& id, thread_state_enum newstate,
187             thread_state_ex_enum newstate_ex, thread_priority priority,
188             error_code& ec) = 0;
189 
get_pool_index() const190         std::size_t get_pool_index() const
191         {
192             return id_.index();
193         }
get_pool_name() const194         std::string const& get_pool_name() const
195         {
196             return id_.name();
197         }
get_thread_offset() const198         std::size_t get_thread_offset() const
199         {
200             return thread_offset_;
201         }
202 
get_scheduler() const203         virtual policies::scheduler_base* get_scheduler() const
204         {
205             return nullptr;
206         }
207 
208         mask_type get_used_processing_units() const;
209         hwloc_bitmap_ptr get_numa_domain_bitmap() const;
210 
211         // performance counters
212 #if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
get_executed_threads(std::size_t,bool)213         virtual std::int64_t get_executed_threads(
214             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_executed_thread_phases(std::size_t,bool)215         virtual std::int64_t get_executed_thread_phases(
216             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
217 #if defined(HPX_HAVE_THREAD_IDLE_RATES)
get_thread_phase_duration(std::size_t,bool)218         virtual std::int64_t get_thread_phase_duration(
219             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_thread_duration(std::size_t,bool)220         virtual std::int64_t get_thread_duration(
221             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_thread_phase_overhead(std::size_t,bool)222         virtual std::int64_t get_thread_phase_overhead(
223             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_thread_overhead(std::size_t,bool)224         virtual std::int64_t get_thread_overhead(
225             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_cumulative_thread_duration(std::size_t,bool)226         virtual std::int64_t get_cumulative_thread_duration(
227             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_cumulative_thread_overhead(std::size_t,bool)228         virtual std::int64_t get_cumulative_thread_overhead(
229             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
230 #endif
231 #endif
232 
get_cumulative_duration(std::size_t,bool)233         virtual std::int64_t get_cumulative_duration(
234             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
235 
236 #if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) && defined(HPX_HAVE_THREAD_IDLE_RATES)
get_background_work_duration(std::size_t,bool)237         virtual std::int64_t get_background_work_duration(
238             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_background_overhead(std::size_t,bool)239         virtual std::int64_t get_background_overhead(
240             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
241 #endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
242 
243 #if defined(HPX_HAVE_THREAD_IDLE_RATES)
avg_idle_rate_all(bool)244         virtual std::int64_t avg_idle_rate_all(bool /*reset*/) { return 0; }
avg_idle_rate(std::size_t,bool)245         virtual std::int64_t avg_idle_rate(std::size_t, bool) { return 0; }
246 
247 #if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
avg_creation_idle_rate(std::size_t,bool)248         virtual std::int64_t avg_creation_idle_rate(
249             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
avg_cleanup_idle_rate(std::size_t,bool)250         virtual std::int64_t avg_cleanup_idle_rate(
251             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
252 #endif
253 #endif
254 
get_queue_length(std::size_t,bool)255         virtual std::int64_t get_queue_length(std::size_t, bool) { return 0; }
256 
257 #if defined(HPX_HAVE_THREAD_QUEUE_WAITTIME)
get_average_thread_wait_time(std::size_t,bool)258         virtual std::int64_t get_average_thread_wait_time(
259             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_average_task_wait_time(std::size_t,bool)260         virtual std::int64_t get_average_task_wait_time(
261             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
262 #endif
263 
264 #if defined(HPX_HAVE_THREAD_STEALING_COUNTS)
get_num_pending_misses(std::size_t,bool)265         virtual std::int64_t get_num_pending_misses(
266             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_num_pending_accesses(std::size_t,bool)267         virtual std::int64_t get_num_pending_accesses(
268             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
269 
get_num_stolen_from_pending(std::size_t,bool)270         virtual std::int64_t get_num_stolen_from_pending(
271             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_num_stolen_to_pending(std::size_t,bool)272         virtual std::int64_t get_num_stolen_to_pending(
273             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_num_stolen_from_staged(std::size_t,bool)274         virtual std::int64_t get_num_stolen_from_staged(
275             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
get_num_stolen_to_staged(std::size_t,bool)276         virtual std::int64_t get_num_stolen_to_staged(
277             std::size_t /*thread_num*/, bool /*reset*/) { return 0; }
278 #endif
279 
get_thread_count(thread_state_enum,thread_priority,std::size_t,bool)280         virtual std::int64_t get_thread_count(thread_state_enum /*state*/,
281             thread_priority /*priority*/, std::size_t /*num_thread*/,
282             bool /*reset*/) { return 0; }
283 
get_background_thread_count()284         virtual std::int64_t get_background_thread_count() { return 0; }
285 
get_thread_count_unknown(std::size_t num_thread,bool reset)286         std::int64_t get_thread_count_unknown(
287             std::size_t num_thread, bool reset)
288         {
289             return get_thread_count(
290                 unknown, thread_priority_default, num_thread, reset);
291         }
get_thread_count_active(std::size_t num_thread,bool reset)292         std::int64_t get_thread_count_active(std::size_t num_thread, bool reset)
293         {
294             return get_thread_count(
295                 active, thread_priority_default, num_thread, reset);
296         }
get_thread_count_pending(std::size_t num_thread,bool reset)297         std::int64_t get_thread_count_pending(
298             std::size_t num_thread, bool reset)
299         {
300             return get_thread_count(
301                 pending, thread_priority_default, num_thread, reset);
302         }
get_thread_count_suspended(std::size_t num_thread,bool reset)303         std::int64_t get_thread_count_suspended(
304             std::size_t num_thread, bool reset)
305         {
306             return get_thread_count(
307                 suspended, thread_priority_default, num_thread, reset);
308         }
get_thread_count_terminated(std::size_t num_thread,bool reset)309         std::int64_t get_thread_count_terminated(
310             std::size_t num_thread, bool reset)
311         {
312             return get_thread_count(
313                 terminated, thread_priority_default, num_thread, reset);
314         }
get_thread_count_staged(std::size_t num_thread,bool reset)315         std::int64_t get_thread_count_staged(std::size_t num_thread, bool reset)
316         {
317             return get_thread_count(
318                 staged, thread_priority_default, num_thread, reset);
319         }
320 
321         virtual std::int64_t get_scheduler_utilization() const = 0;
322 
323         virtual std::int64_t get_idle_loop_count(
324             std::size_t num, bool reset) = 0;
325         virtual std::int64_t get_busy_loop_count(
326             std::size_t num, bool reset) = 0;
327 
328         ///////////////////////////////////////////////////////////////////////
enumerate_threads(util::function_nonser<bool (thread_id_type)> const &,thread_state_enum=unknown) const329         virtual bool enumerate_threads(
330             util::function_nonser<bool(thread_id_type)> const& /*f*/,
331             thread_state_enum /*state*/ = unknown) const
332         {
333             return false;
334         }
335 
reset_thread_distribution()336         virtual void reset_thread_distribution() {}
337 
set_scheduler_mode(threads::policies::scheduler_mode)338         virtual void set_scheduler_mode(threads::policies::scheduler_mode) {}
339 
340         //
abort_all_suspended_threads()341         virtual void abort_all_suspended_threads() {}
cleanup_terminated(bool)342         virtual bool cleanup_terminated(bool /*delete_all*/) { return false; }
343 
344         virtual hpx::state get_state() const = 0;
345         virtual hpx::state get_state(std::size_t num_thread) const = 0;
346 
347         virtual bool has_reached_state(hpx::state s) const = 0;
348 
do_some_work(std::size_t)349         virtual void do_some_work(std::size_t /*num_thread*/) {}
350 
report_error(std::size_t num,std::exception_ptr const & e)351         virtual void report_error(std::size_t num, std::exception_ptr const& e)
352         {
353             notifier_.on_error(num, e);
354         }
355 
356         ///////////////////////////////////////////////////////////////////////
357         // detail::manage_executor implementation
358 
359         /// \brief Return the requested policy element.
360         virtual std::size_t get_policy_element(detail::executor_parameter p,
361             error_code& ec = throws) const = 0;
362 
363         // \brief Return statistics collected by this scheduler.
364         virtual void get_statistics(executor_statistics& stats,
365             error_code& ec = throws) const = 0;
366 
367         // \brief Provide the given processing unit to the scheduler.
368         virtual void add_processing_unit(std::size_t virt_core,
369             std::size_t thread_num, error_code& ec = throws) = 0;
370 
371         // \brief Remove the given processing unit from the scheduler.
372         virtual void remove_processing_unit(std::size_t thread_num,
373             error_code& ec = throws) = 0;
374 
375         // \brief Return the description string of the underlying scheduler.
376         char const* get_description() const;
377 
378         /// \endcond
379 
380         /// Suspends the given processing unit. When the processing unit has
381         /// been suspended the returned future will be ready.
382         ///
383         /// \note Can only be called from an HPX thread. Use
384         ///       suspend_processing_unit_cb or to suspend the processing unit
385         ///       from outside HPX. Requires that the pool has
386         ///       threads::policies::enable_elasticity set.
387         ///
388         /// \param virt_core [in] The processing unit on the the pool to be
389         ///                  suspended. The processing units are indexed
390         ///                  starting from 0.
391         ///
392         /// \returns A `future<void>` which is ready when the given processing
393         ///          unit has been suspended.
394         ///
395         /// \throws hpx::exception if called from outside the HPX runtime.
396         virtual hpx::future<void> suspend_processing_unit(std::size_t virt_core) = 0;
397 
398         /// Suspends the given processing unit. Takes a callback as a parameter
399         /// which will be called when the processing unit has been suspended.
400         ///
401         /// \note Requires that the pool has
402         ///       threads::policies::enable_elasticity set.
403         ///
404         /// \param callback  [in] Callback which is called when the processing
405         ///                  unit has been suspended.
406         /// \param virt_core [in] The processing unit to suspend.
407         /// \param ec        [in,out] this represents the error status on exit, if this
408         ///                  is pre-initialized to \a hpx#throws the function will throw
409         ///                  on error instead.
410         virtual void suspend_processing_unit_cb(
411             std::function<void(void)> callback, std::size_t virt_core,
412             error_code& ec = throws) = 0;
413 
414         /// Resumes the given processing unit. When the processing unit has been
415         /// resumed the returned future will be ready.
416         ///
417         /// \note Can only be called from an HPX thread. Use
418         ///       resume_processing_unit_cb or to resume the processing unit
419         ///       from outside HPX. Requires that the pool has
420         ///       threads::policies::enable_elasticity set.
421         ///
422         /// \param virt_core [in] The processing unit on the the pool to be
423         ///                  resumed. The processing units are indexed starting
424         ///                  from 0.
425         ///
426         /// \returns A `future<void>` which is ready when the given processing
427         ///          unit has been resumed.
428         virtual hpx::future<void> resume_processing_unit(std::size_t virt_core) = 0;
429 
430         /// Resumes the given processing unit. Takes a callback as a parameter
431         /// which will be called when the processing unit has been resumed.
432         ///
433         /// \note Requires that the pool has
434         ///       threads::policies::enable_elasticity set.
435         ///
436         /// \param callback  [in] Callback which is called when the processing
437         ///                  unit has been suspended.
438         /// \param virt_core [in] The processing unit to resume.
439         /// \param ec        [in,out] this represents the error status on exit, if this
440         ///                  is pre-initialized to \a hpx#throws the function will throw
441         ///                  on error instead.
442         virtual void resume_processing_unit_cb(
443             std::function<void(void)> callback, std::size_t virt_core,
444                 error_code& ec = throws) = 0;
445 
446         /// \cond NOINTERNAL
get_scheduler_mode() const447         policies::scheduler_mode get_scheduler_mode() const
448         {
449             return mode_;
450         }
451         /// \endcond
452 
453     protected:
454         /// \cond NOINTERNAL
455         void init_pool_time_scale();
456         /// \endcond
457 
458     protected:
459         /// \cond NOINTERNAL
460         pool_id_type id_;
461 
462         // Mode of operation of the pool
463         policies::scheduler_mode mode_;
464 
465         // The thread_offset is equal to the accumulated number of
466         // threads in all pools preceding this pool
467         // in the thread indexation. That means, that in order to know
468         // the global index of a thread it owns, the pool has to compute:
469         // global index = thread_offset_ + local index.
470         std::size_t thread_offset_;
471 
472         // scale timestamps to nanoseconds
473         double timestamp_scale_;
474 
475         // callback functions to invoke at start, stop, and error
476         threads::policies::callback_notifier& notifier_;
477         /// \endcond
478     };
479 }}
480 
481 #include <hpx/config/warnings_suffix.hpp>
482 
483 #endif
484