1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 //  Authors: Brian Barrett
8 //           Douglas Gregor
9 //           Andrew Lumsdaine
10 #ifndef BOOST_GRAPH_PARALLEL_CC_PS_HPP
11 #define BOOST_GRAPH_PARALLEL_CC_PS_HPP
12 
13 #ifndef BOOST_GRAPH_USE_MPI
14 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
15 #endif
16 
17 #include <boost/assert.hpp>
18 #include <boost/property_map/property_map.hpp>
19 #include <boost/graph/parallel/algorithm.hpp>
20 #include <boost/pending/indirect_cmp.hpp>
21 #include <boost/graph/graph_traits.hpp>
22 #include <boost/graph/overloading.hpp>
23 #include <boost/graph/distributed/concepts.hpp>
24 #include <boost/graph/parallel/properties.hpp>
25 #include <boost/graph/parallel/process_group.hpp>
26 #include <boost/optional.hpp>
27 #include <algorithm>
28 #include <vector>
29 #include <queue>
30 #include <limits>
31 #include <map>
32 #include <boost/graph/parallel/container_traits.hpp>
33 #include <boost/graph/iteration_macros.hpp>
34 
35 
36 // Connected components algorithm based on a parallel search.
37 //
38 // Every N nodes starts a parallel search from the first vertex in
39 // their local vertex list during the first superstep (the other nodes
40 // remain idle during the first superstep to reduce the number of
41 // conflicts in numbering the components).  At each superstep, all new
42 // component mappings from remote nodes are handled.  If there is no
43 // work from remote updates, a new vertex is removed from the local
44 // list and added to the work queue.
45 //
46 // Components are allocated from the component_value_allocator object,
47 // which ensures that a given component number is unique in the
48 // system, currently by using the rank and number of processes to
49 // stride allocations.
50 //
51 // When two components are discovered to actually be the same
52 // component, a mapping is created in the collisions object.  The
53 // lower component number is prefered in the resolution, so component
54 // numbering resolution is consistent.  After the search has exhausted
55 // all vertices in the graph, the mapping is shared with all
56 // processes, and they independently resolve the comonent mapping (so
57 // O((N * NP) + (V * NP)) work, in O(N + V) time, where N is the
58 // number of mappings and V is the number of local vertices).  This
59 // phase can likely be significantly sped up if a clever algorithm for
60 // the reduction can be found.
61 namespace boost { namespace graph { namespace distributed {
62   namespace cc_ps_detail {
63     // Local object for allocating component numbers.  There are two
64     // places this happens in the code, and I was getting sick of them
65     // getting out of sync.  Components are not tightly packed in
66     // numbering, but are numbered to ensure each rank has its own
67     // independent sets of numberings.
68     template<typename component_value_type>
69     class component_value_allocator {
70     public:
component_value_allocator(int num,int size)71       component_value_allocator(int num, int size) :
72         last(0), num(num), size(size)
73       {
74       }
75 
allocate(void)76       component_value_type allocate(void)
77       {
78         component_value_type ret = num + (last * size);
79         last++;
80         return ret;
81       }
82 
83     private:
84       component_value_type last;
85       int num;
86       int size;
87     };
88 
89 
90     // Map of the "collisions" between component names in the global
91     // component mapping.  TO make cleanup easier, component numbers
92     // are added, pointing to themselves, when a new component is
93     // found.  In order to make the results deterministic, the lower
94     // component number is always taken.  The resolver will drill
95     // through the map until it finds a component entry that points to
96     // itself as the next value, allowing some cleanup to happen at
97     // update() time.  Attempts are also made to update the mapping
98     // when new entries are created.
99     //
100     // Note that there's an assumption that the entire mapping is
101     // shared during the end of the algorithm, but before component
102     // name resolution.
103     template<typename component_value_type>
104     class collision_map {
105     public:
collision_map()106       collision_map() : num_unique(0)
107       {
108       }
109 
110       // add new component mapping first time component is used.  Own
111       // function only so that we can sanity check there isn't already
112       // a mapping for that component number (which would be bad)
add(const component_value_type & a)113       void add(const component_value_type &a)
114       {
115         BOOST_ASSERT(collisions.count(a) == 0);
116         collisions[a] = a;
117       }
118 
119       // add a mapping between component values saying they're the
120       // same component
add(const component_value_type & a,const component_value_type & b)121       void add(const component_value_type &a, const component_value_type &b)
122       {
123         component_value_type high, low, tmp;
124         if (a > b) {
125           high = a;
126           low = b;
127         } else {
128           high = b;
129           low = a;
130         }
131 
132         if (collisions.count(high) != 0 && collisions[high] != low) {
133           tmp = collisions[high];
134           if (tmp > low) {
135             collisions[tmp] = low;
136             collisions[high] = low;
137           } else {
138             collisions[low] = tmp;
139             collisions[high] = tmp;
140           }
141         } else {
142           collisions[high] = low;
143         }
144 
145       }
146 
147       // get the "real" component number for the given component.
148       // Used to resolve mapping at end of run.
update(component_value_type a)149       component_value_type update(component_value_type a)
150       {
151         BOOST_ASSERT(num_unique > 0);
152         BOOST_ASSERT(collisions.count(a) != 0);
153         return collisions[a];
154       }
155 
156       // collapse the collisions tree, so that update is a one lookup
157       // operation.  Count unique components at the same time.
uniqify(void)158       void uniqify(void)
159       {
160         typename std::map<component_value_type, component_value_type>::iterator i, end;
161 
162         end = collisions.end();
163         for (i = collisions.begin() ; i != end ; ++i) {
164           if (i->first == i->second) {
165             num_unique++;
166           } else {
167             i->second = collisions[i->second];
168           }
169         }
170       }
171 
172       // get the number of component entries that have an associated
173       // component number of themselves, which are the real components
174       // used in the final mapping.  This is the number of unique
175       // components in the graph.
unique(void)176       int unique(void)
177       {
178         BOOST_ASSERT(num_unique > 0);
179         return num_unique;
180       }
181 
182       // "serialize" into a vector for communication.
serialize(void)183       std::vector<component_value_type> serialize(void)
184       {
185         std::vector<component_value_type> ret;
186         typename std::map<component_value_type, component_value_type>::iterator i, end;
187 
188         end = collisions.end();
189         for (i = collisions.begin() ; i != end ; ++i) {
190           ret.push_back(i->first);
191           ret.push_back(i->second);
192         }
193 
194         return ret;
195       }
196 
197     private:
198       std::map<component_value_type, component_value_type> collisions;
199       int num_unique;
200     };
201 
202 
203     // resolver to handle remote updates.  The resolver will add
204     // entries into the collisions map if required, and if it is the
205     // first time the vertex has been touched, it will add the vertex
206     // to the remote queue.  Note that local updates are handled
207     // differently, in the main loop (below).
208 
209       // BWB - FIX ME - don't need graph anymore - can pull from key value of Component Map.
210     template<typename ComponentMap, typename work_queue>
211     struct update_reducer {
212       BOOST_STATIC_CONSTANT(bool, non_default_resolver = false);
213 
214       typedef typename property_traits<ComponentMap>::value_type component_value_type;
215       typedef typename property_traits<ComponentMap>::key_type vertex_descriptor;
216 
update_reducerboost::graph::distributed::cc_ps_detail::update_reducer217       update_reducer(work_queue *q,
218                      cc_ps_detail::collision_map<component_value_type> *collisions,
219                      processor_id_type pg_id) :
220         q(q), collisions(collisions), pg_id(pg_id)
221       {
222       }
223 
224       // ghost cell initialization routine.  This should never be
225       // called in this imlementation.
226       template<typename K>
operator ()boost::graph::distributed::cc_ps_detail::update_reducer227       component_value_type operator()(const K&) const
228       {
229         return component_value_type(0);
230       }
231 
232       // resolver for remote updates.  I'm not entirely sure why, but
233       // I decided to not change the value of the vertex if it's
234       // already non-infinite.  It doesn't matter in the end, as we'll
235       // touch every vertex in the cleanup phase anyway.  If the
236       // component is currently infinite, set to the new component
237       // number and add the vertex to the work queue.  If it's not
238       // infinite, we've touched it already so don't add it to the
239       // work queue.  Do add a collision entry so that we know the two
240       // components are the same.
operator ()boost::graph::distributed::cc_ps_detail::update_reducer241       component_value_type operator()(const vertex_descriptor &v,
242                                       const component_value_type& current,
243                                       const component_value_type& update) const
244       {
245         const component_value_type max = (std::numeric_limits<component_value_type>::max)();
246         component_value_type ret = current;
247 
248         if (max == current) {
249           q->push(v);
250           ret = update;
251         } else if (current != update) {
252           collisions->add(current, update);
253         }
254 
255         return ret;
256       }
257 
258       // So for whatever reason, the property map can in theory call
259       // the resolver with a local descriptor in addition to the
260       // standard global descriptor.  As far as I can tell, this code
261       // path is never taken in this implementation, but I need to
262       // have this code here to make it compile.  We just make a
263       // global descriptor and call the "real" operator().
264       template<typename K>
operator ()boost::graph::distributed::cc_ps_detail::update_reducer265       component_value_type operator()(const K& v,
266                                       const component_value_type& current,
267                                       const component_value_type& update) const
268       {
269           return (*this)(vertex_descriptor(pg_id, v), current, update);
270       }
271 
272     private:
273       work_queue *q;
274       collision_map<component_value_type> *collisions;
275       boost::processor_id_type pg_id;
276     };
277 
278   } // namespace cc_ps_detail
279 
280 
281   template<typename Graph, typename ComponentMap>
282   typename property_traits<ComponentMap>::value_type
connected_components_ps(const Graph & g,ComponentMap c)283   connected_components_ps(const Graph& g, ComponentMap c)
284   {
285     using boost::graph::parallel::process_group;
286 
287     typedef typename property_traits<ComponentMap>::value_type component_value_type;
288     typedef typename graph_traits<Graph>::vertex_iterator vertex_iterator;
289     typedef typename graph_traits<Graph>::vertex_descriptor vertex_descriptor;
290     typedef typename boost::graph::parallel::process_group_type<Graph>
291       ::type process_group_type;
292     typedef typename process_group_type::process_id_type process_id_type;
293     typedef std::queue<vertex_descriptor> work_queue;
294 
295     static const component_value_type max_component =
296       (std::numeric_limits<component_value_type>::max)();
297     typename property_map<Graph, vertex_owner_t>::const_type
298       owner = get(vertex_owner, g);
299 
300     // standard who am i? stuff
301     process_group_type pg = process_group(g);
302     process_id_type id = process_id(pg);
303 
304     // Initialize every vertex to have infinite component number
305     BGL_FORALL_VERTICES_T(v, g, Graph) put(c, v, max_component);
306 
307     vertex_iterator current, end;
308     boost::tie(current, end) = vertices(g);
309 
310     cc_ps_detail::component_value_allocator<component_value_type> cva(process_id(pg), num_processes(pg));
311     cc_ps_detail::collision_map<component_value_type> collisions;
312     work_queue q;  // this is intentionally a local data structure
313     c.set_reduce(cc_ps_detail::update_reducer<ComponentMap, work_queue>(&q, &collisions, id));
314 
315     // add starting work
316     while (true) {
317         bool useful_found = false;
318         component_value_type val = cva.allocate();
319         put(c, *current, val);
320         collisions.add(val);
321         q.push(*current);
322         if (0 != out_degree(*current, g)) useful_found = true;
323         ++current;
324         if (useful_found) break;
325     }
326 
327     // Run the loop until everyone in the system is done
328     bool global_done = false;
329     while (!global_done) {
330 
331       // drain queue of work for this superstep
332       while (!q.empty()) {
333         vertex_descriptor v = q.front();
334         q.pop();
335         // iterate through outedges of the vertex currently being
336         // examined, setting their component to our component.  There
337         // is no way to end up in the queue without having a component
338         // number already.
339 
340         BGL_FORALL_ADJ_T(v, peer, g, Graph) {
341           component_value_type my_component = get(c, v);
342 
343           // update other vertex with our component information.
344           // Resolver will handle remote collisions as well as whether
345           // to put the vertex on the work queue or not.  We have to
346           // handle local collisions and work queue management
347           if (id == get(owner, peer)) {
348             if (max_component == get(c, peer)) {
349               put(c, peer, my_component);
350               q.push(peer);
351             } else if (my_component != get(c, peer)) {
352               collisions.add(my_component, get(c, peer));
353             }
354           } else {
355             put(c, peer, my_component);
356           }
357         }
358       }
359 
360       // synchronize / start a new superstep.
361       synchronize(pg);
362       global_done = all_reduce(pg, (q.empty() && (current == end)), boost::parallel::minimum<bool>());
363 
364       // If the queue is currently empty, add something to do to start
365       // the current superstep (supersteps start at the sync, not at
366       // the top of the while loop as one might expect).  Down at the
367       // bottom of the while loop so that not everyone starts the
368       // algorithm with something to do, to try to reduce component
369       // name conflicts
370       if (q.empty()) {
371         bool useful_found = false;
372         for ( ; current != end && !useful_found ; ++current) {
373           if (max_component == get(c, *current)) {
374             component_value_type val = cva.allocate();
375             put(c, *current, val);
376             collisions.add(val);
377             q.push(*current);
378             if (0 != out_degree(*current, g)) useful_found = true;
379           }
380         }
381       }
382     }
383 
384     // share component mappings
385     std::vector<component_value_type> global;
386     std::vector<component_value_type> mine = collisions.serialize();
387     all_gather(pg, mine.begin(), mine.end(), global);
388     for (size_t i = 0 ; i < global.size() ; i += 2) {
389       collisions.add(global[i], global[i + 1]);
390     }
391     collisions.uniqify();
392 
393     // update the component mappings
394     BGL_FORALL_VERTICES_T(v, g, Graph) {
395       put(c, v, collisions.update(get(c, v)));
396     }
397 
398     return collisions.unique();
399   }
400 
401 } // end namespace distributed
402 
403 } // end namespace graph
404 
405 } // end namespace boost
406 
407 #endif // BOOST_GRAPH_PARALLEL_CC_HPP
408