1 // Copyright Daniel Wallin 2007. Use, modification and distribution is
2 // subject to the Boost Software License, Version 1.0. (See accompanying
3 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
4 
5 #ifndef BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
6 #define BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
7 
8 #ifndef BOOST_GRAPH_USE_MPI
9 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
10 #endif
11 
12 # include <boost/assert.hpp>
13 # include <boost/lexical_cast.hpp>
14 # include <boost/foreach.hpp>
15 # include <boost/filesystem/path.hpp>
16 # include <boost/filesystem/operations.hpp>
17 # include <cctype>
18 # include <fstream>
19 
20 namespace boost {
21 
22 namespace detail { namespace parallel
23 {
24 
25   // Wraps a local descriptor, making it serializable.
26   template <class Local>
27   struct serializable_local_descriptor
28   {
serializable_local_descriptorboost::detail::parallel::serializable_local_descriptor29       serializable_local_descriptor()
30       {}
31 
serializable_local_descriptorboost::detail::parallel::serializable_local_descriptor32       serializable_local_descriptor(Local local)
33         : local(local)
34       {}
35 
operator Local const&boost::detail::parallel::serializable_local_descriptor36       operator Local const&() const
37       {
38           return local;
39       }
40 
operator ==boost::detail::parallel::serializable_local_descriptor41       bool operator==(serializable_local_descriptor const& other) const
42       {
43           return local == other.local;
44       }
45 
operator <boost::detail::parallel::serializable_local_descriptor46       bool operator<(serializable_local_descriptor const& other) const
47       {
48           return local < other.local;
49       }
50 
51       template <class Archive>
serializeboost::detail::parallel::serializable_local_descriptor52       void serialize(Archive& ar, const unsigned int /*version*/)
53       {
54           ar & unsafe_serialize(local);
55       }
56 
57       Local local;
58   };
59 
60   template <class Vertex, class Properties>
61   struct pending_edge
62   {
pending_edgeboost::detail::parallel::pending_edge63       pending_edge(
64           Vertex source, Vertex target
65         , Properties properties, void* property_ptr
66       )
67         : source(source)
68         , target(target)
69         , properties(properties)
70         , property_ptr(property_ptr)
71       {}
72 
73       Vertex source;
74       Vertex target;
75       Properties properties;
76       void* property_ptr;
77   };
78 
is_digit(char c)79   inline bool is_digit(char c)
80   {
81       return (bool)std::isdigit(c);
82   }
83 
84   inline std::vector<int>
available_process_files(std::string const & filename)85       available_process_files(std::string const& filename)
86       {
87           if (!filesystem::exists(filename))
88               return std::vector<int>();
89 
90           std::vector<int> result;
91 
92           for (filesystem::directory_iterator i(filename), end; i != end; ++i)
93           {
94               if (!filesystem::is_regular(*i))
95                   boost::throw_exception(std::runtime_error("directory contains non-regular entries"));
96 
97               std::string process_name = i->path().filename().string();
98               for (std::string::size_type i = 0; i < process_name.size(); ++i)
99                 if (!is_digit(process_name[i]))
100                   boost::throw_exception(std::runtime_error("directory contains files with invalid names"));
101 
102               result.push_back(boost::lexical_cast<int>(process_name));
103           }
104 
105           return result;
106       }
107 
108   template <class Archive, class Tag, class T, class Base>
maybe_load_properties(Archive & ar,char const * name,property<Tag,T,Base> & properties)109   void maybe_load_properties(
110       Archive& ar, char const* name, property<Tag, T, Base>& properties)
111   {
112       ar >> serialization::make_nvp(name, get_property_value(properties, Tag()));
113       maybe_load_properties(ar, name, static_cast<Base&>(properties));
114   }
115 
116   template <class Archive>
maybe_load_properties(Archive &,char const *,no_property &)117   void maybe_load_properties(
118       Archive&, char const*, no_property&)
119   {}
120 
121   template <class Archive, typename Bundle>
maybe_load_properties(Archive & ar,char const * name,Bundle & bundle)122   void maybe_load_properties(
123       Archive& ar, char const* name, Bundle& bundle)
124   {
125     ar >> serialization::make_nvp(name, bundle);
126     no_property prop;
127     maybe_load_properties(ar, name, prop);
128   }
129 
130 
131 
132 
133 
134 
135   template <class Graph, class Archive, class VertexListS>
136   struct graph_loader
137   {
138       typedef typename Graph::vertex_descriptor vertex_descriptor;
139       typedef typename Graph::local_vertex_descriptor local_vertex_descriptor;
140       typedef typename Graph::vertex_property_type vertex_property_type;
141       typedef typename Graph::edge_descriptor edge_descriptor;
142       typedef typename Graph::local_edge_descriptor local_edge_descriptor;
143       typedef typename Graph::edge_property_type edge_property_type;
144       typedef typename Graph::process_group_type process_group_type;
145       typedef typename process_group_type::process_id_type process_id_type;
146       typedef typename Graph::directed_selector directed_selector;
147       typedef typename mpl::if_<
148           is_same<VertexListS, defaultS>, vecS, VertexListS
149       >::type vertex_list_selector;
150       typedef pending_edge<vertex_descriptor, edge_property_type>
151           pending_edge_type;
152       typedef serializable_local_descriptor<local_vertex_descriptor>
153           serializable_vertex_descriptor;
154 
graph_loaderboost::detail::parallel::graph_loader155       graph_loader(Graph& g, Archive& ar)
156         : m_g(g)
157         , m_ar(ar)
158         , m_pg(g.process_group())
159         , m_requested_vertices(num_processes(m_pg))
160         , m_remote_vertices(num_processes(m_pg))
161         , m_property_ptrs(num_processes(m_pg))
162       {
163           g.clear();
164           load_prefix();
165           load_vertices();
166           load_edges();
167           ar >> make_nvp("distribution", m_g.distribution());
168       }
169 
170   private:
171       struct pending_in_edge
172       {
pending_in_edgeboost::detail::parallel::graph_loader::pending_in_edge173           pending_in_edge(
174               vertex_descriptor u, vertex_descriptor v, void* property_ptr
175           )
176             : u(u)
177             , v(v)
178             , property_ptr(property_ptr)
179           {}
180 
181           vertex_descriptor u;
182           vertex_descriptor v;
183           void* property_ptr;
184       };
185 
is_rootboost::detail::parallel::graph_loader186       bool is_root() const
187       {
188           return process_id(m_pg) == 0;
189       }
190 
191       template <class T>
make_nvpboost::detail::parallel::graph_loader192       serialization::nvp<T> const make_nvp(char const* name, T& value) const
193       {
194           return serialization::nvp<T>(name, value);
195       }
196 
197       void load_prefix();
198       void load_vertices();
199 
200       template <class Anything>
201       void maybe_load_and_store_local_vertex(Anything);
202       void maybe_load_and_store_local_vertex(vecS);
203 
204       void load_edges();
205       void load_in_edges(bidirectionalS);
206       void load_in_edges(directedS);
207       void add_pending_in_edge(
208           vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS);
209       template <class Anything>
210       void add_pending_in_edge(
211           vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything);
212       template <class Anything>
213       void add_edge(
214           vertex_descriptor u, vertex_descriptor v
215         , edge_property_type const& property, void* property_ptr, Anything);
216       void add_edge(
217           vertex_descriptor u, vertex_descriptor v
218         , edge_property_type const& property, void* property_ptr, vecS);
219       void add_remote_vertex_request(
220           vertex_descriptor u, vertex_descriptor v, directedS);
221       void add_remote_vertex_request(
222           vertex_descriptor u, vertex_descriptor v, bidirectionalS);
223       void add_in_edge(
224           edge_descriptor const&, void*, directedS);
225       void add_in_edge(
226           edge_descriptor const& edge, void* old_property_ptr, bidirectionalS);
227 
228       void resolve_remote_vertices(directedS);
229       void resolve_remote_vertices(bidirectionalS);
230       vertex_descriptor resolve_remote_vertex(vertex_descriptor u) const;
231       vertex_descriptor resolve_remote_vertex(vertex_descriptor u, vecS) const;
232       template <class Anything>
233       vertex_descriptor resolve_remote_vertex(vertex_descriptor u, Anything) const;
234 
235       void resolve_property_ptrs();
236 
237       void commit_pending_edges(vecS);
238       template <class Anything>
239       void commit_pending_edges(Anything);
240       void commit_pending_in_edges(directedS);
241       void commit_pending_in_edges(bidirectionalS);
242 
maybe_load_property_ptrboost::detail::parallel::graph_loader243       void* maybe_load_property_ptr(directedS) { return 0; }
244       void* maybe_load_property_ptr(bidirectionalS);
245 
246       Graph& m_g;
247       Archive& m_ar;
248       process_group_type m_pg;
249 
250       std::vector<process_id_type> m_id_mapping;
251 
252       // Maps local vertices as loaded from the archive to
253       // the ones actually added to the graph. Only used
254       // when !vecS.
255       std::map<local_vertex_descriptor, local_vertex_descriptor> m_local_vertices;
256 
257       // This is the list of remote vertex descriptors that we
258       // are going to receive from other processes. This is
259       // kept sorted so that we can determine the position of
260       // the matching vertex descriptor in m_remote_vertices.
261       std::vector<std::vector<serializable_vertex_descriptor> > m_requested_vertices;
262 
263       // This is the list of remote vertex descriptors that
264       // we send and receive from other processes.
265       std::vector<std::vector<serializable_vertex_descriptor> > m_remote_vertices;
266 
267       // ...
268       std::vector<pending_edge_type> m_pending_edges;
269 
270       // The pending in-edges that will be added in the commit step, after
271       // the remote vertex descriptors has been resolved. Only used
272       // when bidirectionalS and !vecS.
273       std::vector<pending_in_edge> m_pending_in_edges;
274 
275       std::vector<std::vector<unsafe_pair<void*,void*> > > m_property_ptrs;
276   };
277 
278   template <class Graph, class Archive, class VertexListS>
load_prefix()279   void graph_loader<Graph, Archive, VertexListS>::load_prefix()
280   {
281       typename process_group_type::process_size_type num_processes_;
282       m_ar >> make_nvp("num_processes", num_processes_);
283 
284       if (num_processes_ != num_processes(m_pg))
285           boost::throw_exception(std::runtime_error("number of processes mismatch"));
286 
287       process_id_type old_id;
288       m_ar >> make_nvp("id", old_id);
289 
290       std::vector<typename Graph::distribution_type::size_type> mapping;
291       m_ar >> make_nvp("mapping", mapping);
292 
293       // Fetch all the old id's from the other processes.
294       std::vector<process_id_type> old_ids;
295       all_gather(m_pg, &old_id, &old_id+1, old_ids);
296 
297       m_id_mapping.resize(num_processes(m_pg), -1);
298 
299       for (process_id_type i = 0; i < num_processes(m_pg); ++i)
300       {
301 # ifdef PBGL_SERIALIZE_DEBUG
302           if (is_root())
303               std::cout << i << " used to be " << old_ids[i] << "\n";
304 # endif
305           BOOST_ASSERT(m_id_mapping[old_ids[i]] == -1);
306           m_id_mapping[old_ids[i]] = i;
307       }
308 
309       std::vector<typename Graph::distribution_type::size_type> new_mapping(
310           mapping.size());
311 
312       for (int i = 0; i < num_processes(m_pg); ++i)
313       {
314           new_mapping[mapping[old_ids[i]]] = i;
315       }
316 
317       m_g.distribution().assign_mapping(
318           new_mapping.begin(), new_mapping.end());
319   }
320 
321   template <class Graph, class Archive, class VertexListS>
load_vertices()322   void graph_loader<Graph, Archive, VertexListS>::load_vertices()
323   {
324       int V;
325       m_ar >> BOOST_SERIALIZATION_NVP(V);
326 
327 # ifdef PBGL_SERIALIZE_DEBUG
328       if (is_root())
329           std::cout << "Loading vertices\n";
330 # endif
331 
332       for (int i = 0; i < V; ++i)
333       {
334           maybe_load_and_store_local_vertex(vertex_list_selector());
335       }
336   }
337 
338   template <class Graph, class Archive, class VertexListS>
339   template <class Anything>
maybe_load_and_store_local_vertex(Anything)340   void graph_loader<Graph, Archive, VertexListS>::maybe_load_and_store_local_vertex(Anything)
341   {
342       // Load the original vertex descriptor
343       local_vertex_descriptor local;
344       m_ar >> make_nvp("local", unsafe_serialize(local));
345 
346       // Load the properties
347       vertex_property_type property;
348       detail::parallel::maybe_load_properties(m_ar, "vertex_property",
349                           property);
350 
351       // Add the vertex
352       vertex_descriptor v(process_id(m_pg), add_vertex(property, m_g.base()));
353 
354       if (m_g.on_add_vertex)
355         m_g.on_add_vertex(v, m_g);
356 
357       // Create the mapping from the "old" local descriptor to the new
358       // local descriptor.
359       m_local_vertices[local] = v.local;
360   }
361 
362   template <class Graph, class Archive, class VertexListS>
maybe_load_and_store_local_vertex(vecS)363   void graph_loader<Graph, Archive, VertexListS>::maybe_load_and_store_local_vertex(vecS)
364   {
365       // Load the properties
366       vertex_property_type property;
367       detail::parallel::maybe_load_properties(m_ar, "vertex_property",
368                           property);
369 
370       // Add the vertex
371       vertex_descriptor v(process_id(m_pg),
372                           add_vertex(m_g.build_vertex_property(property),
373                                      m_g.base()));
374 
375       if (m_g.on_add_vertex)
376         m_g.on_add_vertex(v, m_g);
377   }
378 
379   template <class Graph, class Archive, class VertexListS>
load_edges()380   void graph_loader<Graph, Archive, VertexListS>::load_edges()
381   {
382       int E;
383       m_ar >> BOOST_SERIALIZATION_NVP(E);
384 
385 # ifdef PBGL_SERIALIZE_DEBUG
386       if (is_root())
387           std::cout << "Loading edges\n";
388 # endif
389 
390       for (int i = 0; i < E; ++i)
391       {
392           local_vertex_descriptor local_src;
393           process_id_type target_owner;
394           local_vertex_descriptor local_tgt;
395 
396           m_ar >> make_nvp("source", unsafe_serialize(local_src));
397           m_ar >> make_nvp("target_owner", target_owner);
398           m_ar >> make_nvp("target", unsafe_serialize(local_tgt));
399 
400           process_id_type new_src_owner = process_id(m_pg);
401           process_id_type new_tgt_owner = m_id_mapping[target_owner];
402 
403           vertex_descriptor source(new_src_owner, local_src);
404           vertex_descriptor target(new_tgt_owner, local_tgt);
405 
406           edge_property_type properties;
407           detail::parallel::maybe_load_properties(m_ar, "edge_property", properties);
408 
409           void* property_ptr = maybe_load_property_ptr(directed_selector());
410           add_edge(source, target, properties, property_ptr, vertex_list_selector());
411       }
412 
413       load_in_edges(directed_selector());
414       commit_pending_edges(vertex_list_selector());
415   }
416 
417   template <class Graph, class Archive, class VertexListS>
load_in_edges(bidirectionalS)418   void graph_loader<Graph, Archive, VertexListS>::load_in_edges(bidirectionalS)
419   {
420       std::size_t I;
421       m_ar >> BOOST_SERIALIZATION_NVP(I);
422 
423 # ifdef PBGL_SERIALIZE_DEBUG
424       if (is_root())
425           std::cout << "Loading in-edges\n";
426 # endif
427 
428       for (int i = 0; i < I; ++i)
429       {
430           process_id_type src_owner;
431           local_vertex_descriptor local_src;
432           local_vertex_descriptor local_target;
433           void* property_ptr;
434 
435           m_ar >> make_nvp("src_owner", src_owner);
436           m_ar >> make_nvp("source", unsafe_serialize(local_src));
437           m_ar >> make_nvp("target", unsafe_serialize(local_target));
438           m_ar >> make_nvp("property_ptr", unsafe_serialize(property_ptr));
439 
440           src_owner = m_id_mapping[src_owner];
441 
442           vertex_descriptor u(src_owner, local_src);
443           vertex_descriptor v(process_id(m_pg), local_target);
444 
445           add_pending_in_edge(u, v, property_ptr, vertex_list_selector());
446       }
447   }
448 
449   template <class Graph, class Archive, class VertexListS>
load_in_edges(directedS)450   void graph_loader<Graph, Archive, VertexListS>::load_in_edges(directedS)
451   {}
452 
453   template <class Graph, class Archive, class VertexListS>
add_pending_in_edge(vertex_descriptor u,vertex_descriptor v,void * property_ptr,vecS)454   void graph_loader<Graph, Archive, VertexListS>::add_pending_in_edge(
455       vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS)
456   {
457       m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr));
458   }
459 
460   template <class Graph, class Archive, class VertexListS>
461   template <class Anything>
add_pending_in_edge(vertex_descriptor u,vertex_descriptor v,void * property_ptr,Anything)462   void graph_loader<Graph, Archive, VertexListS>::add_pending_in_edge(
463       vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything)
464   {
465       // u and v represent the out-edge here, meaning v is local
466       // to us, and u is always remote.
467       m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr));
468       add_remote_vertex_request(v, u, bidirectionalS());
469   }
470 
471   template <class Graph, class Archive, class VertexListS>
472   template <class Anything>
add_edge(vertex_descriptor u,vertex_descriptor v,edge_property_type const & property,void * property_ptr,Anything)473   void graph_loader<Graph, Archive, VertexListS>::add_edge(
474       vertex_descriptor u, vertex_descriptor v
475     , edge_property_type const& property, void* property_ptr, Anything)
476   {
477       m_pending_edges.push_back(pending_edge_type(u, v, property, property_ptr));
478       add_remote_vertex_request(u, v, directed_selector());
479   }
480 
481   template <class Graph, class Archive, class VertexListS>
add_remote_vertex_request(vertex_descriptor u,vertex_descriptor v,directedS)482   void graph_loader<Graph, Archive, VertexListS>::add_remote_vertex_request(
483       vertex_descriptor u, vertex_descriptor v, directedS)
484   {
485       // We have to request the remote vertex.
486       m_requested_vertices[owner(v)].push_back(local(v));
487   }
488 
489   template <class Graph, class Archive, class VertexListS>
add_remote_vertex_request(vertex_descriptor u,vertex_descriptor v,bidirectionalS)490   void graph_loader<Graph, Archive, VertexListS>::add_remote_vertex_request(
491       vertex_descriptor u, vertex_descriptor v, bidirectionalS)
492   {
493       // If the edge spans to another process, we know
494       // that that process has a matching in-edge, so
495       // we can just send our vertex. No requests
496       // necessary.
497       if (owner(v) != m_g.processor())
498       {
499           m_remote_vertices[owner(v)].push_back(local(u));
500           m_requested_vertices[owner(v)].push_back(local(v));
501       }
502   }
503 
504   template <class Graph, class Archive, class VertexListS>
add_edge(vertex_descriptor u,vertex_descriptor v,edge_property_type const & property,void * property_ptr,vecS)505   void graph_loader<Graph, Archive, VertexListS>::add_edge(
506       vertex_descriptor u, vertex_descriptor v
507     , edge_property_type const& property, void* property_ptr, vecS)
508   {
509       std::pair<local_edge_descriptor, bool> inserted =
510           detail::parallel::add_local_edge(
511               local(u), local(v)
512             , m_g.build_edge_property(property), m_g.base());
513       BOOST_ASSERT(inserted.second);
514       put(edge_target_processor_id, m_g.base(), inserted.first, owner(v));
515 
516       edge_descriptor e(owner(u), owner(v), true, inserted.first);
517 
518       if (inserted.second && m_g.on_add_edge)
519         m_g.on_add_edge(e, m_g);
520 
521       add_in_edge(e, property_ptr, directed_selector());
522   }
523 
524   template <class Graph, class Archive, class VertexListS>
add_in_edge(edge_descriptor const &,void *,directedS)525   void graph_loader<Graph, Archive, VertexListS>::add_in_edge(
526       edge_descriptor const&, void*, directedS)
527   {}
528 
529   template <class Graph, class Archive, class VertexListS>
add_in_edge(edge_descriptor const & edge,void * old_property_ptr,bidirectionalS)530   void graph_loader<Graph, Archive, VertexListS>::add_in_edge(
531       edge_descriptor const& edge, void* old_property_ptr, bidirectionalS)
532   {
533       if (owner(target(edge, m_g)) == m_g.processor())
534       {
535           detail::parallel::stored_in_edge<local_edge_descriptor>
536               e(m_g.processor(), local(edge));
537           boost::graph_detail::push(get(
538               vertex_in_edges, m_g.base())[local(target(edge, m_g))], e);
539       }
540       else
541       {
542           // We send the (old,new) property pointer pair to
543           // the remote process. This could be optimized to
544           // only send the new one -- the ordering can be
545           // made implicit because the old pointer value is
546           // stored on the remote process.
547           //
548           // Doing that is a little bit more complicated, but
549           // in case it turns out it's important we can do it.
550           void* property_ptr = local(edge).get_property();
551           m_property_ptrs[owner(target(edge, m_g))].push_back(
552               unsafe_pair<void*,void*>(old_property_ptr, property_ptr));
553       }
554   }
555 
556   template <class Graph, class Archive, class VertexListS>
resolve_property_ptrs()557   void graph_loader<Graph, Archive, VertexListS>::resolve_property_ptrs()
558   {
559 # ifdef PBGL_SERIALIZE_DEBUG
560       if (is_root())
561           std::cout << "Resolving property pointers\n";
562 # endif
563 
564       for (int i = 0; i < num_processes(m_pg); ++i)
565       {
566           std::sort(
567               m_property_ptrs[i].begin(), m_property_ptrs[i].end());
568       }
569 
570       boost::parallel::inplace_all_to_all(m_pg, m_property_ptrs);
571   }
572 
573   template <class Graph, class Archive, class VertexListS>
resolve_remote_vertices(directedS)574   void graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertices(directedS)
575   {
576       for (int i = 0; i < num_processes(m_pg); ++i)
577       {
578           std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end());
579       }
580 
581       boost::parallel::inplace_all_to_all(
582           m_pg, m_requested_vertices, m_remote_vertices);
583 
584       for (int i = 0; i < num_processes(m_pg); ++i)
585       {
586           BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i])
587           {
588               u = m_local_vertices[u];
589           }
590       }
591 
592       boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices);
593   }
594 
595   template <class Graph, class Archive, class VertexListS>
resolve_remote_vertices(bidirectionalS)596   void graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertices(bidirectionalS)
597   {
598 # ifdef PBGL_SERIALIZE_DEBUG
599       if (is_root())
600           std::cout << "Resolving remote vertices\n";
601 # endif
602 
603       for (int i = 0; i < num_processes(m_pg); ++i)
604       {
605           std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end());
606           std::sort(m_remote_vertices[i].begin(), m_remote_vertices[i].end());
607 
608           BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i])
609           {
610               u = m_local_vertices[u];
611           }
612       }
613 
614       boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices);
615 
616       for (int i = 0; i < num_processes(m_pg); ++i)
617           BOOST_ASSERT(m_remote_vertices[i].size() == m_requested_vertices[i].size());
618   }
619 
620   template <class Graph, class Archive, class VertexListS>
commit_pending_edges(vecS)621   void graph_loader<Graph, Archive, VertexListS>::commit_pending_edges(vecS)
622   {
623       commit_pending_in_edges(directed_selector());
624   }
625 
626   template <class Graph, class Archive, class VertexListS>
627   template <class Anything>
commit_pending_edges(Anything)628   void graph_loader<Graph, Archive, VertexListS>::commit_pending_edges(Anything)
629   {
630       resolve_remote_vertices(directed_selector());
631 
632       BOOST_FOREACH(pending_edge_type const& e, m_pending_edges)
633       {
634           vertex_descriptor u = resolve_remote_vertex(e.source);
635           vertex_descriptor v = resolve_remote_vertex(e.target);
636           add_edge(u, v, e.properties, e.property_ptr, vecS());
637       }
638 
639       commit_pending_in_edges(directed_selector());
640   }
641 
642   template <class Graph, class Archive, class VertexListS>
commit_pending_in_edges(directedS)643   void graph_loader<Graph, Archive, VertexListS>::commit_pending_in_edges(directedS)
644   {}
645 
646   template <class Graph, class Archive, class VertexListS>
commit_pending_in_edges(bidirectionalS)647   void graph_loader<Graph, Archive, VertexListS>::commit_pending_in_edges(bidirectionalS)
648   {
649       resolve_property_ptrs();
650 
651       BOOST_FOREACH(pending_in_edge const& e, m_pending_in_edges)
652       {
653           vertex_descriptor u = resolve_remote_vertex(e.u, vertex_list_selector());
654           vertex_descriptor v = resolve_remote_vertex(e.v, vertex_list_selector());
655 
656           typedef detail::parallel::stored_in_edge<local_edge_descriptor> stored_edge;
657 
658           std::vector<unsafe_pair<void*,void*> >::iterator i = std::lower_bound(
659               m_property_ptrs[owner(u)].begin()
660             , m_property_ptrs[owner(u)].end()
661             , unsafe_pair<void*,void*>(e.property_ptr, 0)
662           );
663 
664           if (i == m_property_ptrs[owner(u)].end()
665               || i->first != e.property_ptr)
666           {
667               BOOST_ASSERT(false);
668           }
669 
670           local_edge_descriptor local_edge(local(u), local(v), i->second);
671           stored_edge edge(owner(u), local_edge);
672           boost::graph_detail::push(
673               get(vertex_in_edges, m_g.base())[local(v)], edge);
674       }
675   }
676 
677   template <class Graph, class Archive, class VertexListS>
678   typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
resolve_remote_vertex(vertex_descriptor u) const679   graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
680       vertex_descriptor u) const
681   {
682       if (owner(u) == process_id(m_pg))
683       {
684           return vertex_descriptor(
685               process_id(m_pg), m_local_vertices.find(local(u))->second);
686       }
687 
688       typename std::vector<serializable_vertex_descriptor>::const_iterator
689           i = std::lower_bound(
690               m_requested_vertices[owner(u)].begin()
691             , m_requested_vertices[owner(u)].end()
692             , serializable_vertex_descriptor(local(u))
693           );
694 
695       if (i == m_requested_vertices[owner(u)].end()
696           || *i != local(u))
697       {
698           BOOST_ASSERT(false);
699       }
700 
701       local_vertex_descriptor local =
702           m_remote_vertices[owner(u)][m_requested_vertices[owner(u)].end() - i];
703       return vertex_descriptor(owner(u), local);
704   }
705 
706   template <class Graph, class Archive, class VertexListS>
707   typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
resolve_remote_vertex(vertex_descriptor u,vecS) const708   graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
709       vertex_descriptor u, vecS) const
710   {
711       return u;
712   }
713 
714   template <class Graph, class Archive, class VertexListS>
715   template <class Anything>
716   typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
resolve_remote_vertex(vertex_descriptor u,Anything) const717   graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
718       vertex_descriptor u, Anything) const
719   {
720       return resolve_remote_vertex(u);
721   }
722 
723   template <class Graph, class Archive, class VertexListS>
724   void*
maybe_load_property_ptr(bidirectionalS)725   graph_loader<Graph, Archive, VertexListS>::maybe_load_property_ptr(bidirectionalS)
726   {
727       void* ptr;
728       m_ar >> make_nvp("property_ptr", unsafe_serialize(ptr));
729       return ptr;
730   }
731 
732 template <class Archive, class D>
maybe_save_local_descriptor(Archive & ar,D const &,vecS)733 void maybe_save_local_descriptor(Archive& ar, D const&, vecS)
734 {}
735 
736 template <class Archive, class D, class NotVecS>
maybe_save_local_descriptor(Archive & ar,D const & d,NotVecS)737 void maybe_save_local_descriptor(Archive& ar, D const& d, NotVecS)
738 {
739     ar << serialization::make_nvp(
740         "local", unsafe_serialize(const_cast<D&>(d)));
741 }
742 
743 template <class Archive>
maybe_save_properties(Archive &,char const *,no_property const &)744 void maybe_save_properties(
745     Archive&, char const*, no_property const&)
746 {}
747 
748 template <class Archive, class Tag, class T, class Base>
maybe_save_properties(Archive & ar,char const * name,property<Tag,T,Base> const & properties)749 void maybe_save_properties(
750     Archive& ar, char const* name, property<Tag, T, Base> const& properties)
751 {
752     ar & serialization::make_nvp(name, get_property_value(properties, Tag()));
753     maybe_save_properties(ar, name, static_cast<Base const&>(properties));
754 }
755 
756 template <class Archive, class Graph>
save_in_edges(Archive & ar,Graph const & g,directedS)757 void save_in_edges(Archive& ar, Graph const& g, directedS)
758 {}
759 
760 // We need to save the edges in the base edge
761 // list, and the in_edges that are stored in the
762 // vertex_in_edges vertex property.
763 template <class Archive, class Graph>
save_in_edges(Archive & ar,Graph const & g,bidirectionalS)764 void save_in_edges(Archive& ar, Graph const& g, bidirectionalS)
765 {
766     typedef typename Graph::process_group_type
767         process_group_type;
768     typedef typename process_group_type::process_id_type
769         process_id_type;
770     typedef typename graph_traits<
771         Graph>::vertex_descriptor vertex_descriptor;
772     typedef typename vertex_descriptor::local_descriptor_type
773         local_vertex_descriptor;
774     typedef typename graph_traits<
775         Graph>::edge_descriptor edge_descriptor;
776 
777     process_id_type id = g.processor();
778 
779     std::vector<edge_descriptor> saved_in_edges;
780 
781     BGL_FORALL_VERTICES_T(v, g, Graph)
782     {
783         BOOST_FOREACH(edge_descriptor const& e, in_edges(v, g))
784         {
785             // Only save the in_edges that isn't owned by this process.
786             if (owner(e) == id)
787                 continue;
788 
789             saved_in_edges.push_back(e);
790         }
791     }
792 
793     std::size_t I = saved_in_edges.size();
794     ar << BOOST_SERIALIZATION_NVP(I);
795 
796     BOOST_FOREACH(edge_descriptor const& e, saved_in_edges)
797     {
798         process_id_type src_owner = owner(source(e,g));
799         local_vertex_descriptor local_src = local(source(e,g));
800         local_vertex_descriptor local_target = local(target(e,g));
801         void* property_ptr = local(e).get_property();
802 
803         using serialization::make_nvp;
804 
805         ar << make_nvp("src_owner", src_owner);
806         ar << make_nvp("source", unsafe_serialize(local_src));
807         ar << make_nvp("target", unsafe_serialize(local_target));
808         ar << make_nvp("property_ptr", unsafe_serialize(property_ptr));
809     }
810 }
811 
812 template <class Archive, class Edge>
maybe_save_property_ptr(Archive &,Edge const &,directedS)813 void maybe_save_property_ptr(Archive&, Edge const&, directedS)
814 {}
815 
816 template <class Archive, class Edge>
maybe_save_property_ptr(Archive & ar,Edge const & e,bidirectionalS)817 void maybe_save_property_ptr(Archive& ar, Edge const& e, bidirectionalS)
818 {
819     void* ptr = local(e).get_property();
820     ar << serialization::make_nvp("property_ptr", unsafe_serialize(ptr));
821 }
822 
823 template <class Archive, class Graph, class DirectedS>
save_edges(Archive & ar,Graph const & g,DirectedS)824 void save_edges(Archive& ar, Graph const& g, DirectedS)
825 {
826     typedef typename Graph::process_group_type
827         process_group_type;
828     typedef typename process_group_type::process_id_type
829         process_id_type;
830     typedef typename graph_traits<
831         Graph>::vertex_descriptor vertex_descriptor;
832 
833     typedef typename Graph::edge_property_type edge_property_type;
834 
835     int E = num_edges(g);
836     ar << BOOST_SERIALIZATION_NVP(E);
837 
838     // For *directed* graphs, we can just save
839     // the edge list and be done.
840     //
841     // For *bidirectional* graphs, we need to also
842     // save the "vertex_in_edges" property map,
843     // because it might contain in-edges that
844     // are not locally owned.
845     BGL_FORALL_EDGES_T(e, g, Graph)
846     {
847         vertex_descriptor src(source(e, g));
848         vertex_descriptor tgt(target(e, g));
849 
850         typename vertex_descriptor::local_descriptor_type
851             local_u(local(src));
852         typename vertex_descriptor::local_descriptor_type
853             local_v(local(tgt));
854 
855         process_id_type target_owner = owner(tgt);
856 
857         using serialization::make_nvp;
858 
859         ar << make_nvp("source", unsafe_serialize(local_u));
860         ar << make_nvp("target_owner", target_owner);
861         ar << make_nvp("target", unsafe_serialize(local_v));
862 
863         maybe_save_properties(
864             ar, "edge_property"
865           , static_cast<edge_property_type const&>(get(edge_all_t(), g, e))
866         );
867 
868         maybe_save_property_ptr(ar, e, DirectedS());
869     }
870 
871     save_in_edges(ar, g, DirectedS());
872 }
873 
874 }} // namespace detail::parallel
875 
876 template <PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
877 template <class IStreamConstructibleArchive>
load(std::string const & filename)878 void PBGL_DISTRIB_ADJLIST_TYPE::load(std::string const& filename)
879 {
880     process_group_type pg = process_group();
881     process_id_type id = process_id(pg);
882 
883     synchronize(pg);
884 
885     std::vector<int> disk_files = detail::parallel::available_process_files(filename);
886     std::sort(disk_files.begin(), disk_files.end());
887 
888     // Negotiate which process gets which file. Serialized.
889     std::vector<int> consumed_files;
890     int picked_file = -1;
891 
892     if (id > 0)
893         receive_oob(pg, id-1, 0, consumed_files);
894 
895     std::sort(consumed_files.begin(), consumed_files.end());
896     std::vector<int> available_files;
897     std::set_difference(
898         disk_files.begin(), disk_files.end()
899       , consumed_files.begin(), consumed_files.end()
900       , std::back_inserter(available_files)
901     );
902 
903     if (available_files.empty())
904         boost::throw_exception(std::runtime_error("no file available"));
905 
906     // back() used for debug purposes. Making sure the
907     // ranks are shuffled.
908     picked_file = available_files.back();
909 
910 # ifdef PBGL_SERIALIZE_DEBUG
911     std::cout << id << " picked " << picked_file << "\n";
912 # endif
913 
914     consumed_files.push_back(picked_file);
915 
916     if (id < num_processes(pg) - 1)
917         send_oob(pg, id+1, 0, consumed_files);
918 
919     std::string local_filename = filename + "/" +
920         lexical_cast<std::string>(picked_file);
921 
922     std::ifstream in(local_filename.c_str(), std::ios_base::binary);
923     IStreamConstructibleArchive ar(in);
924 
925     detail::parallel::graph_loader<
926         graph_type, IStreamConstructibleArchive, InVertexListS
927     > loader(*this, ar);
928 
929 # ifdef PBGL_SERIALIZE_DEBUG
930     std::cout << "Process " << id << " done loading.\n";
931 # endif
932 
933     synchronize(pg);
934 }
935 
936 template <PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
937 template <class OStreamConstructibleArchive>
save(std::string const & filename) const938 void PBGL_DISTRIB_ADJLIST_TYPE::save(std::string const& filename) const
939 {
940     typedef typename config_type::VertexListS vertex_list_selector;
941 
942     process_group_type pg = process_group();
943     process_id_type id = process_id(pg);
944 
945     if (filesystem::exists(filename) && !filesystem::is_directory(filename))
946         boost::throw_exception(std::runtime_error("entry exists, but is not a directory"));
947 
948     filesystem::remove_all(filename);
949     filesystem::create_directory(filename);
950 
951     synchronize(pg);
952 
953     std::string local_filename = filename + "/" +
954         lexical_cast<std::string>(id);
955 
956     std::ofstream out(local_filename.c_str(), std::ios_base::binary);
957     OStreamConstructibleArchive ar(out);
958 
959     using serialization::make_nvp;
960 
961     typename process_group_type::process_size_type num_processes_ = num_processes(pg);
962     ar << make_nvp("num_processes", num_processes_);
963     ar << BOOST_SERIALIZATION_NVP(id);
964     ar << make_nvp("mapping", this->distribution().mapping());
965 
966     int V = num_vertices(*this);
967     ar << BOOST_SERIALIZATION_NVP(V);
968 
969     BGL_FORALL_VERTICES_T(v, *this, graph_type)
970     {
971         local_vertex_descriptor local_descriptor(local(v));
972         detail::parallel::maybe_save_local_descriptor(
973             ar, local_descriptor, vertex_list_selector());
974         detail::parallel::maybe_save_properties(
975             ar, "vertex_property"
976           , static_cast<vertex_property_type const&>(get(vertex_all_t(), *this, v))
977         );
978     }
979 
980     detail::parallel::save_edges(ar, *this, directed_selector());
981 
982     ar << make_nvp("distribution", this->distribution());
983 }
984 
985 } // namespace boost
986 
987 #endif // BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
988 
989