1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 //  Authors: Douglas Gregor
8 //           Andrew Lumsdaine
9 
10 //
11 // Implements redistribution of vertices for a distributed adjacency
12 // list. This file should not be included by users. It will be
13 // included by the distributed adjacency list header.
14 //
15 
16 #ifndef BOOST_GRAPH_USE_MPI
17 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
18 #endif
19 
20 #include <boost/pending/container_traits.hpp>
21 
22 namespace boost { namespace detail { namespace parallel {
23 
24 /* This structure contains a (vertex or edge) descriptor that is being
25    moved from one processor to another. It contains the properties for
26    that descriptor (if any).
27  */
28 template<typename Descriptor, typename DescriptorProperty>
29 struct redistributed_descriptor : maybe_store_property<DescriptorProperty>
30 {
31   typedef maybe_store_property<DescriptorProperty> inherited;
32 
33   redistributed_descriptor() { }
34 
35   redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p)
36     : inherited(p), descriptor(v) { }
37 
38   Descriptor descriptor;
39 
40 private:
41   friend class boost::serialization::access;
42 
43   template<typename Archiver>
44   void serialize(Archiver& ar, unsigned int /*version*/)
45   {
46     ar & boost::serialization::base_object<inherited>(*this)
47        & unsafe_serialize(descriptor);
48   }
49 };
50 
51 /* Predicate that returns true if the target has migrated. */
52 template<typename VertexProcessorMap, typename Graph>
53 struct target_migrated_t
54 {
55   typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
56   typedef typename graph_traits<Graph>::edge_descriptor Edge;
57 
58   target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
59     : vertex_to_processor(vertex_to_processor), g(g) { }
60 
61   bool operator()(Edge e) const
62   {
dbwrap_watch_rec_parse(TDB_DATA data,uint8_t ** pwatchers,size_t * pnum_watchers,TDB_DATA * pdata)63     typedef global_descriptor<Vertex> DVertex;
64     processor_id_type owner = get(edge_target_processor_id, g, e);
65     return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner;
66   }
67 
68 private:
69   VertexProcessorMap vertex_to_processor;
70   const Graph& g;
71 };
72 
73 template<typename VertexProcessorMap, typename Graph>
74 inline target_migrated_t<VertexProcessorMap, Graph>
75 target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
76 { return target_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }
77 
78 /* Predicate that returns true if the source of an in-edge has migrated. */
79 template<typename VertexProcessorMap, typename Graph>
80 struct source_migrated_t
81 {
82   typedef typename graph_traits<Graph>::vertex_descriptor Vertex;
83   typedef typename graph_traits<Graph>::edge_descriptor Edge;
84 
85   source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g)
86     : vertex_to_processor(vertex_to_processor), g(g) { }
87 
88   bool operator()(stored_in_edge<Edge> e) const
89   {
90     return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g)))
91       != e.source_processor;
92   }
93 
94 private:
95   VertexProcessorMap vertex_to_processor;
96   const Graph& g;
97 };
98 
99 template<typename VertexProcessorMap, typename Graph>
100 inline source_migrated_t<VertexProcessorMap, Graph>
101 source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g)
102 { return source_migrated_t<VertexProcessorMap, Graph>(vertex_to_processor, g); }
103 
/* Predicate that returns true if the source or the target of an edge has migrated. */
105 template<typename VertexProcessorMap, typename Graph>
106 struct source_or_target_migrated_t
107 {
108   typedef typename graph_traits<Graph>::edge_descriptor Edge;
109 
110   source_or_target_migrated_t(VertexProcessorMap vertex_to_processor,
111                               const Graph& g)
112     : vertex_to_processor(vertex_to_processor), g(g) { }
113 
114   bool operator()(Edge e) const
115   {
116     return get(vertex_to_processor, source(e, g)) != source(e, g).owner
dbwrap_watcher_get(struct dbwrap_watcher * w,const uint8_t buf[DBWRAP_WATCHER_BUF_LENGTH])117       || get(vertex_to_processor, target(e, g)) != target(e, g).owner;
118   }
119 
120 private:
121   VertexProcessorMap vertex_to_processor;
122   const Graph& g;
123 };
dbwrap_watcher_put(uint8_t buf[DBWRAP_WATCHER_BUF_LENGTH],const struct dbwrap_watcher * w)124 
125 template<typename VertexProcessorMap, typename Graph>
126 inline source_or_target_migrated_t<VertexProcessorMap, Graph>
127 source_or_target_migrated(VertexProcessorMap vertex_to_processor,
128 const Graph& g)
129 {
130   typedef source_or_target_migrated_t<VertexProcessorMap, Graph> result_type;
131   return result_type(vertex_to_processor, g);
132 }
133 
134 } } // end of namespace detail::parallel
135 
136 template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
137 template<typename VertexProcessorMap>
138 void
139 PBGL_DISTRIB_ADJLIST_TYPE
140 ::request_in_neighbors(vertex_descriptor v,
141                        VertexProcessorMap vertex_to_processor,
142                        bidirectionalS)
143 {
144   BGL_FORALL_INEDGES_T(v, e, *this, graph_type)
145     request(vertex_to_processor, source(e, *this));
146 }
147 
148 template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
149 template<typename VertexProcessorMap>
150 void
151 PBGL_DISTRIB_ADJLIST_TYPE
152 ::remove_migrated_in_edges(vertex_descriptor v,
153                            VertexProcessorMap vertex_to_processor,
154                            bidirectionalS)
155 {
156   graph_detail::erase_if(get(vertex_in_edges, base())[v.local],
157                          source_migrated(vertex_to_processor, base()));
158 }
159 
/* Redistribute the vertices and edges of this graph among the
   processes of its process group.

   vertex_to_processor maps each vertex descriptor to the id of the
   process on which that vertex should reside afterwards. It is
   passed by value, must support set_max_ghost_cells(), and is used
   with request() and synchronize() to learn the destination of
   non-local neighbor vertices.

   Each vertex is sent to the process named by vertex_to_processor;
   each edge is sent to the process named for its source vertex. When
   the vertex list selector is listS, setS or multisetS, only vertices
   whose destination differs from their current owner (and the edges
   incident on them) are moved; otherwise the local graph is cleared
   and everything is re-added.

   NOTE(review): the function calls synchronize() and
   inplace_all_to_all() on the process group, so presumably every
   process in the group must call it together -- confirm.
 */
template<PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
template<typename VertexProcessorMap>
void
PBGL_DISTRIB_ADJLIST_TYPE
::redistribute(VertexProcessorMap vertex_to_processor)
{
  using boost::parallel::inplace_all_to_all;

  // When we have stable descriptors, we only move those descriptors
  // that actually need to be moved. Otherwise, we essentially have to
  // regenerate the entire graph.
  const bool has_stable_descriptors =
    is_same<typename config_type::vertex_list_selector, listS>::value
    || is_same<typename config_type::vertex_list_selector, setS>::value
    || is_same<typename config_type::vertex_list_selector, multisetS>::value;

  // A vertex (resp. edge) descriptor packaged together with its
  // property value, ready to be shipped to another process.
  typedef detail::parallel::redistributed_descriptor<vertex_descriptor,
                                                     vertex_property_type>
    redistributed_vertex;
  typedef detail::parallel::redistributed_descriptor<edge_descriptor,
                                                     edge_property_type>
    redistributed_edge;

  vertex_iterator vi, vi_end;
  edge_iterator ei, ei_end;

  process_group_type pg = process_group();

  // Initial synchronization makes sure that we have all of our ducks
  // in a row. We don't want any outstanding add/remove messages
  // coming in mid-redistribution!
  synchronize(process_group_);

  // We cannot cope with eviction of ghost cells
  vertex_to_processor.set_max_ghost_cells(0);

  // Number of processes in the group; all per-process buffers below
  // are indexed by process id in [0, p).
  process_id_type p = num_processes(pg);

  // Send vertices and edges to the processor where they will
  // actually reside.  This requires O(|V| + |E|) communication
  std::vector<std::vector<redistributed_vertex> > redistributed_vertices(p);
  std::vector<std::vector<redistributed_edge> > redistributed_edges(p);

  // Build the sets of relocated vertices for each process and then do
  // an all-to-all transfer.
  for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) {
    // Without stable descriptors every vertex is shipped (possibly to
    // this very process); with stable descriptors only those whose
    // destination differs from their current owner.
    if (!has_stable_descriptors
        || get(vertex_to_processor, *vi) != vi->owner) {
      redistributed_vertices[get(vertex_to_processor, *vi)]
        .push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(),
                                                 vi->local)));
    }

    // When our descriptors are stable, we need to determine which
    // adjacent descriptors are stable to determine which edges will
    // be removed.
    if (has_stable_descriptors) {
      BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type)
        request(vertex_to_processor, target(e, *this));
      request_in_neighbors(*vi, vertex_to_processor, directed_selector());
    }
  }

  // After this call redistributed_vertices[src] holds the vertices
  // received from process src rather than the ones to send.
  inplace_all_to_all(pg, redistributed_vertices);

  // If we have stable descriptors, we need to know where our neighbor
  // vertices are moving.
  if (has_stable_descriptors)
    synchronize(vertex_to_processor);

  // Build the sets of relocated edges for each process and then do
  // an all-to-all transfer.
  for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) {
    vertex_descriptor src = source(*ei, *this);
    vertex_descriptor tgt = target(*ei, *this);
    // An edge is shipped when either endpoint moves (or always, when
    // descriptors are not stable). It is sent to the process that
    // will hold its source vertex.
    if (!has_stable_descriptors
        || get(vertex_to_processor, src) != src.owner
        || get(vertex_to_processor, tgt) != tgt.owner)
      redistributed_edges[get(vertex_to_processor, source(*ei, *this))]
        .push_back(redistributed_edge(*ei, split_edge_property(get(edge_all_t(), base(),
                                                                   ei->local))));
  }
  inplace_all_to_all(pg, redistributed_edges);

  // A mapping from old vertex descriptors to new vertex
  // descriptors. This is an STL map partly because I'm too lazy to
  // build a real property map (which is hard in the general case) but
  // also because it won't try to look in the graph itself, because
  // the keys are all vertex descriptors that have been invalidated.
  std::map<vertex_descriptor, vertex_descriptor> old_to_new_vertex_map;

  if (has_stable_descriptors) {
    // Clear out all vertices and edges that will have moved. There
    // are several stages to this.

    // First, eliminate all outgoing edges from the (local) vertices
    // that have been moved or whose targets have been moved.
    BGL_FORALL_VERTICES_T(v, *this, graph_type) {
      if (get(vertex_to_processor, v) != v.owner) {
        clear_out_edges(v.local, base());
        clear_in_edges_local(v, directed_selector());
      } else {
        remove_out_edge_if(v.local,
                           target_migrated(vertex_to_processor, base()),
                           base());
        remove_migrated_in_edges(v, vertex_to_processor, directed_selector());
      }
    }

    // Next, eliminate locally-stored edges that have migrated (for
    // undirected graphs).
    graph_detail::erase_if(local_edges_,
                           source_or_target_migrated(vertex_to_processor, *this));

    // Eliminate vertices that have migrated
    for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) {
      if (get(vertex_to_processor, *vi) != vi->owner)
        // Post-increment so that vi has already advanced past the
        // vertex by the time it is removed.
        remove_vertex((*vi++).local, base());
      else {
        // Add the identity relation for vertices that have not migrated
        old_to_new_vertex_map[*vi] = *vi;
        ++vi;
      }
    }
  } else {
    // Clear out the local graph: the entire graph is in transit
    clear();
  }

  // Add the new vertices to the graph. When we do so, update the old
  // -> new vertex mapping both locally and for the owner of the "old"
  // vertex.
  {
    typedef std::pair<vertex_descriptor, vertex_descriptor> mapping_pair;
    // mappings[q] collects the (old, new) pairs to report back to
    // process q, the owner recorded in the old descriptor.
    std::vector<std::vector<mapping_pair> > mappings(p);

    for (process_id_type src = 0; src < p; ++src) {
      for (typename std::vector<redistributed_vertex>::iterator vi =
             redistributed_vertices[src].begin();
           vi != redistributed_vertices[src].end(); ++vi) {
        vertex_descriptor new_vertex =
            add_vertex(vi->get_property(), *this);
        old_to_new_vertex_map[vi->descriptor] = new_vertex;
        mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor,
                                                              new_vertex));
      }

      // The received vertices have been added; release the buffer.
      redistributed_vertices[src].clear();
    }

    inplace_all_to_all(pg, mappings);

    // Add the mappings we were sent into the old->new map.
    for (process_id_type src = 0; src < p; ++src)
      old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end());
  }

  // Get old->new vertex mappings for all of the vertices we need to
  // know about.

  // TBD: An optimization here might involve sending the
  // request-response pairs without an explicit request step (for
  // bidirectional and undirected graphs). However, it may not matter
  // all that much given the cost of redistribution.
  {
    std::vector<std::vector<vertex_descriptor> > vertex_map_requests(p);
    std::vector<std::vector<vertex_descriptor> > vertex_map_responses(p);

    // We need to know about all of the vertices incident on edges
    // that have been relocated to this processor. Tell each processor
    // what each other processor needs to know.
    for (process_id_type src = 0; src < p; ++src)
      for (typename std::vector<redistributed_edge>::iterator ei =
             redistributed_edges[src].begin();
           ei != redistributed_edges[src].end(); ++ei) {
        vertex_descriptor need_vertex = target(ei->descriptor, *this);
        if (old_to_new_vertex_map.find(need_vertex)
            == old_to_new_vertex_map.end())
          {
            // Insert an identity placeholder so that the same vertex
            // is requested only once; it is overwritten below when
            // the response arrives.
            old_to_new_vertex_map[need_vertex] = need_vertex;
            vertex_map_requests[need_vertex.owner].push_back(need_vertex);
          }
      }
    // The requests are kept in vertex_map_requests; what other
    // processes asked of us arrives in vertex_map_responses.
    inplace_all_to_all(pg,
                       vertex_map_requests,
                       vertex_map_responses);

    // Process the requests made for vertices we own. Then perform yet
    // another all-to-all swap. This one matches the requests we've
    // made to the responses we were given.
    for (process_id_type src = 0; src < p; ++src)
      for (typename std::vector<vertex_descriptor>::iterator vi =
             vertex_map_responses[src].begin();
           vi != vertex_map_responses[src].end(); ++vi)
        // Replace each requested old descriptor, in place, by its new
        // descriptor. NOTE(review): operator[] inserts a
        // default-constructed entry if the vertex is unknown here.
        *vi = old_to_new_vertex_map[*vi];
    inplace_all_to_all(pg, vertex_map_responses);

    // Matching the requests to the responses, update the old->new
    // vertex map for all of the vertices we will need to know.
    for (process_id_type src = 0; src < p; ++src) {
      typedef typename std::vector<vertex_descriptor>::size_type size_type;
      // The i-th response from src is paired with the i-th request
      // sent to src.
      for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) {
        old_to_new_vertex_map[vertex_map_requests[src][i]] =
          vertex_map_responses[src][i];
      }
    }
  }

  // Add edges to the graph by mapping the source and target.
  for (process_id_type src = 0; src < p; ++src) {
    for (typename std::vector<redistributed_edge>::iterator ei =
           redistributed_edges[src].begin();
         ei != redistributed_edges[src].end(); ++ei) {
      add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)],
               old_to_new_vertex_map[target(ei->descriptor, *this)],
               ei->get_property(),
               *this);
    }

    // The received edges have been added; release the buffer.
    redistributed_edges[src].clear();
  }

  // Be sure that edge-addition messages are received now, completing
  // the graph.
  synchronize(process_group_);

  // Discard the stored distribution and (re)initialize the vertex
  // indices of the local graph now that its vertex set has changed.
  this->distribution().clear();

  detail::parallel::maybe_initialize_vertex_indices(vertices(base()),
                                                    get(vertex_index, base()));
}
391 
dbwrap_watched_subrec_wakeup_fn(struct db_record * rec,TDB_DATA value,void * private_data)392 } // end namespace boost
393