1 // Copyright (c) 2007-2018 Hartmut Kaiser 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 /// \file binpacking_distribution_policy.hpp 7 8 #if !defined(HPX_COMPONENTS_BINPACKING_DISTRIBUTION_POLICY_APR_10_2015_0344PM) 9 #define HPX_COMPONENTS_BINPACKING_DISTRIBUTION_POLICY_APR_10_2015_0344PM 10 11 #include <hpx/config.hpp> 12 #include <hpx/dataflow.hpp> 13 #include <hpx/lcos/future.hpp> 14 #include <hpx/performance_counters/performance_counter.hpp> 15 #include <hpx/runtime/components/component_type.hpp> 16 #include <hpx/runtime/components/stubs/stub_base.hpp> 17 #include <hpx/runtime/find_here.hpp> 18 #include <hpx/runtime/naming/id_type.hpp> 19 #include <hpx/runtime/naming/name.hpp> 20 #include <hpx/runtime/serialization/serialization_fwd.hpp> 21 #include <hpx/runtime/serialization/string.hpp> 22 #include <hpx/runtime/serialization/vector.hpp> 23 #include <hpx/traits/is_distribution_policy.hpp> 24 #include <hpx/util/assert.hpp> 25 #include <hpx/util/bind_back.hpp> 26 #include <hpx/util/unwrap.hpp> 27 28 #include <algorithm> 29 #include <cstddef> 30 #include <cstdint> 31 #include <iterator> 32 #include <string> 33 #include <type_traits> 34 #include <utility> 35 #include <vector> 36 37 namespace hpx { namespace components 38 { 39 static char const* const default_binpacking_counter_name = 40 "/runtime{locality/total}/count/component@"; 41 42 namespace detail 43 { 44 /// \cond NOINTERNAL 45 HPX_EXPORT std::vector<std::size_t> 46 get_items_count(std::size_t count, std::vector<std::uint64_t> const& values); 47 48 HPX_EXPORT hpx::future<std::vector<std::uint64_t> > 49 retrieve_counter_values( 50 std::vector<performance_counters::performance_counter> && counters); 51 52 HPX_EXPORT hpx::future<std::vector<std::uint64_t> > get_counter_values( 53 std::string component_name, std::string const& counter_name, 54 std::vector<hpx::id_type> const& localities); 55 56 HPX_EXPORT hpx::id_type const& get_best_locality( 57 hpx::future<std::vector<std::uint64_t> > && f, 58 std::vector<hpx::id_type> const& localities); 59 60 template <typename Component> 61 struct create_helper 62 { create_helperhpx::components::detail::create_helper63 create_helper(std::vector<hpx::id_type> const& localities) 64 : localities_(localities) 65 {} 66 67 template <typename ...Ts> operator ()hpx::components::detail::create_helper68 hpx::future<hpx::id_type> operator()( 69 hpx::future<std::vector<std::uint64_t> > && values, 70 Ts&&... vs) const 71 { 72 hpx::id_type const& best_locality = 73 get_best_locality(std::move(values), localities_); 74 75 return stub_base<Component>::create_async( 76 best_locality, std::forward<Ts>(vs)...); 77 } 78 79 std::vector<hpx::id_type> const& localities_; 80 }; 81 82 template <typename Component> 83 struct create_bulk_helper 84 { 85 typedef std::pair<hpx::id_type, std::vector<hpx::id_type> > 86 bulk_locality_result; 87 create_bulk_helperhpx::components::detail::create_bulk_helper88 create_bulk_helper(std::vector<hpx::id_type> const& localities) 89 : localities_(localities) 90 {} 91 92 template <typename ...Ts> 93 hpx::future<std::vector<bulk_locality_result> > operator ()hpx::components::detail::create_bulk_helper94 operator()( 95 hpx::future<std::vector<std::uint64_t> > && values, 96 std::size_t count, Ts&&... vs) const 97 { 98 std::vector<std::size_t> to_create = 99 detail::get_items_count(count, values.get()); 100 101 std::vector<hpx::future<std::vector<hpx::id_type> > > objs; 102 objs.reserve(localities_.size()); 103 104 for (std::size_t i = 0; i != to_create.size(); ++i) 105 { 106 objs.push_back(stub_base<Component>::bulk_create_async( 107 localities_[i], to_create[i], vs...)); 108 } 109 110 // consolidate all results 111 return hpx::dataflow(hpx::launch::sync, 112 [=](std::vector<hpx::future<std::vector<hpx::id_type> > > && v) 113 mutable -> std::vector<bulk_locality_result> 114 { 115 HPX_ASSERT(localities_.size() == v.size()); 116 117 std::vector<bulk_locality_result> result; 118 result.reserve(v.size()); 119 120 for (std::size_t i = 0; i != v.size(); ++i) 121 { 122 result.emplace_back( 123 std::move(localities_[i]), v[i].get() 124 ); 125 } 126 return result; 127 }, 128 std::move(objs)); 129 } 130 131 std::vector<hpx::id_type> const& localities_; 132 }; 133 /// \endcond 134 } 135 136 /// This class specifies the parameters for a binpacking distribution policy 137 /// to use for creating a given number of items on a given set of localities. 138 /// The binpacking policy will distribute the new objects in a way such that 139 /// each of the localities will equalize the number of overall objects of 140 /// this type based on a given criteria (by default this criteria is the 141 /// overall number of objects of this type). 142 struct binpacking_distribution_policy 143 { 144 public: 145 /// Default-construct a new instance of a \a binpacking_distribution_policy. 146 /// This policy will represent one locality (the local locality). binpacking_distribution_policyhpx::components::binpacking_distribution_policy147 binpacking_distribution_policy() 148 : counter_name_(default_binpacking_counter_name) 149 {} 150 151 /// Create a new \a default_distribution policy representing the given 152 /// set of localities. 153 /// 154 /// \param locs [in] The list of localities the new instance should 155 /// represent 156 /// \param perf_counter_name [in] The name of the performance counter which 157 /// should be used as the distribution criteria 158 /// (by default the overall number of existing 159 /// instances of the given component type will be 160 /// used). 161 /// operator ()hpx::components::binpacking_distribution_policy162 binpacking_distribution_policy operator()( 163 std::vector<id_type> const& locs, 164 char const* perf_counter_name = default_binpacking_counter_name) const 165 { 166 #if defined(HPX_DEBUG) 167 for (id_type const& loc: locs) 168 { 169 HPX_ASSERT(naming::is_locality(loc)); 170 } 171 #endif 172 return binpacking_distribution_policy(locs, perf_counter_name); 173 } 174 175 /// Create a new \a default_distribution policy representing the given 176 /// set of localities. 177 /// 178 /// \param locs [in] The list of localities the new instance should 179 /// represent 180 /// \param perf_counter_name [in] The name of the performance counter which 181 /// should be used as the distribution criteria 182 /// (by default the overall number of existing 183 /// instances of the given component type will be 184 /// used). 185 /// operator ()hpx::components::binpacking_distribution_policy186 binpacking_distribution_policy operator()( 187 std::vector<id_type> && locs, 188 char const* perf_counter_name = default_binpacking_counter_name) const 189 { 190 #if defined(HPX_DEBUG) 191 for (id_type const& loc: locs) 192 { 193 HPX_ASSERT(naming::is_locality(loc)); 194 } 195 #endif 196 return binpacking_distribution_policy(std::move(locs), 197 perf_counter_name); 198 } 199 200 /// Create a new \a default_distribution policy representing the given 201 /// locality 202 /// 203 /// \param loc [in] The locality the new instance should 204 /// represent 205 /// \param perf_counter_name [in] The name of the performance counter which 206 /// should be used as the distribution criteria 207 /// (by default the overall number of existing 208 /// instances of the given component type will be 209 /// used). 210 /// operator ()hpx::components::binpacking_distribution_policy211 binpacking_distribution_policy operator()(id_type const& loc, 212 char const* perf_counter_name = default_binpacking_counter_name) const 213 { 214 HPX_ASSERT(naming::is_locality(loc)); 215 return binpacking_distribution_policy(loc, perf_counter_name); 216 } 217 218 /// Create one object on one of the localities associated by 219 /// this policy instance 220 /// 221 /// \param vs [in] The arguments which will be forwarded to the 222 /// constructor of the new object. 223 /// 224 /// \returns A future holding the global address which represents 225 /// the newly created object 226 /// 227 template <typename Component, typename ...Ts> createhpx::components::binpacking_distribution_policy228 hpx::future<hpx::id_type> create(Ts&&... vs) const 229 { 230 using components::stub_base; 231 232 // handle special cases 233 if (localities_.size() == 0) 234 { 235 return stub_base<Component>::create_async( 236 hpx::find_here(), std::forward<Ts>(vs)...); 237 } 238 else if (localities_.size() == 1) 239 { 240 return stub_base<Component>::create_async( 241 localities_.front(), std::forward<Ts>(vs)...); 242 } 243 244 // schedule creation of all objects across given localities 245 hpx::future<std::vector<std::uint64_t> > values = 246 detail::get_counter_values( 247 get_component_name<Component>(), 248 counter_name_, localities_); 249 250 return values.then(hpx::util::bind_back( 251 detail::create_helper<Component>(localities_), 252 std::forward<Ts>(vs)...)); 253 } 254 255 /// \cond NOINTERNAL 256 typedef std::pair<hpx::id_type, std::vector<hpx::id_type> > 257 bulk_locality_result; 258 /// \endcond 259 260 /// Create multiple objects on the localities associated by 261 /// this policy instance 262 /// 263 /// \param count [in] The number of objects to create 264 /// \param vs [in] The arguments which will be forwarded to the 265 /// constructors of the new objects. 266 /// 267 /// \returns A future holding the list of global addresses which 268 /// represent the newly created objects 269 /// 270 template <typename Component, typename ...Ts> 271 hpx::future<std::vector<bulk_locality_result> > bulk_createhpx::components::binpacking_distribution_policy272 bulk_create(std::size_t count, Ts&&... vs) const 273 { 274 using components::stub_base; 275 276 if (localities_.size() > 1) 277 { 278 // schedule creation of all objects across given localities 279 hpx::future<std::vector<std::uint64_t> > values = 280 detail::get_counter_values( 281 get_component_name<Component>(), 282 counter_name_, localities_); 283 284 return values.then( 285 hpx::util::bind_back( 286 detail::create_bulk_helper<Component>(localities_), 287 count, std::forward<Ts>(vs)...)); 288 } 289 290 // handle special cases 291 hpx::id_type id = 292 localities_.empty() ? hpx::find_here() : localities_.front(); 293 294 hpx::future<std::vector<hpx::id_type> > f = 295 stub_base<Component>::bulk_create_async( 296 id, count, std::forward<Ts>(vs)...); 297 298 return f.then(hpx::launch::sync, 299 [HPX_CAPTURE_MOVE(id)]( 300 hpx::future<std::vector<hpx::id_type> > && f 301 ) -> std::vector<bulk_locality_result> 302 { 303 std::vector<bulk_locality_result> result; 304 result.emplace_back(id, f.get()); 305 return result; 306 }); 307 } 308 309 /// Returns the name of the performance counter associated with this 310 /// policy instance. get_counter_namehpx::components::binpacking_distribution_policy311 std::string const& get_counter_name() const 312 { 313 return counter_name_; 314 } 315 316 /// Returns the number of associated localities for this distribution 317 /// policy 318 /// 319 /// \note This function is part of the creation policy implemented by 320 /// this class 321 /// get_num_localitieshpx::components::binpacking_distribution_policy322 std::size_t get_num_localities() const 323 { 324 return localities_.size(); 325 } 326 327 protected: 328 /// \cond NOINTERNAL binpacking_distribution_policyhpx::components::binpacking_distribution_policy329 binpacking_distribution_policy(std::vector<id_type> const& localities, 330 char const* perf_counter_name) 331 : localities_(localities), 332 counter_name_(perf_counter_name) 333 {} 334 binpacking_distribution_policyhpx::components::binpacking_distribution_policy335 binpacking_distribution_policy(std::vector<id_type> && localities, 336 char const* perf_counter_name) 337 : localities_(std::move(localities)), 338 counter_name_(perf_counter_name) 339 {} 340 binpacking_distribution_policyhpx::components::binpacking_distribution_policy341 binpacking_distribution_policy(id_type const& locality, 342 char const* perf_counter_name) 343 : counter_name_(perf_counter_name) 344 { 345 localities_.push_back(locality); 346 } 347 348 friend class hpx::serialization::access; 349 350 template <typename Archive> serializehpx::components::binpacking_distribution_policy351 void serialize(Archive& ar, unsigned int const) 352 { 353 ar & counter_name_ & localities_; 354 } 355 356 std::vector<id_type> localities_; // localities to create things on 357 std::string counter_name_; // name of counter to use as criteria 358 /// \endcond 359 }; 360 361 /// A predefined instance of the binpacking \a distribution_policy. It will 362 /// represent the local locality and will place all items to create here. 363 static binpacking_distribution_policy const binpacked; 364 }} 365 366 /// \cond NOINTERNAL 367 namespace hpx 368 { 369 using hpx::components::binpacking_distribution_policy; 370 using hpx::components::binpacked; 371 372 namespace traits 373 { 374 template <> 375 struct is_distribution_policy<components::binpacking_distribution_policy> 376 : std::true_type 377 {}; 378 } 379 } 380 /// \endcond 381 382 #endif 383