1 // Copyright (C) 2007 The Trustees of Indiana University.
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 //  Authors: Douglas Gregor
8 //           Andrew Lumsdaine
9 
10 /**************************************************************************
11  * This source file implements the Delta-stepping algorithm:              *
12  *                                                                        *
13  *   Ulrich Meyer and Peter Sanders. Parallel Shortest Path for Arbitrary *
14  *   Graphs. In Proceedings from the 6th International Euro-Par           *
15  *   Conference on Parallel Processing, pages 461--470, 2000.             *
16  *                                                                        *
17  *   Ulrich Meyer, Peter Sanders: [Delta]-stepping: A Parallelizable      *
18  *   Shortest Path Algorithm. J. Algorithms 49(1): 114-152, 2003.         *
19  *                                                                        *
20  * There are several potential optimizations we could still perform for   *
21  * this algorithm implementation:                                         *
22  *                                                                        *
23  *   - Implement "shortcuts", which bound the number of reinsertions      *
24  *     in a single bucket (to one). The computation of shortcuts looks    *
25  *     expensive in a distributed-memory setting, but it could be         *
26  *     ammortized over many queries.                                      *
27  *                                                                        *
28  *   - The size of the "buckets" data structure can be bounded to         *
29  *     max_edge_weight/delta buckets, if we treat the buckets as a        *
30  *     circular array.                                                    *
31  *                                                                        *
32  *   - If we partition light/heavy edges ahead of time, we could improve  *
33  *     relaxation performance by only iterating over the right portion    *
34  *     of the out-edge list each time.                                    *
35  *                                                                        *
36  *   - Implement a more sophisticated algorithm for guessing "delta",     *
37  *     based on the shortcut-finding algorithm.                           *
38  **************************************************************************/
39 #ifndef BOOST_GRAPH_DELTA_STEPPING_SHORTEST_PATHS_HPP
40 #define BOOST_GRAPH_DELTA_STEPPING_SHORTEST_PATHS_HPP
41 
42 #ifndef BOOST_GRAPH_USE_MPI
43 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
44 #endif
45 
46 #include <boost/config.hpp>
47 #include <boost/graph/graph_traits.hpp>
48 #include <boost/property_map/property_map.hpp>
49 #include <boost/graph/iteration_macros.hpp>
50 #include <limits>
51 #include <list>
52 #include <vector>
53 #include <boost/graph/parallel/container_traits.hpp>
54 #include <boost/graph/parallel/properties.hpp>
55 #include <boost/graph/distributed/detail/dijkstra_shortest_paths.hpp>
56 #include <utility> // for std::pair
57 #include <functional> // for std::logical_or
58 #include <boost/graph/parallel/algorithm.hpp> // for all_reduce
59 #include <cassert>
60 #include <algorithm> // for std::min, std::max
61 #include <boost/graph/parallel/simple_trigger.hpp>
62 
63 #ifdef PBGL_DELTA_STEPPING_DEBUG
64 #  include <iostream> // for std::cerr
65 #endif
66 
67 namespace boost { namespace graph { namespace distributed {
68 
69 template<typename Graph, typename PredecessorMap, typename DistanceMap,
70          typename EdgeWeightMap>
71 class delta_stepping_impl {
72   typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
73   typedef typename graph_traits<Graph>::degree_size_type Degree;
74   typedef typename property_traits<EdgeWeightMap>::value_type Dist;
75   typedef typename boost::graph::parallel::process_group_type<Graph>::type
76     ProcessGroup;
77 
78   typedef std::list<Vertex> Bucket;
79   typedef typename Bucket::iterator BucketIterator;
80   typedef typename std::vector<Bucket*>::size_type BucketIndex;
81 
82   typedef detail::dijkstra_msg_value<DistanceMap, PredecessorMap> MessageValue;
83 
84   enum {
85     // Relax a remote vertex. The message contains a pair<Vertex,
86     // MessageValue>, the first part of which is the vertex whose
87     // tentative distance is being relaxed and the second part
88     // contains either the new distance (if there is no predecessor
89     // map) or a pair with the distance and predecessor.
90     msg_relax
91   };
92 
93 public:
94   delta_stepping_impl(const Graph& g,
95                       PredecessorMap predecessor,
96                       DistanceMap distance,
97                       EdgeWeightMap weight,
98                       Dist delta);
99 
100   delta_stepping_impl(const Graph& g,
101                       PredecessorMap predecessor,
102                       DistanceMap distance,
103                       EdgeWeightMap weight);
104 
105   void run(Vertex s);
106 
107 private:
108   // Relax the edge (u, v), creating a new best path of distance x.
109   void relax(Vertex u, Vertex v, Dist x);
110 
111   // Synchronize all of the processes, by receiving all messages that
112   // have not yet been received.
113   void synchronize();
114 
115   // Handle a relax message that contains only the target and
116   // distance. This kind of message will be received when the
117   // predecessor map is a dummy_property_map.
handle_relax_message(Vertex v,Dist x)118   void handle_relax_message(Vertex v, Dist x) { relax(v, v, x); }
119 
120   // Handle a relax message that contains the source (predecessor),
121   // target, and distance. This kind of message will be received when
122   // the predecessor map is not a dummy_property_map.
handle_relax_message(Vertex v,const std::pair<Dist,Vertex> & p)123   void handle_relax_message(Vertex v, const std::pair<Dist, Vertex>& p)
124   { relax(p.second, v, p.first); }
125 
126   // Setup triggers for msg_relax messages
127   void setup_triggers();
128 
handle_msg_relax(int,int,const std::pair<Vertex,typename MessageValue::type> & data,trigger_receive_context)129   void handle_msg_relax(int /*source*/, int /*tag*/,
130                         const std::pair<Vertex, typename MessageValue::type>& data,
131                         trigger_receive_context)
132   { handle_relax_message(data.first, data.second); }
133 
134   const Graph& g;
135   PredecessorMap predecessor;
136   DistanceMap distance;
137   EdgeWeightMap weight;
138   Dist delta;
139   ProcessGroup pg;
140   typename property_map<Graph, vertex_owner_t>::const_type owner;
141   typename property_map<Graph, vertex_local_t>::const_type local;
142 
143   // A "property map" that contains the position of each vertex in
144   // whatever bucket it resides in.
145   std::vector<BucketIterator> position_in_bucket;
146 
147   // Bucket data structure. The ith bucket contains all local vertices
148   // with (tentative) distance in the range [i*delta,
149   // (i+1)*delta).
150   std::vector<Bucket*> buckets;
151 
152   // This "dummy" list is used only so that we can initialize the
153   // position_in_bucket property map with non-singular iterators. This
154   // won't matter for most implementations of the C++ Standard
155   // Library, but it avoids undefined behavior and allows us to run
156   // with library "debug modes".
157   std::list<Vertex> dummy_list;
158 
159   // A "property map" that states which vertices have been deleted
160   // from the bucket in this iteration.
161   std::vector<bool> vertex_was_deleted;
162 };
163 
164 template<typename Graph, typename PredecessorMap, typename DistanceMap,
165          typename EdgeWeightMap>
166 delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>::
delta_stepping_impl(const Graph & g,PredecessorMap predecessor,DistanceMap distance,EdgeWeightMap weight,Dist delta)167 delta_stepping_impl(const Graph& g,
168                     PredecessorMap predecessor,
169                     DistanceMap distance,
170                     EdgeWeightMap weight,
171                     Dist delta)
172     : g(g),
173       predecessor(predecessor),
174       distance(distance),
175       weight(weight),
176       delta(delta),
177       pg(boost::graph::parallel::process_group_adl(g), attach_distributed_object()),
178       owner(get(vertex_owner, g)),
179       local(get(vertex_local, g))
180 {
181   setup_triggers();
182 }
183 
184 template<typename Graph, typename PredecessorMap, typename DistanceMap,
185          typename EdgeWeightMap>
186 delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>::
delta_stepping_impl(const Graph & g,PredecessorMap predecessor,DistanceMap distance,EdgeWeightMap weight)187 delta_stepping_impl(const Graph& g,
188                     PredecessorMap predecessor,
189                     DistanceMap distance,
190                     EdgeWeightMap weight)
191     : g(g),
192       predecessor(predecessor),
193       distance(distance),
194       weight(weight),
195       pg(boost::graph::parallel::process_group_adl(g), attach_distributed_object()),
196       owner(get(vertex_owner, g)),
197       local(get(vertex_local, g))
198 {
199   using boost::parallel::all_reduce;
200   using boost::parallel::maximum;
201   using std::max;
202 
203   // Compute the maximum edge weight and degree
204   Dist max_edge_weight = 0;
205   Degree max_degree = 0;
206   BGL_FORALL_VERTICES_T(u, g, Graph) {
207     max_degree = max BOOST_PREVENT_MACRO_SUBSTITUTION (max_degree, out_degree(u, g));
208     BGL_FORALL_OUTEDGES_T(u, e, g, Graph)
209       max_edge_weight = max BOOST_PREVENT_MACRO_SUBSTITUTION (max_edge_weight, get(weight, e));
210   }
211 
212   max_edge_weight = all_reduce(pg, max_edge_weight, maximum<Dist>());
213   max_degree = all_reduce(pg, max_degree, maximum<Degree>());
214 
215   // Take a guess at delta, based on what works well for random
216   // graphs.
217   delta = max_edge_weight / max_degree;
218   if (delta == 0)
219     delta = 1;
220 
221   setup_triggers();
222 }
223 
224 template<typename Graph, typename PredecessorMap, typename DistanceMap,
225          typename EdgeWeightMap>
226 void
227 delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>::
run(Vertex s)228 run(Vertex s)
229 {
230   Dist inf = (std::numeric_limits<Dist>::max)();
231 
232   // None of the vertices are stored in the bucket.
233   position_in_bucket.clear();
234   position_in_bucket.resize(num_vertices(g), dummy_list.end());
235 
236   // None of the vertices have been deleted
237   vertex_was_deleted.clear();
238   vertex_was_deleted.resize(num_vertices(g), false);
239 
240   // No path from s to any other vertex, yet
241   BGL_FORALL_VERTICES_T(v, g, Graph)
242     put(distance, v, inf);
243 
244   // The distance to the starting node is zero
245   if (get(owner, s) == process_id(pg))
246     // Put "s" into its bucket (bucket 0)
247     relax(s, s, 0);
248   else
249     // Note that we know the distance to s is zero
250     cache(distance, s, 0);
251 
252   BucketIndex max_bucket = (std::numeric_limits<BucketIndex>::max)();
253   BucketIndex current_bucket = 0;
254   do {
255     // Synchronize with all of the other processes.
256     synchronize();
257 
258     // Find the next bucket that has something in it.
259     while (current_bucket < buckets.size()
260            && (!buckets[current_bucket] || buckets[current_bucket]->empty()))
261       ++current_bucket;
262     if (current_bucket >= buckets.size())
263       current_bucket = max_bucket;
264 
265 #ifdef PBGL_DELTA_STEPPING_DEBUG
266     std::cerr << "#" << process_id(pg) << ": lowest bucket is #"
267               << current_bucket << std::endl;
268 #endif
269     // Find the smallest bucket (over all processes) that has vertices
270     // that need to be processed.
271     using boost::parallel::all_reduce;
272     using boost::parallel::minimum;
273     current_bucket = all_reduce(pg, current_bucket, minimum<BucketIndex>());
274 
275     if (current_bucket == max_bucket)
276       // There are no non-empty buckets in any process; exit.
277       break;
278 
279 #ifdef PBGL_DELTA_STEPPING_DEBUG
280     if (process_id(pg) == 0)
281       std::cerr << "Processing bucket #" << current_bucket << std::endl;
282 #endif
283 
284     // Contains the set of vertices that have been deleted in the
285     // relaxation of "light" edges. Note that we keep track of which
286     // vertices were deleted with the property map
287     // "vertex_was_deleted".
288     std::vector<Vertex> deleted_vertices;
289 
290     // Repeatedly relax light edges
291     bool nonempty_bucket;
292     do {
293       // Someone has work to do in this bucket.
294 
295       if (current_bucket < buckets.size() && buckets[current_bucket]) {
296         Bucket& bucket = *buckets[current_bucket];
297         // For each element in the bucket
298         while (!bucket.empty()) {
299           Vertex u = bucket.front();
300 
301 #ifdef PBGL_DELTA_STEPPING_DEBUG
302           std::cerr << "#" << process_id(pg) << ": processing vertex "
303                     << get(vertex_global, g, u).second << "@"
304                     << get(vertex_global, g, u).first
305                     << std::endl;
306 #endif
307 
308           // Remove u from the front of the bucket
309           bucket.pop_front();
310 
311           // Insert u into the set of deleted vertices, if it hasn't
312           // been done already.
313           if (!vertex_was_deleted[get(local, u)]) {
314             vertex_was_deleted[get(local, u)] = true;
315             deleted_vertices.push_back(u);
316           }
317 
318           // Relax each light edge.
319           Dist u_dist = get(distance, u);
320           BGL_FORALL_OUTEDGES_T(u, e, g, Graph)
321             if (get(weight, e) <= delta) // light edge
322               relax(u, target(e, g), u_dist + get(weight, e));
323         }
324       }
325 
326       // Synchronize with all of the other processes.
327       synchronize();
328 
329       // Is the bucket empty now?
330       nonempty_bucket = (current_bucket < buckets.size()
331                          && buckets[current_bucket]
332                          && !buckets[current_bucket]->empty());
333      } while (all_reduce(pg, nonempty_bucket, std::logical_or<bool>()));
334 
335     // Relax heavy edges for each of the vertices that we previously
336     // deleted.
337     for (typename std::vector<Vertex>::iterator iter = deleted_vertices.begin();
338          iter != deleted_vertices.end(); ++iter) {
339       // Relax each heavy edge.
340       Vertex u = *iter;
341       Dist u_dist = get(distance, u);
342       BGL_FORALL_OUTEDGES_T(u, e, g, Graph)
343         if (get(weight, e) > delta) // heavy edge
344           relax(u, target(e, g), u_dist + get(weight, e));
345     }
346 
347     // Go to the next bucket: the current bucket must already be empty.
348     ++current_bucket;
349   } while (true);
350 
351   // Delete all of the buckets.
352   for (typename std::vector<Bucket*>::iterator iter = buckets.begin();
353        iter != buckets.end(); ++iter) {
354     if (*iter) {
355       delete *iter;
356       *iter = 0;
357     }
358   }
359 }
360 
361 template<typename Graph, typename PredecessorMap, typename DistanceMap,
362          typename EdgeWeightMap>
363 void
364 delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>::
relax(Vertex u,Vertex v,Dist x)365 relax(Vertex u, Vertex v, Dist x)
366 {
367 #ifdef PBGL_DELTA_STEPPING_DEBUG
368   std::cerr << "#" << process_id(pg) << ": relax("
369             << get(vertex_global, g, u).second << "@"
370             << get(vertex_global, g, u).first << ", "
371             << get(vertex_global, g, v).second << "@"
372             << get(vertex_global, g, v).first << ", "
373             << x << ")" << std::endl;
374 #endif
375 
376   if (x < get(distance, v)) {
377     // We're relaxing the edge to vertex v.
378     if (get(owner, v) == process_id(pg)) {
379       // Compute the new bucket index for v
380       BucketIndex new_index = static_cast<BucketIndex>(x / delta);
381 
382       // Make sure there is enough room in the buckets data structure.
383       if (new_index >= buckets.size()) buckets.resize(new_index + 1, 0);
384 
385       // Make sure that we have allocated the bucket itself.
386       if (!buckets[new_index]) buckets[new_index] = new Bucket;
387 
388       if (get(distance, v) != (std::numeric_limits<Dist>::max)()
389           && !vertex_was_deleted[get(local, v)]) {
390         // We're moving v from an old bucket into a new one. Compute
391         // the old index, then splice it in.
392         BucketIndex old_index
393           = static_cast<BucketIndex>(get(distance, v) / delta);
394         buckets[new_index]->splice(buckets[new_index]->end(),
395                                    *buckets[old_index],
396                                    position_in_bucket[get(local, v)]);
397       } else {
398         // We're inserting v into a bucket for the first time. Put it
399         // at the end.
400         buckets[new_index]->push_back(v);
401       }
402 
403       // v is now at the last position in the new bucket
404       position_in_bucket[get(local, v)] = buckets[new_index]->end();
405       --position_in_bucket[get(local, v)];
406 
407       // Update predecessor and tentative distance information
408       put(predecessor, v, u);
409       put(distance, v, x);
410     } else {
411 #ifdef PBGL_DELTA_STEPPING_DEBUG
412       std::cerr << "#" << process_id(pg) << ": sending relax("
413                 << get(vertex_global, g, u).second << "@"
414                 << get(vertex_global, g, u).first << ", "
415                 << get(vertex_global, g, v).second << "@"
416                 << get(vertex_global, g, v).first << ", "
417             << x << ") to " << get(owner, v) << std::endl;
418 
419 #endif
420       // The vertex is remote: send a request to the vertex's owner
421       send(pg, get(owner, v), msg_relax,
422            std::make_pair(v, MessageValue::create(x, u)));
423 
424       // Cache tentative distance information
425       cache(distance, v, x);
426     }
427   }
428 }
429 
430 template<typename Graph, typename PredecessorMap, typename DistanceMap,
431          typename EdgeWeightMap>
432 void
433 delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>::
synchronize()434 synchronize()
435 {
436   using boost::parallel::synchronize;
437 
438   // Synchronize at the process group level.
439   synchronize(pg);
440 
441   // Receive any relaxation request messages.
442 //   typedef typename ProcessGroup::process_id_type process_id_type;
443 //   while (optional<std::pair<process_id_type, int> > stp = probe(pg)) {
444 //     // Receive the relaxation message
445 //     assert(stp->second == msg_relax);
446 //     std::pair<Vertex, typename MessageValue::type> data;
447 //     receive(pg, stp->first, stp->second, data);
448 
449 //     // Turn it into a "relax" call
450 //     handle_relax_message(data.first, data.second);
451 //   }
452 }
453 
454 template<typename Graph, typename PredecessorMap, typename DistanceMap,
455          typename EdgeWeightMap>
456 void
457 delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>::
setup_triggers()458 setup_triggers()
459 {
460   using boost::graph::parallel::simple_trigger;
461 
462   simple_trigger(pg, msg_relax, this,
463                  &delta_stepping_impl::handle_msg_relax);
464 }
465 
466 template<typename Graph, typename PredecessorMap, typename DistanceMap,
467          typename EdgeWeightMap>
468 void
delta_stepping_shortest_paths(const Graph & g,typename graph_traits<Graph>::vertex_descriptor s,PredecessorMap predecessor,DistanceMap distance,EdgeWeightMap weight,typename property_traits<EdgeWeightMap>::value_type delta)469 delta_stepping_shortest_paths
470   (const Graph& g,
471    typename graph_traits<Graph>::vertex_descriptor s,
472    PredecessorMap predecessor, DistanceMap distance, EdgeWeightMap weight,
473    typename property_traits<EdgeWeightMap>::value_type delta)
474 {
475   // The "distance" map needs to act like one, retrieving the default
476   // value of infinity.
477   set_property_map_role(vertex_distance, distance);
478 
479   // Construct the implementation object, which will perform all of
480   // the actual work.
481   delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>
482     impl(g, predecessor, distance, weight, delta);
483 
484   // Run the delta-stepping algorithm. The results will show up in
485   // "predecessor" and "weight".
486   impl.run(s);
487 }
488 
489 template<typename Graph, typename PredecessorMap, typename DistanceMap,
490          typename EdgeWeightMap>
491 void
delta_stepping_shortest_paths(const Graph & g,typename graph_traits<Graph>::vertex_descriptor s,PredecessorMap predecessor,DistanceMap distance,EdgeWeightMap weight)492 delta_stepping_shortest_paths
493   (const Graph& g,
494    typename graph_traits<Graph>::vertex_descriptor s,
495    PredecessorMap predecessor, DistanceMap distance, EdgeWeightMap weight)
496 {
497   // The "distance" map needs to act like one, retrieving the default
498   // value of infinity.
499   set_property_map_role(vertex_distance, distance);
500 
501   // Construct the implementation object, which will perform all of
502   // the actual work.
503   delta_stepping_impl<Graph, PredecessorMap, DistanceMap, EdgeWeightMap>
504     impl(g, predecessor, distance, weight);
505 
506   // Run the delta-stepping algorithm. The results will show up in
507   // "predecessor" and "weight".
508   impl.run(s);
509 }
510 
511 } } } // end namespace boost::graph::distributed
512 
513 #endif // BOOST_GRAPH_DELTA_STEPPING_SHORTEST_PATHS_HPP
514