1 #ifndef DIY_IO_BLOCK_HPP 2 #define DIY_IO_BLOCK_HPP 3 4 #include <string> 5 #include <algorithm> 6 #include <stdexcept> 7 8 #include "../mpi.hpp" 9 #include "../assigner.hpp" 10 #include "../master.hpp" 11 #include "../storage.hpp" 12 #include "../log.hpp" 13 #include "utils.hpp" 14 15 // Read and write collections of blocks using MPI-IO 16 namespace diy 17 { 18 namespace io 19 { 20 namespace detail 21 { 22 typedef mpi::io::offset offset_t; 23 24 struct GidOffsetCount 25 { GidOffsetCountdiy::io::detail::GidOffsetCount26 GidOffsetCount(): // need to initialize a vector of given size 27 gid(-1), offset(0), count(0) {} 28 GidOffsetCountdiy::io::detail::GidOffsetCount29 GidOffsetCount(int gid_, offset_t offset_, offset_t count_): 30 gid(gid_), offset(offset_), count(count_) {} 31 operator <diy::io::detail::GidOffsetCount32 bool operator<(const GidOffsetCount& other) const { return gid < other.gid; } 33 34 int gid; 35 offset_t offset; 36 offset_t count; 37 }; 38 } 39 } 40 41 // Serialize GidOffsetCount explicitly, to avoid alignment and uninitialized data issues 42 // (to get identical output files given the same block input) 43 template<> 44 struct Serialization<io::detail::GidOffsetCount> 45 { 46 typedef io::detail::GidOffsetCount GidOffsetCount; 47 savediy::Serialization48 static void save(BinaryBuffer& bb, const GidOffsetCount& x) 49 { 50 diy::save(bb, x.gid); 51 diy::save(bb, x.offset); 52 diy::save(bb, x.count); 53 } 54 loaddiy::Serialization55 static void load(BinaryBuffer& bb, GidOffsetCount& x) 56 { 57 diy::load(bb, x.gid); 58 diy::load(bb, x.offset); 59 diy::load(bb, x.count); 60 } 61 }; 62 63 namespace io 64 { 65 /** 66 * \ingroup IO 67 * \brief Write blocks to storage collectively in one shared file 68 */ 69 inline 70 void write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,const MemoryBuffer & extra=MemoryBuffer (),Master::SaveBlock save=0)71 write_blocks(const std::string& outfilename, //!< output file name 72 const mpi::communicator& comm, //!< communicator 73 Master& master, //!< master object 74 const MemoryBuffer& extra = MemoryBuffer(),//!< user-defined metadata for file header; meaningful only on rank == 0 75 Master::SaveBlock save = 0) //!< block save function in case different than or undefined in the master 76 { 77 if (!save) save = master.saver(); // save is likely to be different from master.save() 78 79 typedef detail::offset_t offset_t; 80 typedef detail::GidOffsetCount GidOffsetCount; 81 82 unsigned size = master.size(), 83 max_size, min_size; 84 mpi::all_reduce(comm, size, max_size, mpi::maximum<unsigned>()); 85 mpi::all_reduce(comm, size, min_size, mpi::minimum<unsigned>()); 86 87 // truncate the file 88 if (comm.rank() == 0) 89 diy::io::utils::truncate(outfilename.c_str(), 0); 90 91 mpi::io::file f(comm, outfilename, mpi::io::file::wronly | mpi::io::file::create); 92 93 offset_t start = 0, shift; 94 std::vector<GidOffsetCount> offset_counts; 95 for (unsigned i = 0; i < max_size; ++i) 96 { 97 offset_t count = 0, 98 offset; 99 if (i < size) 100 { 101 // get the block from master and serialize it 102 const void* block = master.get(i); 103 MemoryBuffer bb; 104 LinkFactory::save(bb, master.link(i)); 105 save(block, bb); 106 count = bb.buffer.size(); 107 mpi::scan(comm, count, offset, std::plus<offset_t>()); 108 offset += start - count; 109 mpi::all_reduce(comm, count, shift, std::plus<offset_t>()); 110 start += shift; 111 112 if (i < min_size) // up to min_size, we can do collective IO 113 f.write_at_all(offset, bb.buffer); 114 else 115 f.write_at(offset, bb.buffer); 116 117 offset_counts.push_back(GidOffsetCount(master.gid(i), offset, count)); 118 } else 119 { 120 // matching global operations 121 mpi::scan(comm, count, offset, std::plus<offset_t>()); 122 mpi::all_reduce(comm, count, shift, std::plus<offset_t>()); 123 124 // -1 indicates that there is no block written here from this rank 125 offset_counts.push_back(GidOffsetCount(-1, offset, count)); 126 } 127 } 128 129 if (comm.rank() == 0) 130 { 131 // round-about way of gather vector of vectors of GidOffsetCount to avoid registering a new mpi datatype 132 std::vector< std::vector<char> > gathered_offset_count_buffers; 133 MemoryBuffer oc_buffer; diy::save(oc_buffer, offset_counts); 134 mpi::gather(comm, oc_buffer.buffer, gathered_offset_count_buffers, 0); 135 136 std::vector<GidOffsetCount> all_offset_counts; 137 for (unsigned i = 0; i < gathered_offset_count_buffers.size(); ++i) 138 { 139 MemoryBuffer per_rank_oc_buffer; per_rank_oc_buffer.buffer.swap(gathered_offset_count_buffers[i]); 140 std::vector<GidOffsetCount> per_rank_offset_counts; 141 diy::load(per_rank_oc_buffer, per_rank_offset_counts); 142 for (unsigned j = 0; j < per_rank_offset_counts.size(); ++j) 143 if (per_rank_offset_counts[j].gid != -1) 144 all_offset_counts.push_back(per_rank_offset_counts[j]); 145 } 146 std::sort(all_offset_counts.begin(), all_offset_counts.end()); // sorts by gid 147 148 MemoryBuffer bb; 149 diy::save(bb, all_offset_counts); 150 diy::save(bb, extra); 151 size_t footer_size = bb.size(); 152 diy::save(bb, footer_size); 153 154 // find footer_offset as the max of (offset + count) 155 offset_t footer_offset = 0; 156 for (unsigned i = 0; i < all_offset_counts.size(); ++i) 157 { 158 offset_t end = all_offset_counts[i].offset + all_offset_counts[i].count; 159 if (end > footer_offset) 160 footer_offset = end; 161 } 162 f.write_at(footer_offset, bb.buffer); 163 } else 164 { 165 MemoryBuffer oc_buffer; diy::save(oc_buffer, offset_counts); 166 mpi::gather(comm, oc_buffer.buffer, 0); 167 } 168 } 169 170 /** 171 * \ingroup IO 172 * \brief Read blocks from storage collectively from one shared file 173 */ 174 inline 175 void read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,MemoryBuffer & extra,Master::LoadBlock load=0)176 read_blocks(const std::string& infilename, //!< input file name 177 const mpi::communicator& comm, //!< communicator 178 StaticAssigner& assigner, //!< assigner object 179 Master& master, //!< master object 180 MemoryBuffer& extra, //!< user-defined metadata in file header 181 Master::LoadBlock load = 0) //!< load block function in case different than or unefined in the master 182 { 183 if (!load) load = master.loader(); // load is likely to be different from master.load() 184 185 typedef detail::offset_t offset_t; 186 typedef detail::GidOffsetCount GidOffsetCount; 187 188 mpi::io::file f(comm, infilename, mpi::io::file::rdonly); 189 190 offset_t footer_offset = f.size() - sizeof(size_t); 191 size_t footer_size; 192 193 // Read the size 194 f.read_at_all(footer_offset, (char*) &footer_size, sizeof(footer_size)); 195 196 // Read all_offset_counts 197 footer_offset -= footer_size; 198 MemoryBuffer footer; 199 footer.buffer.resize(footer_size); 200 f.read_at_all(footer_offset, footer.buffer); 201 202 std::vector<GidOffsetCount> all_offset_counts; 203 diy::load(footer, all_offset_counts); 204 diy::load(footer, extra); 205 extra.reset(); 206 207 // Get local gids from assigner 208 size_t size = all_offset_counts.size(); 209 assigner.set_nblocks(size); 210 std::vector<int> gids; 211 assigner.local_gids(comm.rank(), gids); 212 213 for (unsigned i = 0; i < gids.size(); ++i) 214 { 215 if (gids[i] != all_offset_counts[gids[i]].gid) 216 get_logger()->warn("gids don't match in diy::io::read_blocks(), {} vs {}", 217 gids[i], all_offset_counts[gids[i]].gid); 218 219 offset_t offset = all_offset_counts[gids[i]].offset, 220 count = all_offset_counts[gids[i]].count; 221 MemoryBuffer bb; 222 bb.buffer.resize(count); 223 f.read_at(offset, bb.buffer); 224 Link* l = LinkFactory::load(bb); 225 l->fix(assigner); 226 void* b = master.create(); 227 load(b, bb); 228 master.add(gids[i], b, l); 229 } 230 } 231 232 233 // Functions without the extra buffer, for compatibility with the old code 234 inline 235 void write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,Master::SaveBlock save)236 write_blocks(const std::string& outfilename, 237 const mpi::communicator& comm, 238 Master& master, 239 Master::SaveBlock save) 240 { 241 MemoryBuffer extra; 242 write_blocks(outfilename, comm, master, extra, save); 243 } 244 245 inline 246 void read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,Master::LoadBlock load=0)247 read_blocks(const std::string& infilename, 248 const mpi::communicator& comm, 249 StaticAssigner& assigner, 250 Master& master, 251 Master::LoadBlock load = 0) 252 { 253 MemoryBuffer extra; // dummy 254 read_blocks(infilename, comm, assigner, master, extra, load); 255 } 256 257 namespace split 258 { 259 /** 260 * \ingroup IO 261 * \brief Write blocks to storage independently in one file per process 262 */ 263 inline 264 void write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,const MemoryBuffer & extra=MemoryBuffer (),Master::SaveBlock save=0)265 write_blocks(const std::string& outfilename, //!< output file name 266 const mpi::communicator& comm, //!< communicator 267 Master& master, //!< master object 268 const MemoryBuffer& extra = MemoryBuffer(),//!< user-defined metadata for file header; meaningful only on rank == 0 269 Master::SaveBlock save = 0) //!< block save function in case different than or undefined in master 270 { 271 if (!save) save = master.saver(); // save is likely to be different from master.save() 272 273 bool proceed = false; 274 size_t size = 0; 275 if (comm.rank() == 0) 276 { 277 if (diy::io::utils::is_directory(outfilename)) 278 proceed = true; 279 else if (diy::io::utils::make_directory(outfilename)) 280 proceed = true; 281 282 mpi::broadcast(comm, proceed, 0); 283 mpi::reduce(comm, (size_t) master.size(), size, 0, std::plus<size_t>()); 284 } else 285 { 286 mpi::broadcast(comm, proceed, 0); 287 mpi::reduce(comm, (size_t) master.size(), 0, std::plus<size_t>()); 288 } 289 290 if (!proceed) 291 throw std::runtime_error("Cannot access or create directory: " + outfilename); 292 293 for (int i = 0; i < (int)master.size(); ++i) 294 { 295 const void* block = master.get(i); 296 297 std::string filename = fmt::format("{}/{}", outfilename, master.gid(i)); 298 299 ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "w")); 300 301 LinkFactory::save(bb, master.link(i)); 302 save(block, bb); 303 304 fclose(bb.file); 305 } 306 307 if (comm.rank() == 0) 308 { 309 // save the extra buffer 310 std::string filename = outfilename + "/extra"; 311 ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "w")); 312 ::diy::save(bb, size); 313 ::diy::save(bb, extra); 314 fclose(bb.file); 315 } 316 } 317 318 /** 319 * \ingroup IO 320 * \brief Read blocks from storage independently from one file per process 321 */ 322 inline 323 void read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,MemoryBuffer & extra,Master::LoadBlock load=0)324 read_blocks(const std::string& infilename, //!< input file name 325 const mpi::communicator& comm, //!< communicator 326 StaticAssigner& assigner, //!< assigner object 327 Master& master, //!< master object 328 MemoryBuffer& extra, //!< user-defined metadata in file header 329 Master::LoadBlock load = 0) //!< block load function in case different than or undefined in master 330 { 331 if (!load) load = master.loader(); // load is likely to be different from master.load() 332 333 // load the extra buffer and size 334 size_t size; 335 { 336 std::string filename = infilename + "/extra"; 337 ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "r")); 338 ::diy::load(bb, size); 339 ::diy::load(bb, extra); 340 extra.reset(); 341 fclose(bb.file); 342 } 343 344 // Get local gids from assigner 345 assigner.set_nblocks(size); 346 std::vector<int> gids; 347 assigner.local_gids(comm.rank(), gids); 348 349 // Read our blocks; 350 for (unsigned i = 0; i < gids.size(); ++i) 351 { 352 std::string filename = fmt::format("{}/{}", infilename, gids[i]); 353 354 ::diy::detail::FileBuffer bb(fopen(filename.c_str(), "r")); 355 Link* l = LinkFactory::load(bb); 356 l->fix(assigner); 357 void* b = master.create(); 358 load(b, bb); 359 master.add(gids[i], b, l); 360 361 fclose(bb.file); 362 } 363 } 364 365 // Functions without the extra buffer, for compatibility with the old code 366 inline 367 void write_blocks(const std::string & outfilename,const mpi::communicator & comm,Master & master,Master::SaveBlock save)368 write_blocks(const std::string& outfilename, 369 const mpi::communicator& comm, 370 Master& master, 371 Master::SaveBlock save) 372 { 373 MemoryBuffer extra; 374 write_blocks(outfilename, comm, master, extra, save); 375 } 376 377 inline 378 void read_blocks(const std::string & infilename,const mpi::communicator & comm,StaticAssigner & assigner,Master & master,Master::LoadBlock load=0)379 read_blocks(const std::string& infilename, 380 const mpi::communicator& comm, 381 StaticAssigner& assigner, 382 Master& master, 383 Master::LoadBlock load = 0) 384 { 385 MemoryBuffer extra; // dummy 386 read_blocks(infilename, comm, assigner, master, extra, load); 387 } 388 } // split 389 } // io 390 } // diy 391 392 #endif 393