1 // Copyright (c) 2014-2018 Hartmut Kaiser 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 /// \file default_distribution_policy.hpp 7 8 #if !defined(HPX_COMPONENTS_DISTRIBUTION_POLICY_APR_07_2015_1246PM) 9 #define HPX_COMPONENTS_DISTRIBUTION_POLICY_APR_07_2015_1246PM 10 11 #include <hpx/config.hpp> 12 #include <hpx/lcos/dataflow.hpp> 13 #include <hpx/lcos/future.hpp> 14 #include <hpx/lcos/packaged_action.hpp> 15 #include <hpx/runtime/actions/action_support.hpp> 16 #include <hpx/runtime/applier/apply.hpp> 17 #include <hpx/runtime/components/stubs/stub_base.hpp> 18 #include <hpx/runtime/launch_policy.hpp> 19 #include <hpx/runtime/find_here.hpp> 20 #include <hpx/runtime/naming/id_type.hpp> 21 #include <hpx/runtime/naming/name.hpp> 22 #include <hpx/runtime/serialization/serialization_fwd.hpp> 23 #include <hpx/runtime/serialization/vector.hpp> 24 #include <hpx/runtime/serialization/shared_ptr.hpp> 25 #include <hpx/traits/extract_action.hpp> 26 #include <hpx/traits/is_distribution_policy.hpp> 27 #include <hpx/traits/promise_local_result.hpp> 28 #include <hpx/util/assert.hpp> 29 30 #include <algorithm> 31 #include <cstddef> 32 #include <memory> 33 #include <type_traits> 34 #include <utility> 35 #include <vector> 36 37 namespace hpx { namespace components 38 { 39 /////////////////////////////////////////////////////////////////////////// 40 /// \cond NOINTERNAL 41 namespace detail 42 { 43 HPX_FORCEINLINE std::size_t round_to_multiple(std::size_t n1,std::size_t n2,std::size_t n3)44 round_to_multiple(std::size_t n1, std::size_t n2, std::size_t n3) 45 { 46 return (n1 / n2) * n3; 47 } 48 } 49 /// \endcond 50 51 /// This class specifies the parameters for a simple distribution policy 52 /// to use for creating (and evenly distributing) a given number of items 53 /// on a given set of localities. 54 struct default_distribution_policy 55 { 56 public: 57 /// Default-construct a new instance of a \a default_distribution_policy. 58 /// This policy will represent one locality (the local locality). 59 default_distribution_policy() = default; 60 61 /// Create a new \a default_distribution policy representing the given 62 /// set of localities. 63 /// 64 /// \param locs [in] The list of localities the new instance should 65 /// represent operator ()hpx::components::default_distribution_policy66 default_distribution_policy operator()( 67 std::vector<id_type> const& locs) const 68 { 69 return default_distribution_policy(locs); 70 } 71 72 /// Create a new \a default_distribution policy representing the given 73 /// set of localities. 74 /// 75 /// \param locs [in] The list of localities the new instance should 76 /// represent operator ()hpx::components::default_distribution_policy77 default_distribution_policy operator()( 78 std::vector<id_type>&& locs) const 79 { 80 return default_distribution_policy(std::move(locs)); 81 } 82 83 /// Create a new \a default_distribution policy representing the given 84 /// locality 85 /// 86 /// \param loc [in] The locality the new instance should 87 /// represent operator ()hpx::components::default_distribution_policy88 default_distribution_policy operator()(id_type const& loc) const 89 { 90 return default_distribution_policy(loc); 91 } 92 93 /// Create one object on one of the localities associated by 94 /// this policy instance 95 /// 96 /// \param vs [in] The arguments which will be forwarded to the 97 /// constructor of the new object. 98 /// 99 /// \note This function is part of the placement policy implemented by 100 /// this class 101 /// 102 /// \returns A future holding the global address which represents 103 /// the newly created object 104 /// 105 template <typename Component, typename ...Ts> createhpx::components::default_distribution_policy106 hpx::future<hpx::id_type> create(Ts&&... vs) const 107 { 108 using components::stub_base; 109 110 if (localities_) 111 { 112 for (hpx::id_type const& loc: *localities_) 113 { 114 if (get_num_items(1, loc) != 0) 115 { 116 return stub_base<Component>::create_async( 117 loc, std::forward<Ts>(vs)...); 118 } 119 } 120 } 121 122 // by default the object will be created on the current 123 // locality 124 return stub_base<Component>::create_async( 125 hpx::find_here(), std::forward<Ts>(vs)...); 126 } 127 128 /// \cond NOINTERNAL 129 typedef std::pair<hpx::id_type, std::vector<hpx::id_type> > 130 bulk_locality_result; 131 /// \endcond 132 133 /// Create multiple objects on the localities associated by 134 /// this policy instance 135 /// 136 /// \param count [in] The number of objects to create 137 /// \param vs [in] The arguments which will be forwarded to the 138 /// constructors of the new objects. 139 /// 140 /// \note This function is part of the placement policy implemented by 141 /// this class 142 /// 143 /// \returns A future holding the list of global addresses which 144 /// represent the newly created objects 145 /// 146 template <typename Component, typename ...Ts> 147 hpx::future<std::vector<bulk_locality_result> > bulk_createhpx::components::default_distribution_policy148 bulk_create(std::size_t count, Ts&&... vs) const 149 { 150 using components::stub_base; 151 152 if (localities_ && localities_->size() > 1) 153 { 154 // schedule creation of all objects across given localities 155 std::vector<hpx::future<std::vector<hpx::id_type> > > objs; 156 objs.reserve(localities_->size()); 157 for (hpx::id_type const& loc: *localities_) 158 { 159 objs.push_back(stub_base<Component>::bulk_create_async( 160 loc, get_num_items(count, loc), vs...)); 161 } 162 163 // consolidate all results 164 auto localities = localities_; 165 return hpx::dataflow(hpx::launch::sync, 166 [localities]( 167 std::vector<hpx::future<std::vector<hpx::id_type> > > && v 168 ) mutable -> std::vector<bulk_locality_result> 169 { 170 HPX_ASSERT(localities->size() == v.size()); 171 172 std::vector<bulk_locality_result> result; 173 result.reserve(v.size()); 174 175 for (std::size_t i = 0; i != v.size(); ++i) 176 { 177 result.emplace_back((*localities)[i], v[i].get()); 178 } 179 return result; 180 }, 181 std::move(objs)); 182 } 183 184 // handle special cases 185 hpx::id_type id = get_next_target(); 186 187 hpx::future<std::vector<hpx::id_type> > f = 188 stub_base<Component>::bulk_create_async( 189 id, count, std::forward<Ts>(vs)...); 190 191 return f.then(hpx::launch::sync, 192 [HPX_CAPTURE_MOVE(id)]( 193 hpx::future<std::vector<hpx::id_type> > && f 194 ) -> std::vector<bulk_locality_result> 195 { 196 std::vector<bulk_locality_result> result; 197 result.emplace_back(id, f.get()); 198 return result; 199 }); 200 } 201 202 /// \note This function is part of the invocation policy implemented by 203 /// this class 204 /// 205 template <typename Action> 206 struct async_result 207 { 208 using type = hpx::future<typename traits::promise_local_result< 209 typename hpx::traits::extract_action<Action>::remote_result_type 210 >::type>; 211 }; 212 213 template <typename Action, typename ...Ts> 214 typename async_result<Action>::type asynchpx::components::default_distribution_policy215 async(launch policy, Ts&&... vs) const 216 { 217 return hpx::detail::async_impl<Action>(policy, 218 get_next_target(), std::forward<Ts>(vs)...); 219 } 220 221 /// \note This function is part of the invocation policy implemented by 222 /// this class 223 /// 224 template <typename Action, typename Callback, typename ...Ts> 225 typename async_result<Action>::type async_cbhpx::components::default_distribution_policy226 async_cb(launch policy, Callback&& cb, Ts&&... vs) const 227 { 228 return hpx::detail::async_cb_impl<Action>(policy, 229 get_next_target(), std::forward<Callback>(cb), 230 std::forward<Ts>(vs)...); 231 } 232 233 /// \note This function is part of the invocation policy implemented by 234 /// this class 235 /// 236 template <typename Action, typename Continuation, typename ...Ts> applyhpx::components::default_distribution_policy237 bool apply(Continuation && c, 238 threads::thread_priority priority, Ts&&... vs) const 239 { 240 return hpx::detail::apply_impl<Action>(std::forward<Continuation>(c), 241 get_next_target(), priority, std::forward<Ts>(vs)...); 242 } 243 244 template <typename Action, typename ...Ts> applyhpx::components::default_distribution_policy245 bool apply( 246 threads::thread_priority priority, Ts&&... vs) const 247 { 248 return hpx::detail::apply_impl<Action>( 249 get_next_target(), priority, std::forward<Ts>(vs)...); 250 } 251 252 /// \note This function is part of the invocation policy implemented by 253 /// this class 254 /// 255 template <typename Action, typename Continuation, typename Callback, 256 typename ...Ts> apply_cbhpx::components::default_distribution_policy257 bool apply_cb(Continuation && c, 258 threads::thread_priority priority, Callback&& cb, Ts&&... vs) const 259 { 260 return hpx::detail::apply_cb_impl<Action>(std::forward<Continuation>(c), 261 get_next_target(), priority, std::forward<Callback>(cb), 262 std::forward<Ts>(vs)...); 263 } 264 265 template <typename Action, typename Callback, typename ...Ts> apply_cbhpx::components::default_distribution_policy266 bool apply_cb( 267 threads::thread_priority priority, Callback&& cb, Ts&&... vs) const 268 { 269 return hpx::detail::apply_cb_impl<Action>( 270 get_next_target(), priority, std::forward<Callback>(cb), 271 std::forward<Ts>(vs)...); 272 } 273 274 /// Returns the number of associated localities for this distribution 275 /// policy 276 /// 277 /// \note This function is part of the creation policy implemented by 278 /// this class 279 /// get_num_localitieshpx::components::default_distribution_policy280 std::size_t get_num_localities() const 281 { 282 return !localities_ ? std::size_t(1) : localities_->size(); 283 } 284 285 /// Returns the locality which is anticipated to be used for the next 286 /// async operation get_next_targethpx::components::default_distribution_policy287 hpx::id_type get_next_target() const 288 { 289 return !localities_ ? hpx::find_here() : localities_->front(); 290 } 291 292 protected: 293 /// \cond NOINTERNAL get_num_itemshpx::components::default_distribution_policy294 std::size_t get_num_items( 295 std::size_t items, hpx::id_type const& loc) const 296 { 297 // make sure the given id is known to this distribution policy 298 HPX_ASSERT( 299 localities_ && 300 std::find(localities_->begin(), localities_->end(), loc) != 301 localities_->end() 302 ); 303 304 // this distribution policy places an equal number of items onto 305 // each locality 306 std::size_t locs = localities_->size(); 307 308 // the overall number of items to create is smaller than the number 309 // of localities 310 if (items < locs) 311 { 312 auto it = std::find(localities_->begin(), localities_->end(), loc); 313 std::size_t num_loc = std::distance(localities_->begin(), it); 314 return (items < num_loc) ? 1 : 0; 315 } 316 317 // the last locality might get less items 318 if (locs > 1 && loc == localities_->back()) 319 { 320 return items - detail::round_to_multiple(items, locs, locs-1); 321 } 322 323 // otherwise just distribute evenly 324 return (items + locs - 1) / locs; 325 } 326 /// \endcond 327 328 protected: 329 /// \cond NOINTERNAL default_distribution_policyhpx::components::default_distribution_policy330 default_distribution_policy(std::vector<id_type> const& localities) 331 : localities_(std::make_shared<std::vector<id_type>>(localities)) 332 { 333 if (localities_->empty()) 334 { 335 HPX_THROW_EXCEPTION(invalid_status, 336 "default_distribution_policy::default_distribution_policy", 337 "unexpectedly empty list of localities"); 338 } 339 } 340 default_distribution_policyhpx::components::default_distribution_policy341 default_distribution_policy(std::vector<id_type> && localities) 342 : localities_(std::make_shared<std::vector<id_type>>(std::move(localities))) 343 { 344 if (localities_->empty()) 345 { 346 HPX_THROW_EXCEPTION(invalid_status, 347 "default_distribution_policy::default_distribution_policy", 348 "unexpectedly empty list of localities"); 349 } 350 } 351 default_distribution_policyhpx::components::default_distribution_policy352 default_distribution_policy(id_type const& locality) 353 : localities_(std::make_shared<std::vector<id_type>>(1, locality)) 354 {} 355 356 friend class hpx::serialization::access; 357 358 template <typename Archive> serializehpx::components::default_distribution_policy359 void serialize(Archive& ar, unsigned int const) 360 { 361 ar & localities_; 362 } 363 364 // localities to create things on 365 std::shared_ptr<std::vector<id_type>> localities_; 366 /// \endcond 367 }; 368 369 /// A predefined instance of the default \a distribution_policy. It will 370 /// represent the local locality and will place all items to create here. 371 static default_distribution_policy const default_layout{}; 372 }} 373 374 /// \cond NOINTERNAL 375 namespace hpx 376 { 377 using hpx::components::default_distribution_policy; 378 using hpx::components::default_layout; 379 380 namespace traits 381 { 382 template <> 383 struct is_distribution_policy<components::default_distribution_policy> 384 : std::true_type 385 {}; 386 } 387 } 388 /// \endcond 389 390 #endif 391