1 //  Copyright (c) 2014-2018 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 /// \file default_distribution_policy.hpp
7 
8 #if !defined(HPX_COMPONENTS_DISTRIBUTION_POLICY_APR_07_2015_1246PM)
9 #define HPX_COMPONENTS_DISTRIBUTION_POLICY_APR_07_2015_1246PM
10 
11 #include <hpx/config.hpp>
12 #include <hpx/lcos/dataflow.hpp>
13 #include <hpx/lcos/future.hpp>
14 #include <hpx/lcos/packaged_action.hpp>
15 #include <hpx/runtime/actions/action_support.hpp>
16 #include <hpx/runtime/applier/apply.hpp>
17 #include <hpx/runtime/components/stubs/stub_base.hpp>
18 #include <hpx/runtime/launch_policy.hpp>
19 #include <hpx/runtime/find_here.hpp>
20 #include <hpx/runtime/naming/id_type.hpp>
21 #include <hpx/runtime/naming/name.hpp>
22 #include <hpx/runtime/serialization/serialization_fwd.hpp>
23 #include <hpx/runtime/serialization/vector.hpp>
24 #include <hpx/runtime/serialization/shared_ptr.hpp>
25 #include <hpx/traits/extract_action.hpp>
26 #include <hpx/traits/is_distribution_policy.hpp>
27 #include <hpx/traits/promise_local_result.hpp>
28 #include <hpx/util/assert.hpp>
29 
30 #include <algorithm>
31 #include <cstddef>
32 #include <memory>
33 #include <type_traits>
34 #include <utility>
35 #include <vector>
36 
37 namespace hpx { namespace components
38 {
39     ///////////////////////////////////////////////////////////////////////////
40     /// \cond NOINTERNAL
41     namespace detail
42     {
43         HPX_FORCEINLINE std::size_t
round_to_multiple(std::size_t n1,std::size_t n2,std::size_t n3)44         round_to_multiple(std::size_t n1, std::size_t n2, std::size_t n3)
45         {
46             return (n1 / n2) * n3;
47         }
48     }
49     /// \endcond
50 
51     /// This class specifies the parameters for a simple distribution policy
52     /// to use for creating (and evenly distributing) a given number of items
53     /// on a given set of localities.
54     struct default_distribution_policy
55     {
56     public:
57         /// Default-construct a new instance of a \a default_distribution_policy.
58         /// This policy will represent one locality (the local locality).
59         default_distribution_policy() = default;
60 
61         /// Create a new \a default_distribution policy representing the given
62         /// set of localities.
63         ///
64         /// \param locs     [in] The list of localities the new instance should
65         ///                 represent
operator ()hpx::components::default_distribution_policy66         default_distribution_policy operator()(
67             std::vector<id_type> const& locs) const
68         {
69             return default_distribution_policy(locs);
70         }
71 
72         /// Create a new \a default_distribution policy representing the given
73         /// set of localities.
74         ///
75         /// \param locs     [in] The list of localities the new instance should
76         ///                 represent
operator ()hpx::components::default_distribution_policy77         default_distribution_policy operator()(
78             std::vector<id_type>&& locs) const
79         {
80             return default_distribution_policy(std::move(locs));
81         }
82 
83         /// Create a new \a default_distribution policy representing the given
84         /// locality
85         ///
86         /// \param loc     [in] The locality the new instance should
87         ///                 represent
operator ()hpx::components::default_distribution_policy88         default_distribution_policy operator()(id_type const& loc) const
89         {
90             return default_distribution_policy(loc);
91         }
92 
93         /// Create one object on one of the localities associated by
94         /// this policy instance
95         ///
96         /// \param vs  [in] The arguments which will be forwarded to the
97         ///            constructor of the new object.
98         ///
99         /// \note This function is part of the placement policy implemented by
100         ///       this class
101         ///
102         /// \returns A future holding the global address which represents
103         ///          the newly created object
104         ///
105         template <typename Component, typename ...Ts>
createhpx::components::default_distribution_policy106         hpx::future<hpx::id_type> create(Ts&&... vs) const
107         {
108             using components::stub_base;
109 
110             if (localities_)
111             {
112                 for (hpx::id_type const& loc: *localities_)
113                 {
114                     if (get_num_items(1, loc) != 0)
115                     {
116                         return stub_base<Component>::create_async(
117                             loc, std::forward<Ts>(vs)...);
118                     }
119                 }
120             }
121 
122             // by default the object will be created on the current
123             // locality
124             return stub_base<Component>::create_async(
125                 hpx::find_here(), std::forward<Ts>(vs)...);
126         }
127 
128         /// \cond NOINTERNAL
129         typedef std::pair<hpx::id_type, std::vector<hpx::id_type> >
130             bulk_locality_result;
131         /// \endcond
132 
133         /// Create multiple objects on the localities associated by
134         /// this policy instance
135         ///
136         /// \param count [in] The number of objects to create
137         /// \param vs   [in] The arguments which will be forwarded to the
138         ///             constructors of the new objects.
139         ///
140         /// \note This function is part of the placement policy implemented by
141         ///       this class
142         ///
143         /// \returns A future holding the list of global addresses which
144         ///          represent the newly created objects
145         ///
146         template <typename Component, typename ...Ts>
147         hpx::future<std::vector<bulk_locality_result> >
bulk_createhpx::components::default_distribution_policy148         bulk_create(std::size_t count, Ts&&... vs) const
149         {
150             using components::stub_base;
151 
152             if (localities_ && localities_->size() > 1)
153             {
154                 // schedule creation of all objects across given localities
155                 std::vector<hpx::future<std::vector<hpx::id_type> > > objs;
156                 objs.reserve(localities_->size());
157                 for (hpx::id_type const& loc: *localities_)
158                 {
159                     objs.push_back(stub_base<Component>::bulk_create_async(
160                         loc, get_num_items(count, loc), vs...));
161                 }
162 
163                 // consolidate all results
164                 auto localities = localities_;
165                 return hpx::dataflow(hpx::launch::sync,
166                     [localities](
167                         std::vector<hpx::future<std::vector<hpx::id_type> > > && v
168                     ) mutable -> std::vector<bulk_locality_result>
169                     {
170                         HPX_ASSERT(localities->size() == v.size());
171 
172                         std::vector<bulk_locality_result> result;
173                         result.reserve(v.size());
174 
175                         for (std::size_t i = 0; i != v.size(); ++i)
176                         {
177                             result.emplace_back((*localities)[i], v[i].get());
178                         }
179                         return result;
180                     },
181                     std::move(objs));
182             }
183 
184             // handle special cases
185             hpx::id_type id = get_next_target();
186 
187             hpx::future<std::vector<hpx::id_type> > f =
188                 stub_base<Component>::bulk_create_async(
189                     id, count, std::forward<Ts>(vs)...);
190 
191             return f.then(hpx::launch::sync,
192                 [HPX_CAPTURE_MOVE(id)](
193                     hpx::future<std::vector<hpx::id_type> > && f
194                 ) -> std::vector<bulk_locality_result>
195                 {
196                     std::vector<bulk_locality_result> result;
197                     result.emplace_back(id, f.get());
198                     return result;
199                 });
200         }
201 
202         /// \note This function is part of the invocation policy implemented by
203         ///       this class
204         ///
205         template <typename Action>
206         struct async_result
207         {
208             using type = hpx::future<typename traits::promise_local_result<
209                 typename hpx::traits::extract_action<Action>::remote_result_type
210             >::type>;
211         };
212 
213         template <typename Action, typename ...Ts>
214         typename async_result<Action>::type
asynchpx::components::default_distribution_policy215         async(launch policy, Ts&&... vs) const
216         {
217             return hpx::detail::async_impl<Action>(policy,
218                 get_next_target(), std::forward<Ts>(vs)...);
219         }
220 
221         /// \note This function is part of the invocation policy implemented by
222         ///       this class
223         ///
224         template <typename Action, typename Callback, typename ...Ts>
225         typename async_result<Action>::type
async_cbhpx::components::default_distribution_policy226         async_cb(launch policy, Callback&& cb, Ts&&... vs) const
227         {
228             return hpx::detail::async_cb_impl<Action>(policy,
229                 get_next_target(), std::forward<Callback>(cb),
230                 std::forward<Ts>(vs)...);
231         }
232 
233         /// \note This function is part of the invocation policy implemented by
234         ///       this class
235         ///
236         template <typename Action, typename Continuation, typename ...Ts>
applyhpx::components::default_distribution_policy237         bool apply(Continuation && c,
238             threads::thread_priority priority, Ts&&... vs) const
239         {
240             return hpx::detail::apply_impl<Action>(std::forward<Continuation>(c),
241                 get_next_target(), priority, std::forward<Ts>(vs)...);
242         }
243 
244         template <typename Action, typename ...Ts>
applyhpx::components::default_distribution_policy245         bool apply(
246             threads::thread_priority priority, Ts&&... vs) const
247         {
248             return hpx::detail::apply_impl<Action>(
249                 get_next_target(), priority, std::forward<Ts>(vs)...);
250         }
251 
252         /// \note This function is part of the invocation policy implemented by
253         ///       this class
254         ///
255         template <typename Action, typename Continuation, typename Callback,
256             typename ...Ts>
apply_cbhpx::components::default_distribution_policy257         bool apply_cb(Continuation && c,
258             threads::thread_priority priority, Callback&& cb, Ts&&... vs) const
259         {
260             return hpx::detail::apply_cb_impl<Action>(std::forward<Continuation>(c),
261                 get_next_target(), priority, std::forward<Callback>(cb),
262                 std::forward<Ts>(vs)...);
263         }
264 
265         template <typename Action, typename Callback, typename ...Ts>
apply_cbhpx::components::default_distribution_policy266         bool apply_cb(
267             threads::thread_priority priority, Callback&& cb, Ts&&... vs) const
268         {
269             return hpx::detail::apply_cb_impl<Action>(
270                 get_next_target(), priority, std::forward<Callback>(cb),
271                 std::forward<Ts>(vs)...);
272         }
273 
274         /// Returns the number of associated localities for this distribution
275         /// policy
276         ///
277         /// \note This function is part of the creation policy implemented by
278         ///       this class
279         ///
get_num_localitieshpx::components::default_distribution_policy280         std::size_t get_num_localities() const
281         {
282             return !localities_ ? std::size_t(1) : localities_->size();
283         }
284 
285         /// Returns the locality which is anticipated to be used for the next
286         /// async operation
get_next_targethpx::components::default_distribution_policy287         hpx::id_type get_next_target() const
288         {
289             return !localities_ ? hpx::find_here() : localities_->front();
290         }
291 
292     protected:
293         /// \cond NOINTERNAL
get_num_itemshpx::components::default_distribution_policy294         std::size_t get_num_items(
295             std::size_t items, hpx::id_type const& loc) const
296         {
297             // make sure the given id is known to this distribution policy
298             HPX_ASSERT(
299                 localities_ &&
300                 std::find(localities_->begin(), localities_->end(), loc) !=
301                     localities_->end()
302             );
303 
304             // this distribution policy places an equal number of items onto
305             // each locality
306             std::size_t locs = localities_->size();
307 
308             // the overall number of items to create is smaller than the number
309             // of localities
310             if (items < locs)
311             {
312                 auto it = std::find(localities_->begin(), localities_->end(), loc);
313                 std::size_t num_loc = std::distance(localities_->begin(), it);
314                 return (items < num_loc) ? 1 : 0;
315             }
316 
317             // the last locality might get less items
318             if (locs > 1 && loc == localities_->back())
319             {
320                 return items - detail::round_to_multiple(items, locs, locs-1);
321             }
322 
323             // otherwise just distribute evenly
324             return (items + locs - 1) / locs;
325         }
326         /// \endcond
327 
328     protected:
329         /// \cond NOINTERNAL
default_distribution_policyhpx::components::default_distribution_policy330         default_distribution_policy(std::vector<id_type> const& localities)
331           : localities_(std::make_shared<std::vector<id_type>>(localities))
332         {
333             if (localities_->empty())
334             {
335                 HPX_THROW_EXCEPTION(invalid_status,
336                     "default_distribution_policy::default_distribution_policy",
337                     "unexpectedly empty list of localities");
338             }
339         }
340 
default_distribution_policyhpx::components::default_distribution_policy341         default_distribution_policy(std::vector<id_type> && localities)
342           : localities_(std::make_shared<std::vector<id_type>>(std::move(localities)))
343         {
344             if (localities_->empty())
345             {
346                 HPX_THROW_EXCEPTION(invalid_status,
347                     "default_distribution_policy::default_distribution_policy",
348                     "unexpectedly empty list of localities");
349             }
350         }
351 
default_distribution_policyhpx::components::default_distribution_policy352         default_distribution_policy(id_type const& locality)
353           : localities_(std::make_shared<std::vector<id_type>>(1, locality))
354         {}
355 
356         friend class hpx::serialization::access;
357 
358         template <typename Archive>
serializehpx::components::default_distribution_policy359         void serialize(Archive& ar, unsigned int const)
360         {
361             ar & localities_;
362         }
363 
364         // localities to create things on
365         std::shared_ptr<std::vector<id_type>> localities_;
366         /// \endcond
367     };
368 
369     /// A predefined instance of the default \a distribution_policy. It will
370     /// represent the local locality and will place all items to create here.
371     static default_distribution_policy const default_layout{};
372 }}
373 
374 /// \cond NOINTERNAL
375 namespace hpx
376 {
377     using hpx::components::default_distribution_policy;
378     using hpx::components::default_layout;
379 
380     namespace traits
381     {
382         template <>
383         struct is_distribution_policy<components::default_distribution_policy>
384           : std::true_type
385         {};
386     }
387 }
388 /// \endcond
389 
390 #endif
391