1// Copyright (C) 2004-2006 The Trustees of Indiana University.
2
3// Use, modification and distribution is subject to the Boost Software
4// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7//  Authors: Douglas Gregor
8//           Nick Edmonds
9//           Andrew Lumsdaine
10#include <boost/assert.hpp>
11#include <boost/property_map/parallel/distributed_property_map.hpp>
12#include <boost/property_map/parallel/detail/untracked_pair.hpp>
13#include <boost/type_traits/is_base_and_derived.hpp>
14#include <boost/bind.hpp>
15#include <boost/property_map/parallel/simple_trigger.hpp>
16
17namespace boost { namespace parallel {
18
19template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
20template<typename Reduce>
21PBGL_DISTRIB_PMAP
22::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
23                           const StorageMap& pm, const Reduce& reduce)
24  : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
25{
26  typedef handle_message<Reduce> Handler;
27
28  data->ghost_cells.reset(new ghost_cells_type());
29  data->reset = &data_t::template do_reset<Reduce>;
30  data->process_group.replace_handler(Handler(data, reduce));
31  data->process_group.template get_receiver<Handler>()
32    ->setup_triggers(data->process_group);
33}
34
35template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
36PBGL_DISTRIB_PMAP::~distributed_property_map() { }
37
38template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
39template<typename Reduce>
40void
41PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
42{
43  typedef handle_message<Reduce> Handler;
44  data->process_group.replace_handler(Handler(data, reduce));
45  Handler* handler = data->process_group.template get_receiver<Handler>();
46  BOOST_ASSERT(handler);
47  handler->setup_triggers(data->process_group);
48  data->get_default_value = reduce;
49  data->has_default_resolver = Reduce::non_default_resolver;
50  int model = data->model;
51  data->reset = &data_t::template do_reset<Reduce>;
52  set_consistency_model(model);
53}
54
55template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
56void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
57{
58  if (data->max_ghost_cells == 0)
59    return;
60
61  while (data->ghost_cells->size() > data->max_ghost_cells) {
62    // Evict the last ghost cell
63
64    if (data->model & cm_flush) {
65      // We need to flush values when we evict them.
66      boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
67        = data->ghost_cells->back();
68      send(data->process_group, get(data->global, victim.first).first,
69           property_map_put, victim);
70    }
71
72    // Actually remove the ghost cell
73    data->ghost_cells->pop_back();
74  }
75}
76
77template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
78typename PBGL_DISTRIB_PMAP::value_type&
79PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
80{
81  // Index by key
82  ghost_cells_key_index_type const& key_index
83    = data->ghost_cells->template get<1>();
84
85  // Search for the ghost cell by key, and project back to the sequence
86  iterator ghost_cell
87    = data->ghost_cells->template project<0>(key_index.find(key));
88  if (ghost_cell == data->ghost_cells->end()) {
89    value_type value;
90    if (data->has_default_resolver)
91      // Since we have a default resolver, use it to create a default
92      // value for this ghost cell.
93      value = data->get_default_value(key);
94    else if (request_if_missing)
95      // Request the actual value of this key from its owner
96      send_oob_with_reply(data->process_group, get(data->global, key).first,
97                          property_map_get, key, value);
98    else
99      value = value_type();
100
101    // Create a ghost cell containing the new value
102    ghost_cell
103      = data->ghost_cells->push_front(std::make_pair(key, value)).first;
104
105    // If we need to, prune the ghost cells
106    if (data->max_ghost_cells > 0)
107      prune_ghost_cells();
108  } else if (data->max_ghost_cells > 0)
109    // Put this cell at the beginning of the MRU list
110    data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
111
112  return const_cast<value_type&>(ghost_cell->second);
113}
114
115template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
116template<typename Reduce>
117void
118PBGL_DISTRIB_PMAP
119::handle_message<Reduce>::operator()(process_id_type source, int tag)
120{
121  BOOST_ASSERT(false);
122}
123
124template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
125template<typename Reduce>
126void
127PBGL_DISTRIB_PMAP::handle_message<Reduce>::
128handle_put(int /*source*/, int /*tag*/,
129           const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
130{
131  using boost::get;
132
133  shared_ptr<data_t> data(data_ptr);
134
135  owner_local_pair p = get(data->global, req.first);
136  BOOST_ASSERT(p.first == process_id(data->process_group));
137
138  detail::maybe_put(data->storage, p.second,
139                    reduce(req.first,
140                           get(data->storage, p.second),
141                           req.second));
142}
143
144template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
145template<typename Reduce>
146typename PBGL_DISTRIB_PMAP::value_type
147PBGL_DISTRIB_PMAP::handle_message<Reduce>::
148handle_get(int source, int /*tag*/, const key_type& key,
149           trigger_receive_context)
150{
151  using boost::get;
152
153  shared_ptr<data_t> data(data_ptr);
154  BOOST_ASSERT(data);
155
156  owner_local_pair p = get(data->global, key);
157  return get(data->storage, p.second);
158}
159
160template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
161template<typename Reduce>
162void
163PBGL_DISTRIB_PMAP::handle_message<Reduce>::
164handle_multiget(int source, int tag, const std::vector<key_type>& keys,
165                trigger_receive_context)
166{
167  shared_ptr<data_t> data(data_ptr);
168  BOOST_ASSERT(data);
169
170  typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
171  std::vector<key_value> results;
172  std::size_t n = keys.size();
173  results.reserve(n);
174
175  using boost::get;
176
177  for (std::size_t i = 0; i < n; ++i) {
178    local_key_type local_key = get(data->global, keys[i]).second;
179    results.push_back(key_value(keys[i], get(data->storage, local_key)));
180  }
181  send(data->process_group, source, property_map_multiget_reply, results);
182}
183
184template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
185template<typename Reduce>
186void
187PBGL_DISTRIB_PMAP::handle_message<Reduce>::
188handle_multiget_reply
189  (int source, int tag,
190   const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
191   trigger_receive_context)
192{
193  shared_ptr<data_t> data(data_ptr);
194  BOOST_ASSERT(data);
195
196  // Index by key
197  ghost_cells_key_index_type const& key_index
198    = data->ghost_cells->template get<1>();
199
200  std::size_t n = msg.size();
201  for (std::size_t i = 0; i < n; ++i) {
202    // Search for the ghost cell by key, and project back to the sequence
203    iterator position
204      = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
205
206    if (position != data->ghost_cells->end())
207      const_cast<value_type&>(position->second) = msg[i].second;
208  }
209}
210
211template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
212template<typename Reduce>
213void
214PBGL_DISTRIB_PMAP::handle_message<Reduce>::
215handle_multiput
216  (int source, int tag,
217   const std::vector<unsafe_pair<local_key_type, value_type> >& values,
218   trigger_receive_context)
219{
220  using boost::get;
221
222  shared_ptr<data_t> data(data_ptr);
223  BOOST_ASSERT(data);
224
225  std::size_t n = values.size();
226  for (std::size_t i = 0; i < n; ++i) {
227    local_key_type local_key = values[i].first;
228    value_type local_value = get(data->storage, local_key);
229    detail::maybe_put(data->storage, values[i].first,
230                      reduce(values[i].first,
231                             local_value,
232                             values[i].second));
233  }
234}
235
236template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
237template<typename Reduce>
238void
239PBGL_DISTRIB_PMAP::handle_message<Reduce>::
240setup_triggers(process_group_type& pg)
241{
242  using boost::parallel::simple_trigger;
243
244  simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
245  simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
246  simple_trigger(pg, property_map_multiget, this,
247                 &handle_message::handle_multiget);
248  simple_trigger(pg, property_map_multiget_reply, this,
249                 &handle_message::handle_multiget_reply);
250  simple_trigger(pg, property_map_multiput, this,
251                 &handle_message::handle_multiput);
252}
253
254template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
255void
256PBGL_DISTRIB_PMAP
257::on_synchronize::operator()()
258{
259  int stage=0; // we only get called at the start now
260  shared_ptr<data_t> data(data_ptr);
261  BOOST_ASSERT(data);
262
263  // Determine in which stage backward consistency messages should be sent.
264  int backward_stage = -1;
265  if (data->model & cm_backward) {
266    if (data->model & cm_flush) backward_stage = 1;
267    else backward_stage = 0;
268  }
269
270  // Flush results in first stage
271  if (stage == 0 && data->model & cm_flush)
272    data->flush();
273
274  // Backward consistency
275  if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
276    data->refresh_ghost_cells();
277
278  // Optionally clear results
279  if (data->model & cm_clear)
280    data->clear();
281
282  // Optionally reset results
283  if (data->model & cm_reset) {
284    if (data->reset) ((*data).*data->reset)();
285  }
286}
287
288
289template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
290void
291PBGL_DISTRIB_PMAP::set_consistency_model(int model)
292{
293  data->model = model;
294
295  bool need_on_synchronize = (model != cm_forward);
296
297  // Backward consistency is a two-stage process.
298  if (model & cm_backward) {
299    // For backward consistency to work, we absolutely cannot throw
300    // away any ghost cells.
301    data->max_ghost_cells = 0;
302  }
303
304  // attach the on_synchronize handler.
305  if (need_on_synchronize)
306    data->process_group.replace_on_synchronize_handler(on_synchronize(data));
307}
308
309template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
310void
311PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
312{
313  if ((data->model & cm_backward) && max_ghost_cells > 0)
314      boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
315                                                "cannot limit ghost-cell usage with a backward "
316                                                "consistency model"));
317
318  if (max_ghost_cells == 1)
319    // It is not safe to have only 1 ghost cell; the cell() method
320    // will fail.
321    max_ghost_cells = 2;
322
323  data->max_ghost_cells = max_ghost_cells;
324  prune_ghost_cells();
325}
326
327template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
328void PBGL_DISTRIB_PMAP::clear()
329{
330  data->clear();
331}
332
333template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
334void PBGL_DISTRIB_PMAP::data_t::clear()
335{
336  ghost_cells->clear();
337}
338
339template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
340void PBGL_DISTRIB_PMAP::reset()
341{
342  if (data->reset) ((*data).*data->reset)();
343}
344
345template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
346void PBGL_DISTRIB_PMAP::flush()
347{
348  data->flush();
349}
350
351template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
352void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
353{
354  using boost::get;
355
356  std::vector<std::vector<key_type> > keys;
357  keys.resize(num_processes(process_group));
358
359  // Collect the set of keys for which we will request values
360  for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
361    keys[get(global, i->first).first].push_back(i->first);
362
363  // Send multiget requests to each of the other processors
364  typedef typename ProcessGroup::process_size_type process_size_type;
365  process_size_type n = num_processes(process_group);
366  process_id_type id = process_id(process_group);
367  for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
368    if (!keys[p].empty())
369      send(process_group, p, property_map_multiget, keys[p]);
370  }
371}
372
373template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
374void PBGL_DISTRIB_PMAP::data_t::flush()
375{
376  using boost::get;
377
378  int n = num_processes(process_group);
379  std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
380  values.resize(n);
381
382  // Collect all of the flushed values
383  for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
384    std::pair<int, local_key_type> g = get(global, i->first);
385    values[g.first].push_back(std::make_pair(g.second, i->second));
386  }
387
388  // Transmit flushed values
389  for (int p = 0; p < n; ++p) {
390    if (!values[p].empty())
391      send(process_group, p, property_map_multiput, values[p]);
392  }
393}
394
395template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
396void PBGL_DISTRIB_PMAP::do_synchronize()
397{
398  if (data->model & cm_backward) {
399    synchronize(data->process_group);
400    return;
401  }
402
403  // Request refreshes of the values of our ghost cells
404  data->refresh_ghost_cells();
405
406  // Allows all of the multigets to get to their destinations
407  synchronize(data->process_group);
408
409  // Allows all of the multiget responses to get to their destinations
410  synchronize(data->process_group);
411}
412
413template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
414template<typename Resolver>
415void PBGL_DISTRIB_PMAP::data_t::do_reset()
416{
417  Resolver* resolver = get_default_value.template target<Resolver>();
418  BOOST_ASSERT(resolver);
419
420  for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
421    const_cast<value_type&>(i->second) = (*resolver)(i->first);
422}
423
424} } // end namespace boost::parallel
425