1 //  Copyright (c) 2007-2016 Hartmut Kaiser
2 //  Copyright (c) 2015 Nidhi Makhijani
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(HPX_RUNTIME_THREADS_RESOURCE_MANAGER_JAN_16_2013_0830AM)
8 #define HPX_RUNTIME_THREADS_RESOURCE_MANAGER_JAN_16_2013_0830AM
9 
10 #include <hpx/config.hpp>
11 #include <hpx/lcos/local/spinlock.hpp>
12 #include <hpx/runtime/threads/thread_executor.hpp>
13 #include <hpx/runtime/threads/topology.hpp>
14 
15 #include <atomic>
16 #include <cstddef>
17 #include <map>
18 #include <memory>
19 #include <utility>
20 #include <vector>
21 #include <utility>
22 
23 #include <hpx/config/warnings_prefix.hpp>
24 
25 namespace hpx { namespace  threads
26 {
27     ///////////////////////////////////////////////////////////////////////////
28     /// Status of a given processing unit
29     enum class punit_status
30     {
31         unassigned = 0,
32         reserved = 1,
33         assigned = 2
34     };
35 
36     /// Data describing the current allocation for a single executor known
37     /// To the resource manager
38     struct resource_allocation
39     {
40         // mapping of physical core to virtual core
41         typedef std::pair<std::size_t, std::size_t>  coreids_type;
42 
resource_allocationhpx::threads::resource_allocation43         resource_allocation()
44           : description_("")
45         {}
46 
resource_allocationhpx::threads::resource_allocation47         resource_allocation(char const* desc,
48                 std::vector<coreids_type> const& core_ids)
49           : description_(desc),
50             core_ids_(core_ids)
51         {}
52 
53         char const*description_;                ///< name of the executor
54         std::vector<coreids_type> core_ids_;    ///< mapping of cores
55     };
56 
57     /// In short, there are two main responsibilities of the Resource Manager:
58     ///
59     /// * Initial Allocation: Allocating resources to executors when executors
60     ///   are created.
61     /// * Dynamic Migration: Constantly monitoring utilization of resources
62     ///   by executors, and dynamically migrating resources between them
63     ///   (not implemented yet).
64     ///
65     class resource_manager
66     {
67         typedef lcos::local::spinlock mutex_type;
68         struct tag {};
69 
70         // mapping of physical core to virtual core
71         typedef typename resource_allocation::coreids_type coreids_type;
72 
73     public:
74         resource_manager();
75 
76         // Request an initial resource allocation
77         std::size_t initial_allocation(detail::manage_executor* proxy,
78                 error_code& ec = throws);
79 
80         // Stop the executor identified by the given cookie
81         void stop_executor(std::size_t cookie, error_code& ec = throws);
82 
83         // Detach the executor identified by the given cookie
84         void detach(std::size_t cookie, error_code& ec = throws);
85 
86         // Return the singleton resource manager instance
87         static resource_manager& get();
88 
89         // Return the current schedulers and the corresponding allocation data
90         std::vector<resource_allocation> get_resource_allocation() const;
91 
92     protected:
93         // allocate virtual cores
94         // called by initial_allocation
95         std::vector<coreids_type> allocate_virt_cores(
96                 detail::manage_executor* proxy, std::size_t min_punits,
97                 std::size_t max_punits, error_code& ec);
98 
99         // reserve virtual cores for scheduler
100         std::size_t reserve_processing_units(
101                 std::size_t use_count, std::size_t desired,
102                 std::vector<punit_status>& available_punits);
103 
104         // reserve virtual cores for scheduler at higher use count
105         std::size_t reserve_at_higher_use_count(
106                 std::size_t desired,
107                 std::vector<punit_status>& available_punits);
108 
109     private:
110         mutable mutex_type mtx_;
111         std::atomic<std::size_t> next_cookie_;
112 
113         ///////////////////////////////////////////////////////////////////////
114         // Store information about the physical processing units available to
115         // this resource manager.
116         struct punit_data
117         {
punit_datahpx::threads::resource_manager::punit_data118             punit_data() : use_count_(0) {}
119 
120             std::size_t use_count_;   // number of schedulers using this core
121         };
122 
123         typedef std::vector<punit_data> punit_array_type;
124         punit_array_type punits_;
125 
126         threads::topology const& topology_;
127 
128         ///////////////////////////////////////////////////////////////////////
129         // Store information about the virtual processing unit allocation for
130         // each of the scheduler proxies attached to this resource manager.
131         struct proxy_data
132         {
133         public:
proxy_datahpx::threads::resource_manager::proxy_data134             proxy_data(detail::manage_executor* proxy,
135                     std::vector<coreids_type> && core_ids)
136               : proxy_(proxy), core_ids_(std::move(core_ids))
137             {}
138 
proxy_datahpx::threads::resource_manager::proxy_data139             proxy_data(proxy_data const& rhs)
140               : proxy_(rhs.proxy_),
141                 core_ids_(rhs.core_ids_)
142             {}
143 
proxy_datahpx::threads::resource_manager::proxy_data144             proxy_data(proxy_data && rhs)
145               : proxy_(std::move(rhs.proxy_)),
146                 core_ids_(std::move(rhs.core_ids_))
147             {}
148 
operator =hpx::threads::resource_manager::proxy_data149             proxy_data& operator=(proxy_data const& rhs)
150             {
151                 if (this != &rhs) {
152                     proxy_ = rhs.proxy_;
153                     core_ids_ = rhs.core_ids_;
154                 }
155                 return *this;
156             }
157 
operator =hpx::threads::resource_manager::proxy_data158             proxy_data& operator=(proxy_data && rhs)
159             {
160                 if (this != &rhs) {
161                     proxy_ = std::move(rhs.proxy_);
162                     core_ids_ = std::move(rhs.core_ids_);
163                 }
164                 return *this;
165             }
166 
167             // hold on to proxy
168             std::shared_ptr<detail::manage_executor> proxy_;
169 
170             // map physical to logical processing unit ids
171             std::vector<coreids_type> core_ids_;
172         };
173 
174         typedef std::map<std::size_t, proxy_data> proxies_map_type;
175         proxies_map_type proxies_;
176 
177         ///////////////////////////////////////////////////////////////////////
178         // Used to store information during static and dynamic allocation.
179         struct static_allocation_data
180         {
static_allocation_datahpx::threads::resource_manager::static_allocation_data181             static_allocation_data()
182               : allocation_(0),
183                 scaled_allocation_(0.0),
184                 num_borrowed_cores_(0),
185                 num_owned_cores_(0),
186                 min_proxy_cores_(0),
187                 max_proxy_cores_(0),
188                 adjusted_desired_(0),
189                 num_cores_stolen_(0)
190             {}
191 
192             // The scheduler proxy this allocation data is for.
193             std::shared_ptr<detail::manage_executor> proxy_;  // hold on to proxy
194 
195             // Additional allocation to give to a scheduler after proportional
196             // allocation decisions are made.
197             std::size_t allocation_;
198 
199             // Scaled allocation value during proportional allocation.
200             double scaled_allocation_;
201 
202             std::size_t num_borrowed_cores_; // borrowed cores held by scheduler
203             std::size_t num_owned_cores_;    // owned cores held by scheduler
204             std::size_t min_proxy_cores_;    // min cores required by scheduler
205             std::size_t max_proxy_cores_;    // max cores required by scheduler
206 
207             // A field used during static allocation to decide on an allocation
208             // proportional to each scheduler's desired value.
209             double adjusted_desired_;
210 
211             // Keeps track of stolen cores during static allocation.
212             std::size_t num_cores_stolen_;
213         };
214 
215         typedef std::map<std::size_t, static_allocation_data>
216             allocation_data_map_type;
217         allocation_data_map_type proxies_static_allocation_data_;
218 
219         // stores static allocation data for all schedulers
220         std::size_t preprocess_static_allocation(std::size_t min_punits,
221             std::size_t max_punits);
222 
223         // constants used for parameters to the release_scheduler API
224         static std::size_t const release_borrowed_cores = std::size_t(-1);
225         static std::size_t const release_cores_to_min = std::size_t(-2);
226 
227         // release cores from scheduler
228         bool release_scheduler_resources(
229             allocation_data_map_type::iterator it,
230             std::size_t number_to_free,
231             std::vector<punit_status>& available_punits);
232 
233         // release cores from all schedulers
234         // calls release_scheduler_resources
235         std::size_t release_cores_on_existing_schedulers(
236             std::size_t request, std::size_t number_to_free,
237             std::vector<punit_status>& available_punits,
238             std::size_t new_allocation);
239 
240         // distribute cores to schedulers proportional to max_punits of
241         // the schedulers
242         std::size_t redistribute_cores_among_all(std::size_t reserved,
243             std::size_t min_punits, std::size_t max_punits,
244             std::vector<punit_status>& available_punits,
245             std::size_t new_allocation);
246 
247         void roundup_scaled_allocations(
248             allocation_data_map_type &scaled_static_allocation_data,
249             std::size_t total_allocated);
250     };
251 
252     // Return the current schedulers and the corresponding allocation data
253     HPX_EXPORT std::vector<resource_allocation> get_resource_allocation();
254 }}
255 
256 #include <hpx/config/warnings_suffix.hpp>
257 
258 #endif
259