1 //  Copyright (c) 2016 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 /// \file host/target_distribution_policy.hpp
7 
8 #if !defined(HPX_COMPUTE_HOST_TARGET_DISTRIBUTION_POLICY)
9 #define HPX_COMPUTE_HOST_TARGET_DISTRIBUTION_POLICY
10 
11 #include <hpx/config.hpp>
12 
13 #include <hpx/lcos/dataflow.hpp>
14 #include <hpx/lcos/future.hpp>
15 #include <hpx/runtime/components/stubs/stub_base.hpp>
16 #include <hpx/runtime/serialization/base_object.hpp>
17 #include <hpx/traits/is_distribution_policy.hpp>
18 #include <hpx/util/assert.hpp>
19 
20 #include <hpx/compute/detail/target_distribution_policy.hpp>
21 #include <hpx/compute/host/target.hpp>
22 
23 #include <algorithm>
24 #include <cstddef>
25 #include <map>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29 
30 namespace hpx { namespace compute { namespace host
31 {
32     /// A target_distribution_policy used for CPU bound localities.
33     struct target_distribution_policy
34       : compute::detail::target_distribution_policy<host::target>
35     {
36         typedef compute::detail::target_distribution_policy<host::target>
37             base_type;
38 
39         /// Default-construct a new instance of a \a target_distribution_policy.
40         /// This policy will represent all devices on the current locality.
41         ///
target_distribution_policyhpx::compute::host::target_distribution_policy42         target_distribution_policy() {}
43 
44         /// Create a new \a target_distribution_policy representing the given
45         /// set of targets
46         ///
47         /// \param targets [in] The targets the new instances should represent
48         ///
operator ()hpx::compute::host::target_distribution_policy49         target_distribution_policy operator()(
50             std::vector<target_type> const& targets,
51             std::size_t num_partitions = std::size_t(-1)) const
52         {
53             if (num_partitions == std::size_t(-1))
54                 num_partitions = targets.size();
55             return target_distribution_policy(targets, num_partitions);
56         }
57 
58         /// Create a new \a target_distribution_policy representing the given
59         /// set of targets
60         ///
61         /// \param targets [in] The targets the new instances should represent
62         ///
operator ()hpx::compute::host::target_distribution_policy63         target_distribution_policy operator()(
64             std::vector<target_type> && targets,
65             std::size_t num_partitions = std::size_t(-1)) const
66         {
67             if (num_partitions == std::size_t(-1))
68                 num_partitions = targets.size();
69             return target_distribution_policy(std::move(targets), num_partitions);
70         }
71 
72         /// Create a new \a target_distribution_policy representing the given
73         /// target
74         ///
75         /// \param target [in] The target the new instances should represent
76         ///
operator ()hpx::compute::host::target_distribution_policy77         target_distribution_policy operator()(
78             target_type const& target, std::size_t num_partitions = 1) const
79         {
80             std::vector<target_type> targets;
81             targets.push_back(target);
82             return target_distribution_policy(std::move(targets), num_partitions);
83         }
84 
85         /// Create a new \a target_distribution_policy representing the given
86         /// target
87         ///
88         /// \param target [in] The target the new instances should represent
89         ///
operator ()hpx::compute::host::target_distribution_policy90         target_distribution_policy operator()(
91             target_type && target, std::size_t num_partitions = 1) const
92         {
93             std::vector<target_type> targets;
94             targets.push_back(std::move(target));
95             return target_distribution_policy(std::move(targets), num_partitions);
96         }
97 
98         /// Create one object on one of the localities associated by
99         /// this policy instance
100         ///
101         /// \param ts  [in] The arguments which will be forwarded to the
102         ///            constructor of the new object.
103         ///
104         /// \note This function is part of the placement policy implemented by
105         ///       this class
106         ///
107         /// \returns A future holding the global address which represents
108         ///          the newly created object
109         ///
110         template <typename Component, typename ... Ts>
createhpx::compute::host::target_distribution_policy111         hpx::future<hpx::id_type> create(Ts &&... ts) const
112         {
113             target_type t = this->get_next_target();
114             hpx::id_type target_locality = t.get_locality();
115             return components::stub_base<Component>::create_async(
116                 target_locality, std::forward<Ts>(ts)..., std::move(t));
117         }
118 
119         /// \cond NOINTERNAL
120         typedef std::pair<hpx::id_type, std::vector<hpx::id_type> >
121             bulk_locality_result;
122         /// \endcond
123 
124         /// Create multiple objects on the localities associated by
125         /// this policy instance
126         ///
127         /// \param count [in] The number of objects to create
128         /// \param vs   [in] The arguments which will be forwarded to the
129         ///             constructors of the new objects.
130         ///
131         /// \note This function is part of the placement policy implemented by
132         ///       this class
133         ///
134         /// \returns A future holding the list of global addresses which
135         ///          represent the newly created objects
136         ///
137         template <typename Component, typename ...Ts>
138         hpx::future<std::vector<bulk_locality_result> >
bulk_createhpx::compute::host::target_distribution_policy139         bulk_create(std::size_t count, Ts &&... ts) const
140         {
141             // collect all targets per locality
142             std::map<hpx::id_type, std::vector<target_type> > m;
143             for(target_type const& t : this->targets_)
144             {
145                 m[t.get_locality()].push_back(t);
146             }
147 
148             std::vector<hpx::id_type> localities;
149             localities.reserve(m.size());
150 
151             std::vector<hpx::future<std::vector<hpx::id_type> > > objs;
152             objs.reserve(m.size());
153 
154             auto end = m.end();
155             for (auto it = m.begin(); it != end; ++it)
156             {
157                 localities.push_back(std::move(it->first));
158 
159                 std::size_t num_partitions = 0;
160                 for (target_type const& t : it->second)
161                 {
162                     num_partitions += this->get_num_items(count, t);
163                 }
164 
165                 objs.push_back(
166                     components::stub_base<Component>::bulk_create_async(
167                         localities.back(), num_partitions, ts...,
168                         std::move(it->second)));
169             }
170 
171             return hpx::dataflow(
172                 [=](std::vector<hpx::future<std::vector<hpx::id_type> > > && v)
173                     mutable -> std::vector<bulk_locality_result>
174                 {
175                     HPX_ASSERT(localities.size() == v.size());
176 
177                     std::vector<bulk_locality_result> result;
178                     result.reserve(v.size());
179 
180                     for (std::size_t i = 0; i != v.size(); ++i)
181                     {
182                         result.emplace_back(
183                                 std::move(localities[i]), v[i].get()
184                             );
185                     }
186 
187                     return result;
188                 },
189                 std::move(objs));
190         }
191 
192     protected:
193         /// \cond NOINTERNAL
target_distribution_policyhpx::compute::host::target_distribution_policy194         target_distribution_policy(std::vector<target_type> const& targets,
195                 std::size_t num_partitions)
196           : base_type(targets, num_partitions)
197         {}
198 
target_distribution_policyhpx::compute::host::target_distribution_policy199         target_distribution_policy(std::vector<target_type> && targets,
200                 std::size_t num_partitions)
201           : base_type(std::move(targets), num_partitions)
202         {}
203 
204         friend class hpx::serialization::access;
205 
206         template <typename Archive>
serializehpx::compute::host::target_distribution_policy207         void serialize(Archive& ar, unsigned int const)
208         {
209             ar & serialization::base_object<base_type>(*this);
210         }
211         /// \endcond
212     };
213 
214     /// A predefined instance of the \a target_distribution_policy for
215     /// localities. It will represent all NUMA domains of the given locality
216     /// and will place all items to create here.
217     static target_distribution_policy const target_layout;
218 }}}
219 
220 /// \cond NOINTERNAL
221 namespace hpx { namespace traits
222 {
223     template <>
224     struct is_distribution_policy<compute::host::target_distribution_policy>
225       : std::true_type
226     {};
227 
228     template <>
229     struct num_container_partitions<compute::host::target_distribution_policy>
230     {
231         static std::size_t
callhpx::traits::num_container_partitions232         call(compute::host::target_distribution_policy const& policy)
233         {
234             return policy.get_num_partitions();
235         }
236     };
237 }}
238 /// \endcond
239 
240 #endif
241