1 //  Copyright (c) 2007-2018 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 /// \file binpacking_distribution_policy.hpp
7 
8 #if !defined(HPX_COMPONENTS_BINPACKING_DISTRIBUTION_POLICY_APR_10_2015_0344PM)
9 #define HPX_COMPONENTS_BINPACKING_DISTRIBUTION_POLICY_APR_10_2015_0344PM
10 
11 #include <hpx/config.hpp>
12 #include <hpx/dataflow.hpp>
13 #include <hpx/lcos/future.hpp>
14 #include <hpx/performance_counters/performance_counter.hpp>
15 #include <hpx/runtime/components/component_type.hpp>
16 #include <hpx/runtime/components/stubs/stub_base.hpp>
17 #include <hpx/runtime/find_here.hpp>
18 #include <hpx/runtime/naming/id_type.hpp>
19 #include <hpx/runtime/naming/name.hpp>
20 #include <hpx/runtime/serialization/serialization_fwd.hpp>
21 #include <hpx/runtime/serialization/string.hpp>
22 #include <hpx/runtime/serialization/vector.hpp>
23 #include <hpx/traits/is_distribution_policy.hpp>
24 #include <hpx/util/assert.hpp>
25 #include <hpx/util/bind_back.hpp>
26 #include <hpx/util/unwrap.hpp>
27 
28 #include <algorithm>
29 #include <cstddef>
30 #include <cstdint>
31 #include <iterator>
32 #include <string>
33 #include <type_traits>
34 #include <utility>
35 #include <vector>
36 
37 namespace hpx { namespace components
38 {
39     static char const* const default_binpacking_counter_name =
40         "/runtime{locality/total}/count/component@";
41 
42     namespace detail
43     {
44         /// \cond NOINTERNAL
45         HPX_EXPORT std::vector<std::size_t>
46         get_items_count(std::size_t count, std::vector<std::uint64_t> const& values);
47 
48         HPX_EXPORT hpx::future<std::vector<std::uint64_t> >
49         retrieve_counter_values(
50             std::vector<performance_counters::performance_counter> && counters);
51 
52         HPX_EXPORT hpx::future<std::vector<std::uint64_t> > get_counter_values(
53             std::string component_name, std::string const& counter_name,
54             std::vector<hpx::id_type> const& localities);
55 
56         HPX_EXPORT hpx::id_type const& get_best_locality(
57             hpx::future<std::vector<std::uint64_t> > && f,
58             std::vector<hpx::id_type> const& localities);
59 
60         template <typename Component>
61         struct create_helper
62         {
create_helperhpx::components::detail::create_helper63             create_helper(std::vector<hpx::id_type> const& localities)
64               : localities_(localities)
65             {}
66 
67             template <typename ...Ts>
operator ()hpx::components::detail::create_helper68             hpx::future<hpx::id_type> operator()(
69                 hpx::future<std::vector<std::uint64_t> > && values,
70                 Ts&&... vs) const
71             {
72                 hpx::id_type const& best_locality =
73                     get_best_locality(std::move(values), localities_);
74 
75                 return stub_base<Component>::create_async(
76                     best_locality, std::forward<Ts>(vs)...);
77             }
78 
79             std::vector<hpx::id_type> const& localities_;
80         };
81 
82         template <typename Component>
83         struct create_bulk_helper
84         {
85             typedef std::pair<hpx::id_type, std::vector<hpx::id_type> >
86                 bulk_locality_result;
87 
create_bulk_helperhpx::components::detail::create_bulk_helper88             create_bulk_helper(std::vector<hpx::id_type> const& localities)
89               : localities_(localities)
90             {}
91 
92             template <typename ...Ts>
93             hpx::future<std::vector<bulk_locality_result> >
operator ()hpx::components::detail::create_bulk_helper94             operator()(
95                 hpx::future<std::vector<std::uint64_t> > && values,
96                 std::size_t count, Ts&&... vs) const
97             {
98                 std::vector<std::size_t> to_create =
99                     detail::get_items_count(count, values.get());
100 
101                 std::vector<hpx::future<std::vector<hpx::id_type> > > objs;
102                 objs.reserve(localities_.size());
103 
104                 for (std::size_t i = 0; i != to_create.size(); ++i)
105                 {
106                     objs.push_back(stub_base<Component>::bulk_create_async(
107                         localities_[i], to_create[i], vs...));
108                 }
109 
110                 // consolidate all results
111                 return hpx::dataflow(hpx::launch::sync,
112                     [=](std::vector<hpx::future<std::vector<hpx::id_type> > > && v)
113                         mutable -> std::vector<bulk_locality_result>
114                     {
115                         HPX_ASSERT(localities_.size() == v.size());
116 
117                         std::vector<bulk_locality_result> result;
118                         result.reserve(v.size());
119 
120                         for (std::size_t i = 0; i != v.size(); ++i)
121                         {
122                             result.emplace_back(
123                                     std::move(localities_[i]), v[i].get()
124                                 );
125                         }
126                         return result;
127                     },
128                     std::move(objs));
129             }
130 
131             std::vector<hpx::id_type> const& localities_;
132         };
133         /// \endcond
134     }
135 
136     /// This class specifies the parameters for a binpacking distribution policy
137     /// to use for creating a given number of items on a given set of localities.
138     /// The binpacking policy will distribute the new objects in a way such that
139     /// each of the localities will equalize the number of overall objects of
140     /// this type based on a given criteria (by default this criteria is the
141     /// overall number of objects of this type).
142     struct binpacking_distribution_policy
143     {
144     public:
145         /// Default-construct a new instance of a \a binpacking_distribution_policy.
146         /// This policy will represent one locality (the local locality).
binpacking_distribution_policyhpx::components::binpacking_distribution_policy147         binpacking_distribution_policy()
148           : counter_name_(default_binpacking_counter_name)
149         {}
150 
151         /// Create a new \a default_distribution policy representing the given
152         /// set of localities.
153         ///
154         /// \param locs     [in] The list of localities the new instance should
155         ///                 represent
156         /// \param perf_counter_name  [in] The name of the performance counter which
157         ///                      should be used as the distribution criteria
158         ///                      (by default the overall number of existing
159         ///                      instances of the given component type will be
160         ///                      used).
161         ///
operator ()hpx::components::binpacking_distribution_policy162         binpacking_distribution_policy operator()(
163             std::vector<id_type> const& locs,
164             char const* perf_counter_name = default_binpacking_counter_name) const
165         {
166 #if defined(HPX_DEBUG)
167             for (id_type const& loc: locs)
168             {
169                 HPX_ASSERT(naming::is_locality(loc));
170             }
171 #endif
172             return binpacking_distribution_policy(locs, perf_counter_name);
173         }
174 
175         /// Create a new \a default_distribution policy representing the given
176         /// set of localities.
177         ///
178         /// \param locs     [in] The list of localities the new instance should
179         ///                 represent
180         /// \param perf_counter_name  [in] The name of the performance counter which
181         ///                      should be used as the distribution criteria
182         ///                      (by default the overall number of existing
183         ///                      instances of the given component type will be
184         ///                      used).
185         ///
operator ()hpx::components::binpacking_distribution_policy186         binpacking_distribution_policy operator()(
187             std::vector<id_type> && locs,
188             char const* perf_counter_name = default_binpacking_counter_name) const
189         {
190 #if defined(HPX_DEBUG)
191             for (id_type const& loc: locs)
192             {
193                 HPX_ASSERT(naming::is_locality(loc));
194             }
195 #endif
196             return binpacking_distribution_policy(std::move(locs),
197                 perf_counter_name);
198         }
199 
200         /// Create a new \a default_distribution policy representing the given
201         /// locality
202         ///
203         /// \param loc     [in] The locality the new instance should
204         ///                 represent
205         /// \param perf_counter_name  [in] The name of the performance counter which
206         ///                      should be used as the distribution criteria
207         ///                      (by default the overall number of existing
208         ///                      instances of the given component type will be
209         ///                      used).
210         ///
operator ()hpx::components::binpacking_distribution_policy211         binpacking_distribution_policy operator()(id_type const& loc,
212             char const* perf_counter_name = default_binpacking_counter_name) const
213         {
214             HPX_ASSERT(naming::is_locality(loc));
215             return binpacking_distribution_policy(loc, perf_counter_name);
216         }
217 
218         /// Create one object on one of the localities associated by
219         /// this policy instance
220         ///
221         /// \param vs  [in] The arguments which will be forwarded to the
222         ///            constructor of the new object.
223         ///
224         /// \returns A future holding the global address which represents
225         ///          the newly created object
226         ///
227         template <typename Component, typename ...Ts>
createhpx::components::binpacking_distribution_policy228         hpx::future<hpx::id_type> create(Ts&&... vs) const
229         {
230             using components::stub_base;
231 
232             // handle special cases
233             if (localities_.size() == 0)
234             {
235                 return stub_base<Component>::create_async(
236                     hpx::find_here(), std::forward<Ts>(vs)...);
237             }
238             else if (localities_.size() == 1)
239             {
240                 return stub_base<Component>::create_async(
241                     localities_.front(), std::forward<Ts>(vs)...);
242             }
243 
244             // schedule creation of all objects across given localities
245             hpx::future<std::vector<std::uint64_t> > values =
246                 detail::get_counter_values(
247                     get_component_name<Component>(),
248                     counter_name_, localities_);
249 
250             return values.then(hpx::util::bind_back(
251                 detail::create_helper<Component>(localities_),
252                 std::forward<Ts>(vs)...));
253         }
254 
255         /// \cond NOINTERNAL
256         typedef std::pair<hpx::id_type, std::vector<hpx::id_type> >
257             bulk_locality_result;
258         /// \endcond
259 
260         /// Create multiple objects on the localities associated by
261         /// this policy instance
262         ///
263         /// \param count [in] The number of objects to create
264         /// \param vs   [in] The arguments which will be forwarded to the
265         ///             constructors of the new objects.
266         ///
267         /// \returns A future holding the list of global addresses which
268         ///          represent the newly created objects
269         ///
270         template <typename Component, typename ...Ts>
271         hpx::future<std::vector<bulk_locality_result> >
bulk_createhpx::components::binpacking_distribution_policy272         bulk_create(std::size_t count, Ts&&... vs) const
273         {
274             using components::stub_base;
275 
276             if (localities_.size() > 1)
277             {
278                 // schedule creation of all objects across given localities
279                 hpx::future<std::vector<std::uint64_t> > values =
280                     detail::get_counter_values(
281                     get_component_name<Component>(),
282                     counter_name_, localities_);
283 
284                 return values.then(
285                     hpx::util::bind_back(
286                         detail::create_bulk_helper<Component>(localities_),
287                         count, std::forward<Ts>(vs)...));
288             }
289 
290             // handle special cases
291             hpx::id_type id =
292                 localities_.empty() ? hpx::find_here() : localities_.front();
293 
294             hpx::future<std::vector<hpx::id_type> > f =
295                 stub_base<Component>::bulk_create_async(
296                     id, count, std::forward<Ts>(vs)...);
297 
298             return f.then(hpx::launch::sync,
299                 [HPX_CAPTURE_MOVE(id)](
300                     hpx::future<std::vector<hpx::id_type> > && f
301                 ) -> std::vector<bulk_locality_result>
302                 {
303                     std::vector<bulk_locality_result> result;
304                     result.emplace_back(id, f.get());
305                     return result;
306                 });
307         }
308 
309         /// Returns the name of the performance counter associated with this
310         /// policy instance.
get_counter_namehpx::components::binpacking_distribution_policy311         std::string const& get_counter_name() const
312         {
313             return counter_name_;
314         }
315 
316         /// Returns the number of associated localities for this distribution
317         /// policy
318         ///
319         /// \note This function is part of the creation policy implemented by
320         ///       this class
321         ///
get_num_localitieshpx::components::binpacking_distribution_policy322         std::size_t get_num_localities() const
323         {
324             return localities_.size();
325         }
326 
327     protected:
328         /// \cond NOINTERNAL
binpacking_distribution_policyhpx::components::binpacking_distribution_policy329         binpacking_distribution_policy(std::vector<id_type> const& localities,
330                 char const* perf_counter_name)
331           : localities_(localities),
332             counter_name_(perf_counter_name)
333         {}
334 
binpacking_distribution_policyhpx::components::binpacking_distribution_policy335         binpacking_distribution_policy(std::vector<id_type> && localities,
336                 char const* perf_counter_name)
337           : localities_(std::move(localities)),
338             counter_name_(perf_counter_name)
339         {}
340 
binpacking_distribution_policyhpx::components::binpacking_distribution_policy341         binpacking_distribution_policy(id_type const& locality,
342                 char const* perf_counter_name)
343           : counter_name_(perf_counter_name)
344         {
345             localities_.push_back(locality);
346         }
347 
348         friend class hpx::serialization::access;
349 
350         template <typename Archive>
serializehpx::components::binpacking_distribution_policy351         void serialize(Archive& ar, unsigned int const)
352         {
353             ar & counter_name_ & localities_;
354         }
355 
356         std::vector<id_type> localities_;   // localities to create things on
357         std::string counter_name_;          // name of counter to use as criteria
358         /// \endcond
359     };
360 
361     /// A predefined instance of the binpacking \a distribution_policy. It will
362     /// represent the local locality and will place all items to create here.
363     static binpacking_distribution_policy const binpacked;
364 }}
365 
366 /// \cond NOINTERNAL
367 namespace hpx
368 {
369     using hpx::components::binpacking_distribution_policy;
370     using hpx::components::binpacked;
371 
372     namespace traits
373     {
374         template <>
375         struct is_distribution_policy<components::binpacking_distribution_policy>
376           : std::true_type
377         {};
378     }
379 }
380 /// \endcond
381 
382 #endif
383