1// Copyright (C) 2004-2006 The Trustees of Indiana University. 2 3// Use, modification and distribution is subject to the Boost Software 4// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 5// http://www.boost.org/LICENSE_1_0.txt) 6 7// Authors: Douglas Gregor 8// Nick Edmonds 9// Andrew Lumsdaine 10#include <boost/assert.hpp> 11#include <boost/property_map/parallel/distributed_property_map.hpp> 12#include <boost/property_map/parallel/detail/untracked_pair.hpp> 13#include <boost/type_traits/is_base_and_derived.hpp> 14#include <boost/bind.hpp> 15#include <boost/property_map/parallel/simple_trigger.hpp> 16 17namespace boost { namespace parallel { 18 19template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 20template<typename Reduce> 21PBGL_DISTRIB_PMAP 22::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global, 23 const StorageMap& pm, const Reduce& reduce) 24 : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver)) 25{ 26 typedef handle_message<Reduce> Handler; 27 28 data->ghost_cells.reset(new ghost_cells_type()); 29 data->reset = &data_t::template do_reset<Reduce>; 30 data->process_group.replace_handler(Handler(data, reduce)); 31 data->process_group.template get_receiver<Handler>() 32 ->setup_triggers(data->process_group); 33} 34 35template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 36PBGL_DISTRIB_PMAP::~distributed_property_map() { } 37 38template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 39template<typename Reduce> 40void 41PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce) 42{ 43 typedef handle_message<Reduce> Handler; 44 data->process_group.replace_handler(Handler(data, reduce)); 45 Handler* handler = data->process_group.template get_receiver<Handler>(); 46 BOOST_ASSERT(handler); 47 handler->setup_triggers(data->process_group); 48 data->get_default_value = reduce; 49 data->has_default_resolver = Reduce::non_default_resolver; 50 int model = data->model; 51 data->reset = &data_t::template do_reset<Reduce>; 52 set_consistency_model(model); 53} 54 55template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 56void PBGL_DISTRIB_PMAP::prune_ghost_cells() const 57{ 58 if (data->max_ghost_cells == 0) 59 return; 60 61 while (data->ghost_cells->size() > data->max_ghost_cells) { 62 // Evict the last ghost cell 63 64 if (data->model & cm_flush) { 65 // We need to flush values when we evict them. 66 boost::parallel::detail::untracked_pair<key_type, value_type> const& victim 67 = data->ghost_cells->back(); 68 send(data->process_group, get(data->global, victim.first).first, 69 property_map_put, victim); 70 } 71 72 // Actually remove the ghost cell 73 data->ghost_cells->pop_back(); 74 } 75} 76 77template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 78typename PBGL_DISTRIB_PMAP::value_type& 79PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const 80{ 81 // Index by key 82 ghost_cells_key_index_type const& key_index 83 = data->ghost_cells->template get<1>(); 84 85 // Search for the ghost cell by key, and project back to the sequence 86 iterator ghost_cell 87 = data->ghost_cells->template project<0>(key_index.find(key)); 88 if (ghost_cell == data->ghost_cells->end()) { 89 value_type value; 90 if (data->has_default_resolver) 91 // Since we have a default resolver, use it to create a default 92 // value for this ghost cell. 93 value = data->get_default_value(key); 94 else if (request_if_missing) 95 // Request the actual value of this key from its owner 96 send_oob_with_reply(data->process_group, get(data->global, key).first, 97 property_map_get, key, value); 98 else 99 value = value_type(); 100 101 // Create a ghost cell containing the new value 102 ghost_cell 103 = data->ghost_cells->push_front(std::make_pair(key, value)).first; 104 105 // If we need to, prune the ghost cells 106 if (data->max_ghost_cells > 0) 107 prune_ghost_cells(); 108 } else if (data->max_ghost_cells > 0) 109 // Put this cell at the beginning of the MRU list 110 data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell); 111 112 return const_cast<value_type&>(ghost_cell->second); 113} 114 115template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 116template<typename Reduce> 117void 118PBGL_DISTRIB_PMAP 119::handle_message<Reduce>::operator()(process_id_type source, int tag) 120{ 121 BOOST_ASSERT(false); 122} 123 124template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 125template<typename Reduce> 126void 127PBGL_DISTRIB_PMAP::handle_message<Reduce>:: 128handle_put(int /*source*/, int /*tag*/, 129 const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context) 130{ 131 using boost::get; 132 133 shared_ptr<data_t> data(data_ptr); 134 135 owner_local_pair p = get(data->global, req.first); 136 BOOST_ASSERT(p.first == process_id(data->process_group)); 137 138 detail::maybe_put(data->storage, p.second, 139 reduce(req.first, 140 get(data->storage, p.second), 141 req.second)); 142} 143 144template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 145template<typename Reduce> 146typename PBGL_DISTRIB_PMAP::value_type 147PBGL_DISTRIB_PMAP::handle_message<Reduce>:: 148handle_get(int source, int /*tag*/, const key_type& key, 149 trigger_receive_context) 150{ 151 using boost::get; 152 153 shared_ptr<data_t> data(data_ptr); 154 BOOST_ASSERT(data); 155 156 owner_local_pair p = get(data->global, key); 157 return get(data->storage, p.second); 158} 159 160template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 161template<typename Reduce> 162void 163PBGL_DISTRIB_PMAP::handle_message<Reduce>:: 164handle_multiget(int source, int tag, const std::vector<key_type>& keys, 165 trigger_receive_context) 166{ 167 shared_ptr<data_t> data(data_ptr); 168 BOOST_ASSERT(data); 169 170 typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value; 171 std::vector<key_value> results; 172 std::size_t n = keys.size(); 173 results.reserve(n); 174 175 using boost::get; 176 177 for (std::size_t i = 0; i < n; ++i) { 178 local_key_type local_key = get(data->global, keys[i]).second; 179 results.push_back(key_value(keys[i], get(data->storage, local_key))); 180 } 181 send(data->process_group, source, property_map_multiget_reply, results); 182} 183 184template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 185template<typename Reduce> 186void 187PBGL_DISTRIB_PMAP::handle_message<Reduce>:: 188handle_multiget_reply 189 (int source, int tag, 190 const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg, 191 trigger_receive_context) 192{ 193 shared_ptr<data_t> data(data_ptr); 194 BOOST_ASSERT(data); 195 196 // Index by key 197 ghost_cells_key_index_type const& key_index 198 = data->ghost_cells->template get<1>(); 199 200 std::size_t n = msg.size(); 201 for (std::size_t i = 0; i < n; ++i) { 202 // Search for the ghost cell by key, and project back to the sequence 203 iterator position 204 = data->ghost_cells->template project<0>(key_index.find(msg[i].first)); 205 206 if (position != data->ghost_cells->end()) 207 const_cast<value_type&>(position->second) = msg[i].second; 208 } 209} 210 211template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 212template<typename Reduce> 213void 214PBGL_DISTRIB_PMAP::handle_message<Reduce>:: 215handle_multiput 216 (int source, int tag, 217 const std::vector<unsafe_pair<local_key_type, value_type> >& values, 218 trigger_receive_context) 219{ 220 using boost::get; 221 222 shared_ptr<data_t> data(data_ptr); 223 BOOST_ASSERT(data); 224 225 std::size_t n = values.size(); 226 for (std::size_t i = 0; i < n; ++i) { 227 local_key_type local_key = values[i].first; 228 value_type local_value = get(data->storage, local_key); 229 detail::maybe_put(data->storage, values[i].first, 230 reduce(values[i].first, 231 local_value, 232 values[i].second)); 233 } 234} 235 236template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 237template<typename Reduce> 238void 239PBGL_DISTRIB_PMAP::handle_message<Reduce>:: 240setup_triggers(process_group_type& pg) 241{ 242 using boost::parallel::simple_trigger; 243 244 simple_trigger(pg, property_map_put, this, &handle_message::handle_put); 245 simple_trigger(pg, property_map_get, this, &handle_message::handle_get); 246 simple_trigger(pg, property_map_multiget, this, 247 &handle_message::handle_multiget); 248 simple_trigger(pg, property_map_multiget_reply, this, 249 &handle_message::handle_multiget_reply); 250 simple_trigger(pg, property_map_multiput, this, 251 &handle_message::handle_multiput); 252} 253 254template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 255void 256PBGL_DISTRIB_PMAP 257::on_synchronize::operator()() 258{ 259 int stage=0; // we only get called at the start now 260 shared_ptr<data_t> data(data_ptr); 261 BOOST_ASSERT(data); 262 263 // Determine in which stage backward consistency messages should be sent. 264 int backward_stage = -1; 265 if (data->model & cm_backward) { 266 if (data->model & cm_flush) backward_stage = 1; 267 else backward_stage = 0; 268 } 269 270 // Flush results in first stage 271 if (stage == 0 && data->model & cm_flush) 272 data->flush(); 273 274 // Backward consistency 275 if (stage == backward_stage && !(data->model & (cm_clear | cm_reset))) 276 data->refresh_ghost_cells(); 277 278 // Optionally clear results 279 if (data->model & cm_clear) 280 data->clear(); 281 282 // Optionally reset results 283 if (data->model & cm_reset) { 284 if (data->reset) ((*data).*data->reset)(); 285 } 286} 287 288 289template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 290void 291PBGL_DISTRIB_PMAP::set_consistency_model(int model) 292{ 293 data->model = model; 294 295 bool need_on_synchronize = (model != cm_forward); 296 297 // Backward consistency is a two-stage process. 298 if (model & cm_backward) { 299 // For backward consistency to work, we absolutely cannot throw 300 // away any ghost cells. 301 data->max_ghost_cells = 0; 302 } 303 304 // attach the on_synchronize handler. 305 if (need_on_synchronize) 306 data->process_group.replace_on_synchronize_handler(on_synchronize(data)); 307} 308 309template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 310void 311PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells) 312{ 313 if ((data->model & cm_backward) && max_ghost_cells > 0) 314 boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: " 315 "cannot limit ghost-cell usage with a backward " 316 "consistency model")); 317 318 if (max_ghost_cells == 1) 319 // It is not safe to have only 1 ghost cell; the cell() method 320 // will fail. 321 max_ghost_cells = 2; 322 323 data->max_ghost_cells = max_ghost_cells; 324 prune_ghost_cells(); 325} 326 327template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 328void PBGL_DISTRIB_PMAP::clear() 329{ 330 data->clear(); 331} 332 333template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 334void PBGL_DISTRIB_PMAP::data_t::clear() 335{ 336 ghost_cells->clear(); 337} 338 339template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 340void PBGL_DISTRIB_PMAP::reset() 341{ 342 if (data->reset) ((*data).*data->reset)(); 343} 344 345template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 346void PBGL_DISTRIB_PMAP::flush() 347{ 348 data->flush(); 349} 350 351template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 352void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells() 353{ 354 using boost::get; 355 356 std::vector<std::vector<key_type> > keys; 357 keys.resize(num_processes(process_group)); 358 359 // Collect the set of keys for which we will request values 360 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) 361 keys[get(global, i->first).first].push_back(i->first); 362 363 // Send multiget requests to each of the other processors 364 typedef typename ProcessGroup::process_size_type process_size_type; 365 process_size_type n = num_processes(process_group); 366 process_id_type id = process_id(process_group); 367 for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) { 368 if (!keys[p].empty()) 369 send(process_group, p, property_map_multiget, keys[p]); 370 } 371} 372 373template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 374void PBGL_DISTRIB_PMAP::data_t::flush() 375{ 376 using boost::get; 377 378 int n = num_processes(process_group); 379 std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values; 380 values.resize(n); 381 382 // Collect all of the flushed values 383 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) { 384 std::pair<int, local_key_type> g = get(global, i->first); 385 values[g.first].push_back(std::make_pair(g.second, i->second)); 386 } 387 388 // Transmit flushed values 389 for (int p = 0; p < n; ++p) { 390 if (!values[p].empty()) 391 send(process_group, p, property_map_multiput, values[p]); 392 } 393} 394 395template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 396void PBGL_DISTRIB_PMAP::do_synchronize() 397{ 398 if (data->model & cm_backward) { 399 synchronize(data->process_group); 400 return; 401 } 402 403 // Request refreshes of the values of our ghost cells 404 data->refresh_ghost_cells(); 405 406 // Allows all of the multigets to get to their destinations 407 synchronize(data->process_group); 408 409 // Allows all of the multiget responses to get to their destinations 410 synchronize(data->process_group); 411} 412 413template<typename ProcessGroup, typename GlobalMap, typename StorageMap> 414template<typename Resolver> 415void PBGL_DISTRIB_PMAP::data_t::do_reset() 416{ 417 Resolver* resolver = get_default_value.template target<Resolver>(); 418 BOOST_ASSERT(resolver); 419 420 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) 421 const_cast<value_type&>(i->second) = (*resolver)(i->first); 422} 423 424} } // end namespace boost::parallel 425