1 //  Copyright (c) 2017 Antoine Tran Tan
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #if !defined(HPX_LCOS_SPMD_BLOCK_HPP)
7 #define HPX_LCOS_SPMD_BLOCK_HPP
8 
9 #include <hpx/include/plain_actions.hpp>
10 #include <hpx/lcos/future.hpp>
11 #include <hpx/lcos/barrier.hpp>
12 #include <hpx/lcos/broadcast.hpp>
13 #include <hpx/parallel/execution.hpp>
14 #include <hpx/runtime/get_locality_id.hpp>
15 #include <hpx/runtime/launch_policy.hpp>
16 #include <hpx/runtime/serialization/serialize.hpp>
17 #include <hpx/traits/concepts.hpp>
18 #include <hpx/traits/is_action.hpp>
19 #include <hpx/traits/is_iterator.hpp>
20 #include <hpx/util/detail/pack.hpp>
21 #include <hpx/util/first_argument.hpp>
22 #include <hpx/util/jenkins_hash.hpp>
23 
24 #include <boost/range/irange.hpp>
25 
26 #include <cstddef>
27 #include <functional>
28 #include <map>
29 #include <memory>
30 #include <string>
31 #include <set>
32 #include <type_traits>
33 #include <utility>
34 #include <vector>
35 
36 namespace hpx { namespace lcos
37 {
38     /// The class spmd_block defines an interface for launching
39     /// multiple images while giving handles to each image to interact with
40     /// the remaining images. The \a define_spmd_block function templates create
41     /// multiple images of a user-defined action and launches them in a possibly
42     /// separate thread. A temporary spmd block object is created and diffused
43     /// to each image. The constraint for the action given to the
44     /// define_spmd_block function is to accept a spmd_block as first parameter.
45     struct spmd_block
46     {
47     private:
48         using barrier_type = hpx::lcos::barrier;
49         using table_type =
50             std::map<std::set<std::size_t>,std::shared_ptr<barrier_type>>;
51     public:
spmd_blockhpx::lcos::spmd_block52         spmd_block(){}
53 
spmd_blockhpx::lcos::spmd_block54         explicit spmd_block(std::string name, std::size_t images_per_locality,
55             std::size_t num_images, std::size_t image_id)
56         : name_(name), images_per_locality_(images_per_locality)
57         , num_images_(num_images), image_id_(image_id)
58         , barrier_(
59             std::make_shared<hpx::lcos::barrier>(
60                 name_ + "_barrier", num_images_, image_id_))
61         {}
62 
63 
get_images_per_localityhpx::lcos::spmd_block64         std::size_t get_images_per_locality() const
65         {
66             return images_per_locality_;
67         }
68 
69 
get_num_imageshpx::lcos::spmd_block70         std::size_t get_num_images() const
71         {
72             return num_images_;
73         }
74 
this_imagehpx::lcos::spmd_block75         std::size_t this_image() const
76         {
77             return image_id_;
78         }
79 
sync_allhpx::lcos::spmd_block80         void sync_all() const
81         {
82            barrier_->wait();
83         }
84 
sync_allhpx::lcos::spmd_block85         hpx::future<void> sync_all(hpx::launch::async_policy const &) const
86         {
87            return barrier_->wait(hpx::launch::async);
88         }
89 
90         // Synchronous versions of sync_images()
91 
sync_imageshpx::lcos::spmd_block92         void sync_images(std::set<std::size_t> const & images) const
93         {
94             using list_type = std::set<std::size_t>;
95 
96             typename table_type::iterator table_it(barriers_.find(images));
97             typename list_type::iterator image_it(images.find(image_id_));
98 
99             // Is current image in the input list?
100             if(image_it != images.end())
101             {
102                 // Does the barrier for the input list non-exist?
103                 if(table_it == barriers_.end())
104                 {
105                     std::size_t rank = std::distance(images.begin(),image_it);
106                     std::string suffix;
107 
108                     for(std::size_t s : images)
109                         suffix += ("_" + std::to_string(s));
110 
111                     table_it = barriers_.insert({images,
112                         std::make_shared<barrier_type>(
113                             name_ + "_barrier_" + std::to_string(hash_(suffix)),
114                             images.size(),
115                             rank)}).first;
116                 }
117 
118                 table_it->second->wait();
119             }
120         }
121 
sync_imageshpx::lcos::spmd_block122         void sync_images(std::vector<std::size_t> const & input_images) const
123         {
124             std::set<std::size_t> images(
125                 input_images.begin(),input_images.end());
126             sync_images(images);
127         }
128 
129         template<typename Iterator>
130         typename std::enable_if<
131             traits::is_input_iterator<Iterator>::value
132         >::type
sync_imageshpx::lcos::spmd_block133         sync_images(Iterator begin, Iterator end) const
134         {
135             std::set<std::size_t> images(begin,end);
136             sync_images(images);
137         }
138 
139         template<typename ... I>
140         typename std::enable_if<
141             util::detail::all_of<
142                 typename std::is_integral<I>::type ... >::value
143         >::type
sync_imageshpx::lcos::spmd_block144         sync_images(I... i)
145         {
146             std::set<std::size_t> images = {(std::size_t)i...};
147             sync_images(images);
148         }
149 
150         // Asynchronous versions of sync_images()
151 
152         hpx::future<void>
sync_imageshpx::lcos::spmd_block153         sync_images(hpx::launch::async_policy const & policy,
154             std::set<std::size_t> const & images) const
155         {
156             using list_type = std::set<std::size_t>;
157 
158             typename table_type::iterator table_it(barriers_.find(images));
159             typename list_type::iterator image_it(images.find(image_id_));
160 
161             // Is current image in the input list?
162             if(image_it != images.end())
163             {
164                 // Does the barrier for the input list non-exist?
165                 if(table_it == barriers_.end())
166                 {
167                     std::size_t rank = std::distance(images.begin(),image_it);
168                     std::string suffix;
169 
170                     for(std::size_t s : images)
171                         suffix += ("_" + std::to_string(s));
172 
173                     table_it = barriers_.insert({images,
174                         std::make_shared<barrier_type>(
175                             name_ + "_barrier_" + std::to_string(hash_(suffix)),
176                             images.size(),
177                             rank)}).first;
178                 }
179 
180                 return table_it->second->wait(hpx::launch::async);
181             }
182 
183             return hpx::make_ready_future();
184         }
185 
186         hpx::future<void>
sync_imageshpx::lcos::spmd_block187         sync_images(hpx::launch::async_policy const & policy,
188             std::vector<std::size_t> const & input_images) const
189         {
190             std::set<std::size_t> images(
191                 input_images.begin(),input_images.end());
192             return sync_images(policy,images);
193         }
194 
195         template<typename Iterator>
196         typename std::enable_if<
197             traits::is_input_iterator<Iterator>::value,
198             hpx::future<void>
199         >::type
sync_imageshpx::lcos::spmd_block200         sync_images(hpx::launch::async_policy const & policy,
201             Iterator begin, Iterator end) const
202         {
203             std::set<std::size_t> images(begin,end);
204             return sync_images(policy,images);
205         }
206 
207         template<typename ... I>
208         typename std::enable_if<
209             util::detail::all_of<
210                 typename std::is_integral<I>::type ... >::value,
211             hpx::future<void>
212         >::type
sync_imageshpx::lcos::spmd_block213         sync_images(hpx::launch::async_policy const & policy,
214             I ... i) const
215         {
216             std::set<std::size_t> images = {(std::size_t)i...};
217             return sync_images(policy,images);
218         }
219 
220     private:
221         std::string name_;
222         std::size_t images_per_locality_;
223         std::size_t num_images_;
224         std::size_t image_id_;
225         hpx::util::jenkins_hash hash_;
226 
227         // Note : barrier is stored as a pointer because hpx::lcos::barrier
228         // default constructor does not exist (Needed by
229         // spmd_block::spmd_block())
230         mutable std::shared_ptr<hpx::lcos::barrier> barrier_;
231         mutable table_type barriers_;
232 
233     private:
234         friend class hpx::serialization::access;
235 
236         // dummy serialization functionality
237         template <typename Archive>
serializehpx::lcos::spmd_block238         void serialize(Archive &, unsigned) {}
239     };
240 
    // Function object run once per image by the bulk_sync_execute() call
    // in spmd_block_helper_action::call()
242     namespace detail
243     {
244         template <typename F>
245         struct spmd_block_helper
246         {
247             std::string name_;
248             std::size_t images_per_locality_;
249             std::size_t num_images_;
250 
251             template <typename ... Ts>
operator ()hpx::lcos::detail::spmd_block_helper252             void operator()(std::size_t image_id, Ts && ... ts) const
253             {
254                 using first_type =
255                     typename hpx::util::first_argument<F>::type;
256 
257                 static_assert(std::is_same<hpx::lcos::spmd_block,
258                     first_type>::value,
259                         "define_spmd_block() needs an action that " \
260                         "has at least a spmd_block as 1st argument");
261 
262                 hpx::lcos::spmd_block block(name_, images_per_locality_,
263                     num_images_, image_id);
264 
265                 F()(hpx::launch::sync,
266                     hpx::find_here(),
267                     std::move(block),
268                     std::forward<Ts>(ts)...);
269             }
270         };
271     }
272 
273     // Helper for define_spmd_block()
274     namespace detail
275     {
276         template <typename F, typename ... Args>
277         struct spmd_block_helper_action
278         {
callhpx::lcos::detail::spmd_block_helper_action279             static void call(
280                 std::string name,
281                 std::size_t images_per_locality,
282                 std::size_t num_images,
283                 Args... args)
284             {
285                 using executor_type =
286                     hpx::parallel::execution::parallel_executor;
287 
288                 executor_type exec;
289                 std::size_t offset = hpx::get_locality_id();
290                 offset *= images_per_locality;
291 
292                 hpx::parallel::execution::bulk_sync_execute(
293                     exec,
294                     detail::spmd_block_helper<F>{
295                         name,images_per_locality, num_images},
296                     boost::irange(
297                         offset, offset + images_per_locality),
298                     args...);
299             }
300         };
301     }
302 
303     template <typename F, typename ... Args,
304         HPX_CONCEPT_REQUIRES_(hpx::traits::is_action<F>::value)
305         >
306     hpx::future<void>
define_spmd_block(std::string && name,std::size_t images_per_locality,F && f,Args &&...args)307     define_spmd_block(std::string && name, std::size_t images_per_locality,
308         F && f, Args && ... args)
309     {
310         using ftype = typename std::decay<F>::type;
311 
312         using helper_type =
313             hpx::lcos::detail::spmd_block_helper_action<
314                ftype, typename std::decay<Args>::type...>;
315 
316         using helper_action_type =
317             typename hpx::actions::make_action<
318                 decltype( &helper_type::call ), &helper_type::call >::type;
319 
320         helper_action_type act;
321 
322         std::size_t num_images
323             = hpx::get_num_localities(hpx::launch::sync) * images_per_locality;
324 
325         return
326             hpx::lcos::broadcast(
327                 act, hpx::find_all_localities(),
328                     std::forward<std::string>(name), images_per_locality,
329                         num_images, std::forward<Args>(args)...);
330     }
331 }}
332 
333 #endif
334