1 #ifndef DIY_IO_BLOCK_HPP
2 #define DIY_IO_BLOCK_HPP
3 
#include <algorithm>
#include <cstdio>
#include <stdexcept>
#include <string>
#include <vector>

#include "../mpi.hpp"
#include "../assigner.hpp"
#include "../master.hpp"
#include "../storage.hpp"
#include "../log.hpp"
#include "utils.hpp"
14 
15 // Read and write collections of blocks using MPI-IO
16 namespace diy
17 {
18 namespace io
19 {
20   namespace detail
21   {
22     typedef mpi::io::offset                 offset_t;
23 
24     struct GidOffsetCount
25     {
GidOffsetCountdiy::io::detail::GidOffsetCount26                     GidOffsetCount():                                   // need to initialize a vector of given size
27                         gid(-1), offset(0), count(0)                    {}
28 
GidOffsetCountdiy::io::detail::GidOffsetCount29                     GidOffsetCount(int gid_, offset_t offset_, offset_t count_):
30                         gid(gid_), offset(offset_), count(count_)       {}
31 
operator <diy::io::detail::GidOffsetCount32         bool        operator<(const GidOffsetCount& other) const        { return gid < other.gid; }
33 
34         int         gid;
35         offset_t    offset;
36         offset_t    count;
37     };
38   }
39 }
40 
41 // Serialize GidOffsetCount explicitly, to avoid alignment and uninitialized data issues
42 // (to get identical output files given the same block input)
43 template<>
44 struct Serialization<io::detail::GidOffsetCount>
45 {
46     typedef             io::detail::GidOffsetCount                  GidOffsetCount;
47 
savediy::Serialization48     static void         save(BinaryBuffer& bb, const GidOffsetCount& x)
49     {
50       diy::save(bb, x.gid);
51       diy::save(bb, x.offset);
52       diy::save(bb, x.count);
53     }
54 
loaddiy::Serialization55     static void         load(BinaryBuffer& bb, GidOffsetCount& x)
56     {
57       diy::load(bb, x.gid);
58       diy::load(bb, x.offset);
59       diy::load(bb, x.count);
60     }
61 };
62 
63 namespace io
64 {
65 /**
66  * \ingroup IO
67  * \brief Write blocks to storage collectively in one shared file
68  */
69   inline
70   void
write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,const MemoryBuffer & extra=MemoryBuffer (),Master::SaveBlock save=0)71   write_blocks(const std::string&           outfilename,           //!< output file name
72                const mpi::communicator&     comm,                  //!< communicator
73                Master&                      master,                //!< master object
74                const MemoryBuffer&          extra = MemoryBuffer(),//!< user-defined metadata for file header; meaningful only on rank == 0
75                Master::SaveBlock            save = 0)              //!< block save function in case different than or undefined in the master
76   {
77     if (!save) save = master.saver();       // save is likely to be different from master.save()
78 
79     typedef detail::offset_t                offset_t;
80     typedef detail::GidOffsetCount          GidOffsetCount;
81 
82     unsigned size = master.size(),
83              max_size, min_size;
84     mpi::all_reduce(comm, size, max_size, mpi::maximum<unsigned>());
85     mpi::all_reduce(comm, size, min_size, mpi::minimum<unsigned>());
86 
87     // truncate the file
88     if (comm.rank() == 0)
89         diy::io::utils::truncate(outfilename.c_str(), 0);
90 
91     mpi::io::file f(comm, outfilename, mpi::io::file::wronly | mpi::io::file::create);
92 
93     offset_t  start = 0, shift;
94     std::vector<GidOffsetCount>     offset_counts;
95     for (unsigned i = 0; i < max_size; ++i)
96     {
97       offset_t count = 0,
98                offset;
99       if (i < size)
100       {
101         // get the block from master and serialize it
102         const void* block = master.get(i);
103         MemoryBuffer bb;
104         LinkFactory::save(bb, master.link(i));
105         save(block, bb);
106         count = bb.buffer.size();
107         mpi::scan(comm, count, offset, std::plus<offset_t>());
108         offset += start - count;
109         mpi::all_reduce(comm, count, shift, std::plus<offset_t>());
110         start += shift;
111 
112         if (i < min_size)       // up to min_size, we can do collective IO
113           f.write_at_all(offset, bb.buffer);
114         else
115           f.write_at(offset, bb.buffer);
116 
117         offset_counts.push_back(GidOffsetCount(master.gid(i), offset, count));
118       } else
119       {
120         // matching global operations
121         mpi::scan(comm, count, offset, std::plus<offset_t>());
122         mpi::all_reduce(comm, count, shift, std::plus<offset_t>());
123 
124         // -1 indicates that there is no block written here from this rank
125         offset_counts.push_back(GidOffsetCount(-1, offset, count));
126       }
127     }
128 
129     if (comm.rank() == 0)
130     {
131       // round-about way of gather vector of vectors of GidOffsetCount to avoid registering a new mpi datatype
132       std::vector< std::vector<char> > gathered_offset_count_buffers;
133       MemoryBuffer oc_buffer; diy::save(oc_buffer, offset_counts);
134       mpi::gather(comm, oc_buffer.buffer, gathered_offset_count_buffers, 0);
135 
136       std::vector<GidOffsetCount>  all_offset_counts;
137       for (unsigned i = 0; i < gathered_offset_count_buffers.size(); ++i)
138       {
139         MemoryBuffer per_rank_oc_buffer; per_rank_oc_buffer.buffer.swap(gathered_offset_count_buffers[i]);
140         std::vector<GidOffsetCount> per_rank_offset_counts;
141         diy::load(per_rank_oc_buffer, per_rank_offset_counts);
142         for (unsigned j = 0; j < per_rank_offset_counts.size(); ++j)
143           if (per_rank_offset_counts[j].gid != -1)
144             all_offset_counts.push_back(per_rank_offset_counts[j]);
145       }
146       std::sort(all_offset_counts.begin(), all_offset_counts.end());        // sorts by gid
147 
148       MemoryBuffer bb;
149       diy::save(bb, all_offset_counts);
150       diy::save(bb, extra);
151       size_t footer_size = bb.size();
152       diy::save(bb, footer_size);
153 
154       // find footer_offset as the max of (offset + count)
155       offset_t footer_offset = 0;
156       for (unsigned i = 0; i < all_offset_counts.size(); ++i)
157       {
158         offset_t end = all_offset_counts[i].offset + all_offset_counts[i].count;
159         if (end > footer_offset)
160             footer_offset = end;
161       }
162       f.write_at(footer_offset, bb.buffer);
163     } else
164     {
165       MemoryBuffer oc_buffer; diy::save(oc_buffer, offset_counts);
166       mpi::gather(comm, oc_buffer.buffer, 0);
167     }
168   }
169 
170 /**
171  * \ingroup IO
172  * \brief Read blocks from storage collectively from one shared file
173  */
174     inline
175     void
read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,MemoryBuffer & extra,Master::LoadBlock load=0)176     read_blocks(const std::string&           infilename,     //!< input file name
177                 const mpi::communicator&     comm,           //!< communicator
178                 StaticAssigner&              assigner,       //!< assigner object
179                 Master&                      master,         //!< master object
180                 MemoryBuffer&                extra,          //!< user-defined metadata in file header
181                 Master::LoadBlock            load = 0)       //!< load block function in case different than or unefined in the master
182     {
183         if (!load) load = master.loader();      // load is likely to be different from master.load()
184 
185         typedef detail::offset_t                offset_t;
186         typedef detail::GidOffsetCount          GidOffsetCount;
187 
188         mpi::io::file f(comm, infilename, mpi::io::file::rdonly);
189 
190         offset_t    footer_offset = f.size() - sizeof(size_t);
191         size_t footer_size;
192 
193         // Read the size
194         f.read_at_all(footer_offset, (char*) &footer_size, sizeof(footer_size));
195 
196         // Read all_offset_counts
197         footer_offset -= footer_size;
198         MemoryBuffer footer;
199         footer.buffer.resize(footer_size);
200         f.read_at_all(footer_offset, footer.buffer);
201 
202         std::vector<GidOffsetCount>  all_offset_counts;
203         diy::load(footer, all_offset_counts);
204         diy::load(footer, extra);
205         extra.reset();
206 
207         // Get local gids from assigner
208         size_t size = all_offset_counts.size();
209         assigner.set_nblocks(size);
210         std::vector<int> gids;
211         assigner.local_gids(comm.rank(), gids);
212 
213         for (unsigned i = 0; i < gids.size(); ++i)
214         {
215             if (gids[i] != all_offset_counts[gids[i]].gid)
216                 get_logger()->warn("gids don't match in diy::io::read_blocks(), {} vs {}",
217                                    gids[i], all_offset_counts[gids[i]].gid);
218 
219             offset_t offset = all_offset_counts[gids[i]].offset,
220                      count  = all_offset_counts[gids[i]].count;
221             MemoryBuffer bb;
222             bb.buffer.resize(count);
223             f.read_at(offset, bb.buffer);
224             Link* l = LinkFactory::load(bb);
225             l->fix(assigner);
226             void* b = master.create();
227             load(b, bb);
228             master.add(gids[i], b, l);
229         }
230     }
231 
232 
233   // Functions without the extra buffer, for compatibility with the old code
234   inline
235   void
write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,Master::SaveBlock save)236   write_blocks(const std::string&           outfilename,
237                const mpi::communicator&     comm,
238                Master&                      master,
239                Master::SaveBlock            save)
240   {
241     MemoryBuffer extra;
242     write_blocks(outfilename, comm, master, extra, save);
243   }
244 
245   inline
246   void
read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,Master::LoadBlock load=0)247   read_blocks(const std::string&           infilename,
248               const mpi::communicator&     comm,
249               StaticAssigner&              assigner,
250               Master&                      master,
251               Master::LoadBlock            load = 0)
252   {
253     MemoryBuffer extra;     // dummy
254     read_blocks(infilename, comm, assigner, master, extra, load);
255   }
256 
257 namespace split
258 {
259 /**
260  * \ingroup IO
261  * \brief Write blocks to storage independently in one file per process
262  */
263   inline
264   void
write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,const MemoryBuffer & extra=MemoryBuffer (),Master::SaveBlock save=0)265   write_blocks(const std::string&           outfilename,           //!< output file name
266                const mpi::communicator&     comm,                  //!< communicator
267                Master&                      master,                //!< master object
268                const MemoryBuffer&          extra = MemoryBuffer(),//!< user-defined metadata for file header; meaningful only on rank == 0
269                Master::SaveBlock            save = 0)              //!< block save function in case different than or undefined in master
270   {
271     if (!save) save = master.saver();       // save is likely to be different from master.save()
272 
273     bool proceed = false;
274     size_t size = 0;
275     if (comm.rank() == 0)
276     {
277         if (diy::io::utils::is_directory(outfilename))
278           proceed = true;
279         else if (diy::io::utils::make_directory(outfilename))
280           proceed = true;
281 
282         mpi::broadcast(comm, proceed, 0);
283         mpi::reduce(comm, (size_t) master.size(), size, 0, std::plus<size_t>());
284     } else
285     {
286         mpi::broadcast(comm, proceed, 0);
287         mpi::reduce(comm, (size_t) master.size(), 0, std::plus<size_t>());
288     }
289 
290     if (!proceed)
291         throw std::runtime_error("Cannot access or create directory: " + outfilename);
292 
293     for (int i = 0; i < (int)master.size(); ++i)
294     {
295         const void* block = master.get(i);
296 
297         std::string filename = fmt::format("{}/{}", outfilename, master.gid(i));
298 
299         ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "w"));
300 
301         LinkFactory::save(bb, master.link(i));
302         save(block, bb);
303 
304         fclose(bb.file);
305     }
306 
307     if (comm.rank() == 0)
308     {
309         // save the extra buffer
310         std::string filename = outfilename + "/extra";
311         ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "w"));
312         ::diy::save(bb, size);
313         ::diy::save(bb, extra);
314         fclose(bb.file);
315     }
316   }
317 
318 /**
319  * \ingroup IO
320  * \brief Read blocks from storage independently from one file per process
321  */
322   inline
323   void
read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,MemoryBuffer & extra,Master::LoadBlock load=0)324   read_blocks(const std::string&           infilename,  //!< input file name
325               const mpi::communicator&     comm,        //!< communicator
326               StaticAssigner&              assigner,    //!< assigner object
327               Master&                      master,      //!< master object
328               MemoryBuffer&                extra,       //!< user-defined metadata in file header
329               Master::LoadBlock            load = 0)    //!< block load function in case different than or undefined in master
330   {
331     if (!load) load = master.loader();      // load is likely to be different from master.load()
332 
333     // load the extra buffer and size
334     size_t          size;
335     {
336         std::string filename = infilename + "/extra";
337         ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "r"));
338         ::diy::load(bb, size);
339         ::diy::load(bb, extra);
340         extra.reset();
341         fclose(bb.file);
342     }
343 
344     // Get local gids from assigner
345     assigner.set_nblocks(size);
346     std::vector<int> gids;
347     assigner.local_gids(comm.rank(), gids);
348 
349     // Read our blocks;
350     for (unsigned i = 0; i < gids.size(); ++i)
351     {
352         std::string filename = fmt::format("{}/{}", infilename, gids[i]);
353 
354         ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "r"));
355         Link* l = LinkFactory::load(bb);
356         l->fix(assigner);
357         void* b = master.create();
358         load(b, bb);
359         master.add(gids[i], b, l);
360 
361         fclose(bb.file);
362     }
363   }
364 
365   // Functions without the extra buffer, for compatibility with the old code
366   inline
367   void
write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,Master::SaveBlock save)368   write_blocks(const std::string&           outfilename,
369                const mpi::communicator&     comm,
370                Master&                      master,
371                Master::SaveBlock            save)
372   {
373     MemoryBuffer extra;
374     write_blocks(outfilename, comm, master, extra, save);
375   }
376 
377   inline
378   void
read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,Master::LoadBlock load=0)379   read_blocks(const std::string&           infilename,
380               const mpi::communicator&     comm,
381               StaticAssigner&              assigner,
382               Master&                      master,
383               Master::LoadBlock            load = 0)
384   {
385     MemoryBuffer extra;     // dummy
386     read_blocks(infilename, comm, assigner, master, extra, load);
387   }
388 } // split
389 } // io
390 } // diy
391 
392 #endif
393