//  Copyright (c) 2017 Antoine Tran Tan
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_LCOS_SPMD_BLOCK_HPP)
#define HPX_LCOS_SPMD_BLOCK_HPP

#include <hpx/include/plain_actions.hpp>
#include <hpx/lcos/barrier.hpp>
#include <hpx/lcos/broadcast.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/parallel/execution.hpp>
#include <hpx/runtime/get_locality_id.hpp>
#include <hpx/runtime/launch_policy.hpp>
#include <hpx/runtime/serialization/serialize.hpp>
#include <hpx/traits/concepts.hpp>
#include <hpx/traits/is_action.hpp>
#include <hpx/traits/is_iterator.hpp>
#include <hpx/util/detail/pack.hpp>
#include <hpx/util/first_argument.hpp>
#include <hpx/util/jenkins_hash.hpp>

#include <boost/range/irange.hpp>

#include <cstddef>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

namespace hpx { namespace lcos
{
/// The class spmd_block defines an interface for launching
/// multiple images while giving handles to each image to interact with
/// the remaining images. The \a define_spmd_block function templates create
/// multiple images of a user-defined action and launches them in a possibly
/// separate thread. A temporary spmd block object is created and diffused
/// to each image. The constraint for the action given to the
/// define_spmd_block function is to accept a spmd_block as first parameter.
45 struct spmd_block 46 { 47 private: 48 using barrier_type = hpx::lcos::barrier; 49 using table_type = 50 std::map<std::set<std::size_t>,std::shared_ptr<barrier_type>>; 51 public: spmd_blockhpx::lcos::spmd_block52 spmd_block(){} 53 spmd_blockhpx::lcos::spmd_block54 explicit spmd_block(std::string name, std::size_t images_per_locality, 55 std::size_t num_images, std::size_t image_id) 56 : name_(name), images_per_locality_(images_per_locality) 57 , num_images_(num_images), image_id_(image_id) 58 , barrier_( 59 std::make_shared<hpx::lcos::barrier>( 60 name_ + "_barrier", num_images_, image_id_)) 61 {} 62 63 get_images_per_localityhpx::lcos::spmd_block64 std::size_t get_images_per_locality() const 65 { 66 return images_per_locality_; 67 } 68 69 get_num_imageshpx::lcos::spmd_block70 std::size_t get_num_images() const 71 { 72 return num_images_; 73 } 74 this_imagehpx::lcos::spmd_block75 std::size_t this_image() const 76 { 77 return image_id_; 78 } 79 sync_allhpx::lcos::spmd_block80 void sync_all() const 81 { 82 barrier_->wait(); 83 } 84 sync_allhpx::lcos::spmd_block85 hpx::future<void> sync_all(hpx::launch::async_policy const &) const 86 { 87 return barrier_->wait(hpx::launch::async); 88 } 89 90 // Synchronous versions of sync_images() 91 sync_imageshpx::lcos::spmd_block92 void sync_images(std::set<std::size_t> const & images) const 93 { 94 using list_type = std::set<std::size_t>; 95 96 typename table_type::iterator table_it(barriers_.find(images)); 97 typename list_type::iterator image_it(images.find(image_id_)); 98 99 // Is current image in the input list? 100 if(image_it != images.end()) 101 { 102 // Does the barrier for the input list non-exist? 
103 if(table_it == barriers_.end()) 104 { 105 std::size_t rank = std::distance(images.begin(),image_it); 106 std::string suffix; 107 108 for(std::size_t s : images) 109 suffix += ("_" + std::to_string(s)); 110 111 table_it = barriers_.insert({images, 112 std::make_shared<barrier_type>( 113 name_ + "_barrier_" + std::to_string(hash_(suffix)), 114 images.size(), 115 rank)}).first; 116 } 117 118 table_it->second->wait(); 119 } 120 } 121 sync_imageshpx::lcos::spmd_block122 void sync_images(std::vector<std::size_t> const & input_images) const 123 { 124 std::set<std::size_t> images( 125 input_images.begin(),input_images.end()); 126 sync_images(images); 127 } 128 129 template<typename Iterator> 130 typename std::enable_if< 131 traits::is_input_iterator<Iterator>::value 132 >::type sync_imageshpx::lcos::spmd_block133 sync_images(Iterator begin, Iterator end) const 134 { 135 std::set<std::size_t> images(begin,end); 136 sync_images(images); 137 } 138 139 template<typename ... I> 140 typename std::enable_if< 141 util::detail::all_of< 142 typename std::is_integral<I>::type ... >::value 143 >::type sync_imageshpx::lcos::spmd_block144 sync_images(I... i) 145 { 146 std::set<std::size_t> images = {(std::size_t)i...}; 147 sync_images(images); 148 } 149 150 // Asynchronous versions of sync_images() 151 152 hpx::future<void> sync_imageshpx::lcos::spmd_block153 sync_images(hpx::launch::async_policy const & policy, 154 std::set<std::size_t> const & images) const 155 { 156 using list_type = std::set<std::size_t>; 157 158 typename table_type::iterator table_it(barriers_.find(images)); 159 typename list_type::iterator image_it(images.find(image_id_)); 160 161 // Is current image in the input list? 162 if(image_it != images.end()) 163 { 164 // Does the barrier for the input list non-exist? 
165 if(table_it == barriers_.end()) 166 { 167 std::size_t rank = std::distance(images.begin(),image_it); 168 std::string suffix; 169 170 for(std::size_t s : images) 171 suffix += ("_" + std::to_string(s)); 172 173 table_it = barriers_.insert({images, 174 std::make_shared<barrier_type>( 175 name_ + "_barrier_" + std::to_string(hash_(suffix)), 176 images.size(), 177 rank)}).first; 178 } 179 180 return table_it->second->wait(hpx::launch::async); 181 } 182 183 return hpx::make_ready_future(); 184 } 185 186 hpx::future<void> sync_imageshpx::lcos::spmd_block187 sync_images(hpx::launch::async_policy const & policy, 188 std::vector<std::size_t> const & input_images) const 189 { 190 std::set<std::size_t> images( 191 input_images.begin(),input_images.end()); 192 return sync_images(policy,images); 193 } 194 195 template<typename Iterator> 196 typename std::enable_if< 197 traits::is_input_iterator<Iterator>::value, 198 hpx::future<void> 199 >::type sync_imageshpx::lcos::spmd_block200 sync_images(hpx::launch::async_policy const & policy, 201 Iterator begin, Iterator end) const 202 { 203 std::set<std::size_t> images(begin,end); 204 return sync_images(policy,images); 205 } 206 207 template<typename ... I> 208 typename std::enable_if< 209 util::detail::all_of< 210 typename std::is_integral<I>::type ... >::value, 211 hpx::future<void> 212 >::type sync_imageshpx::lcos::spmd_block213 sync_images(hpx::launch::async_policy const & policy, 214 I ... 
i) const 215 { 216 std::set<std::size_t> images = {(std::size_t)i...}; 217 return sync_images(policy,images); 218 } 219 220 private: 221 std::string name_; 222 std::size_t images_per_locality_; 223 std::size_t num_images_; 224 std::size_t image_id_; 225 hpx::util::jenkins_hash hash_; 226 227 // Note : barrier is stored as a pointer because hpx::lcos::barrier 228 // default constructor does not exist (Needed by 229 // spmd_block::spmd_block()) 230 mutable std::shared_ptr<hpx::lcos::barrier> barrier_; 231 mutable table_type barriers_; 232 233 private: 234 friend class hpx::serialization::access; 235 236 // dummy serialization functionality 237 template <typename Archive> serializehpx::lcos::spmd_block238 void serialize(Archive &, unsigned) {} 239 }; 240 241 // Helpers for bulk_execute() invoked in define_spmd_block() 242 namespace detail 243 { 244 template <typename F> 245 struct spmd_block_helper 246 { 247 std::string name_; 248 std::size_t images_per_locality_; 249 std::size_t num_images_; 250 251 template <typename ... Ts> operator ()hpx::lcos::detail::spmd_block_helper252 void operator()(std::size_t image_id, Ts && ... ts) const 253 { 254 using first_type = 255 typename hpx::util::first_argument<F>::type; 256 257 static_assert(std::is_same<hpx::lcos::spmd_block, 258 first_type>::value, 259 "define_spmd_block() needs an action that " \ 260 "has at least a spmd_block as 1st argument"); 261 262 hpx::lcos::spmd_block block(name_, images_per_locality_, 263 num_images_, image_id); 264 265 F()(hpx::launch::sync, 266 hpx::find_here(), 267 std::move(block), 268 std::forward<Ts>(ts)...); 269 } 270 }; 271 } 272 273 // Helper for define_spmd_block() 274 namespace detail 275 { 276 template <typename F, typename ... Args> 277 struct spmd_block_helper_action 278 { callhpx::lcos::detail::spmd_block_helper_action279 static void call( 280 std::string name, 281 std::size_t images_per_locality, 282 std::size_t num_images, 283 Args... 
args) 284 { 285 using executor_type = 286 hpx::parallel::execution::parallel_executor; 287 288 executor_type exec; 289 std::size_t offset = hpx::get_locality_id(); 290 offset *= images_per_locality; 291 292 hpx::parallel::execution::bulk_sync_execute( 293 exec, 294 detail::spmd_block_helper<F>{ 295 name,images_per_locality, num_images}, 296 boost::irange( 297 offset, offset + images_per_locality), 298 args...); 299 } 300 }; 301 } 302 303 template <typename F, typename ... Args, 304 HPX_CONCEPT_REQUIRES_(hpx::traits::is_action<F>::value) 305 > 306 hpx::future<void> define_spmd_block(std::string && name,std::size_t images_per_locality,F && f,Args &&...args)307 define_spmd_block(std::string && name, std::size_t images_per_locality, 308 F && f, Args && ... args) 309 { 310 using ftype = typename std::decay<F>::type; 311 312 using helper_type = 313 hpx::lcos::detail::spmd_block_helper_action< 314 ftype, typename std::decay<Args>::type...>; 315 316 using helper_action_type = 317 typename hpx::actions::make_action< 318 decltype( &helper_type::call ), &helper_type::call >::type; 319 320 helper_action_type act; 321 322 std::size_t num_images 323 = hpx::get_num_localities(hpx::launch::sync) * images_per_locality; 324 325 return 326 hpx::lcos::broadcast( 327 act, hpx::find_all_localities(), 328 std::forward<std::string>(name), images_per_locality, 329 num_images, std::forward<Args>(args)...); 330 } 331 }} 332 333 #endif 334