//////////////////////////////////////////////////////////////////////////////////////
//// This file is distributed under the University of Illinois/NCSA Open Source License.
//// See LICENSE file in top directory for details.
////
//// Copyright (c) 2016 Jeongnim Kim and QMCPACK developers.
////
//// File developed by:
////
//// File created by: Miguel Morales, moralessilva2@llnl.gov, Lawrence Livermore National Laboratory
////////////////////////////////////////////////////////////////////////////////////////

#ifndef QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP
#define QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP


#include <cassert>
#include <complex>
#include <cstdlib>
#include <algorithm>
#include <utility>
#include <vector>
#include <numeric>
#if defined(USE_MPI)
#include <mpi.h>
#endif

#include "Configuration.h"
#include "Utilities/FairDivide.h"
#include "hdf/hdf_archive.h"
#include "Message/CommOperators.h"

#include "AFQMC/config.0.h"

#define MORE 1555
#define LAST 2777
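// (MPI message tags used by write_hdf5_SpMat below to stream blocks to the
//  root: MORE marks an intermediate block, LAST a sender's final block)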

namespace qmcplusplus
{
namespace afqmc
{
// Look for a dataset called ..._descriptor, which will contain integer types
// describing the properties of the datasets, e.g.
// - transposed?
// - sorted?
// - etc.
//
// If this dataset is not found, assume a generic layout.
//
// SpMatrix_Partition both partitions the matrix into local segments belonging to a
// given node and applies cutoffs to the values.
//
// For good efficiency, it is possible to partition the local block
// (obtained through split.local(i,j,val))
// among equivalent nodes (same node id within the TG, using split.local_split(i,j,val))
// and read/sort only this local segment.
// After the call to compress (which then acts only on the local segment),
// assemble the full local partition with a call to Allgather over the corresponding
// nodes in COMM_WORLD.
// -- minimizes sorting
//
// Algorithm 1:
// 1. head_of_nodes read blocks from the hdf5 file, in parallel across nodes
//    and sequentially over each node's blocks
// 2. loop over head_of_nodes, each one
//    i.  broadcasts its block to the group
//    ii. from each bcast, adds to the local list if split.local(i,j)==true
//    *** overlap the calls to bcast (with Ibcast) and the block processing
// 3. after the read, compress
// 4. after the compress, gather segments from the other equivalent nodes
// 5. If memory is not a problem, further speedup can be obtained by having
//    other cores also read/bcast/process blocks.
//    -- possible downsides: 1) memory, 2) I/O bottleneck (e.g. on BGQ)
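//
// On-disk layout assumed by the readers below (a sketch, inferred from the
// read calls; n_i is the number of nonzero terms stored in block i):
//   <name>_block_sizes : n_i for each of the nblk blocks
//   <name>_index_<i>   : 2*n_i integers, stored as (row, col) pairs
//   <name>_vals_<i>    : n_i values (real, or complex when QMC_COMPLEX is set)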
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool multiple_reader_hdf5_SpMat(SpMatrix& SpM,
                                       SpMatrix_Partition& split,
                                       hdf_archive& dump,
                                       std::string name,
                                       int nblk,
                                       task_group& TG,
                                       int n_working_cores)
{
  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();
  int node_rank;

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    bool need_lock = n_working_cores > 1;
    MPI_Comm_rank(TG.getHeadOfNodesComm(), &node_rank); // this should be equal to nodeid
    int fct = 1;
#if defined(QMC_COMPLEX)
    fct = 2;
#endif

    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, name + std::string("_block_sizes")))
    {
      app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec, ivec2;
    std::vector<ValueType> vvec, vvec2;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);
    ivec2.reserve(2 * ntmax);
    vvec2.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block = pool_dist[coreid + 1];
    int nbtot = last_local_block - first_local_block;
    int niter = nbtot / nnodes + std::min(nbtot % nnodes, 1);

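    // Round-robin over the nodes within this core's block range: in each pass,
    // every node reads one block from the file, then the blocks read in that
    // pass are broadcast among the head-of-node ranks and filtered through
    // split.local() into the local sparse matrix.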
    for (int iter = 0; iter < niter; iter++)
    {
      int myblock_number = first_local_block + nnodes * iter + nodeid;
      int first_block = first_local_block + nnodes * iter;
      int last_block = std::min(first_block + nnodes, last_local_block);
      if (myblock_number < last_local_block)
      {
        ivec.resize(2 * block_size[myblock_number]);
        vvec.resize(block_size[myblock_number]);
        if (!dump.readEntry(ivec, name + std::string("_index_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_index_" << myblock_number
                      << " dataset. \n";
          return false;
        }
        if (!dump.readEntry(vvec, name + std::string("_vals_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_vals_" << myblock_number
                      << " dataset. \n";
          return false;
        }
      }
      for (int k = first_block, ipr = 0; k < last_block; k++, ipr++)
      {
        ivec2.resize(2 * block_size[k]);
        vvec2.resize(block_size[k]);
        if (ipr == node_rank)
        {
          assert(myblock_number == k);
          std::copy(ivec.begin(), ivec.end(), ivec2.begin());
          MPI_Bcast(ivec2.data(), ivec2.size(), MPI_INT, ipr, TG.getHeadOfNodesComm());
          std::copy(vvec.begin(), vvec.end(), vvec2.begin());
          MPI_Bcast(vvec2.data(), fct * vvec2.size(), MPI_DOUBLE, ipr, TG.getHeadOfNodesComm());
        }
        else
        {
          MPI_Bcast(ivec2.data(), ivec2.size(), MPI_INT, ipr, TG.getHeadOfNodesComm());
          MPI_Bcast(vvec2.data(), fct * vvec2.size(), MPI_DOUBLE, ipr, TG.getHeadOfNodesComm());
        }

        for (int ik = 0, ikend = block_size[k]; ik < ikend; ik++)
          if (split.local(ivec2[2 * ik], ivec2[2 * ik + 1], vvec2[ik]))
            SpM.add(ivec2[2 * ik], ivec2[2 * ik + 1], static_cast<SPValueType>(vvec2[ik]), need_lock);
      }
    }
  }

  // compress matrix
  SpM.compress(TG.getNodeCommLocal());

  // gather segment (step 4 of Algorithm 1; the gather below is not yet implemented)
  //std::vector<int> counts, displ;
  //if(coreid == 0) {
  //  counts.resize(nnodes);
  //  displ.resize(nnodes+1);
  //}

  return true;
}

template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool single_reader_hdf5_SpMat(SpMatrix& SpM,
                                     SpMatrix_Partition& split,
                                     hdf_archive& dump,
                                     std::string name,
                                     int nblk,
                                     task_group& TG)
{
  // Serial read path: currently a stub that reports success without reading anything.
  return true;
}


template<class SpMatrix_Partition, class IVec, class task_group>
inline bool multiple_reader_count_entries(hdf_archive& dump,
                                          SpMatrix_Partition& split,
                                          std::string name,
                                          int nblk,
                                          bool byRow,
                                          IVec& counts,
                                          task_group& TG,
                                          int n_working_cores)
{
  double cut = split.getCutoff();
  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);
  assert(sizeof(counts[0]) == sizeof(int));

  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();
  int node_rank;

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, name + std::string("_block_sizes")))
    {
      app_error() << " Error in multiple_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block = pool_dist[coreid + 1];

    for (int ib = first_local_block; ib < last_local_block; ib++)
    {
      if (ib % nnodes != nodeid)
        continue;
      ivec.resize(2 * block_size[ib]);
      vvec.resize(block_size[ib]);
      if (!dump.readEntry(ivec, name + std::string("_index_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
        return false;
      }
      if (!dump.readEntry(vvec, name + std::string("_vals_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
        return false;
      }
      if (byRow)
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik] < dim);
            counts[ivec[2 * ik]]++;
          }
        }
      }
      else
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik + 1] < dim);
            counts[ivec[2 * ik + 1]]++;
          }
        }
      }
    }
  }

  IVec a(counts);
  // FIX FIX FIX: use image communicator, not COMM_WORLD
  MPI_Allreduce(a.data(), counts.data(), counts.size(), MPI_INT, MPI_SUM, MPI_COMM_WORLD);
  return true;
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool single_reader_count_entries(hdf_archive& dump,
                                        SpMatrix_Partition& split,
                                        std::string name,
                                        int nblk,
                                        bool byRow,
                                        IVec& counts,
                                        task_group& TG)
{
  if (TG.getCoreID() != 0)
    return true;

  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  double cut = split.getCutoff();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);

  std::vector<int> block_size(nblk);
  if (!dump.readEntry(block_size, name + std::string("_block_sizes")))
  {
    app_error() << " Error in single_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
    return false;
  }

  int ntmax = *std::max_element(block_size.begin(), block_size.end());
  std::vector<IndexType> ivec;
  std::vector<ValueType> vvec;
  ivec.reserve(2 * ntmax);
  vvec.reserve(ntmax);

  for (int ib = 0; ib < nblk; ib++)
  {
    ivec.resize(2 * block_size[ib]);
    vvec.resize(block_size[ib]);
    if (!dump.readEntry(ivec, name + std::string("_index_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
      return false;
    }
    if (!dump.readEntry(vvec, name + std::string("_vals_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
      return false;
    }
    if (byRow)
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik] < dim);
          counts[ivec[2 * ik]]++;
        }
      }
    }
    else
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik + 1] < dim);
          counts[ivec[2 * ik + 1]]++;
        }
      }
    }
  }

  return true;
}

template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool read_hdf5_SpMat(SpMatrix& SpM,
                            SpMatrix_Partition& split,
                            hdf_archive& dump,
                            std::string name,
                            int nblk,
                            task_group& TG,
                            bool parallel_read = true,
                            int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_hdf5_SpMat(SpM, split, dump, name, nblk, TG, n_working_cores);
  else
    return single_reader_hdf5_SpMat(SpM, split, dump, name, nblk, TG);
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool count_entries_hdf5_SpMat(hdf_archive& dump,
                                     SpMatrix_Partition& split,
                                     std::string name,
                                     int nblk,
                                     bool byRow,
                                     IVec& counts,
                                     task_group& TG,
                                     bool parallel_read = true,
                                     int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_count_entries(dump, split, name, nblk, byRow, counts, TG, n_working_cores);
  else
    return single_reader_count_entries(dump, split, name, nblk, byRow, counts, TG);
}
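
// Typical two-phase usage (a sketch; construction of dump, split, SpM, and TG
// follows the surrounding AFQMC setup and is assumed here):
//
//   std::vector<int> counts;
//   // count nonzeros per row first, e.g. to size the matrix ...
//   if (!count_entries_hdf5_SpMat(dump, split, "Spvn", nblk, true, counts, TG))
//     APP_ABORT(" Error reading block structure. \n");
//   // ... then read, filter, and compress the locally owned segment
//   if (!read_hdf5_SpMat(SpM, split, dump, "Spvn", nblk, TG))
//     APP_ABORT(" Error reading sparse matrix. \n");
//
// Note that write_hdf5_SpMat below emits datasets under a hard-coded "Spvn_"
// prefix (its name argument is currently unused), so files produced by the
// writer are read back with name == "Spvn".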

template<class SpMatrix, class task_group>
inline std::tuple<int, int> write_hdf5_SpMat(SpMatrix& SpM,
                                             hdf_archive& dump,
                                             std::string name,
                                             int nterms_per_blk,
                                             task_group& TG)
{
  std::size_t nblocks = 0, ntot = 0;
  if (TG.getTGNumber() > 0)
    return std::make_tuple(nblocks, ntot);
  int nnodes_per_TG = TG.getNGroupsPerTG();
  int node_number = TG.getLocalGroupNumber();
  int core_rank = TG.getCoreRank();
  if (core_rank == 0 && node_number == 0)
  {
    // get ranks of node heads on TG
    std::vector<int> ranks;
    int pos = 0;
    if (nnodes_per_TG > 1)
      TG.getRanksOfRoots(ranks, pos);
    else
    {
      ranks.push_back(0);
    }

    std::size_t sz = SpM.size();
    int nb_loc = int(sz / std::size_t(nterms_per_blk) + std::min(sz % std::size_t(nterms_per_blk), std::size_t(1)));
    if (nnodes_per_TG > 1)
    {
      int nt;
      MPI_Reduce(&nb_loc, &nt, 1, MPI_INT, MPI_SUM, 0, TG.getTGCOMM());
      nblocks = std::size_t(nt);
    }
    else
    {
      nblocks = nb_loc;
    }

    std::vector<int> bsize;
    bsize.reserve(nblocks);

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val = SpM.vals_begin();

    // do mine first
    int cnt = 0, nterms;
    IndexType nblk = 0;
    for (int i = 0; i < nb_loc; i++)
    {
      if ((sz - std::size_t(cnt)) < std::size_t(nterms_per_blk))
        nterms = int(sz - std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for (int k = 0; k < nterms; k++, cnt++, row++, col++, val++)
      {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
      bsize.push_back(nterms);
      dump.write(ivec, std::string("Spvn_index_") + std::to_string(nblk));
      dump.write(vvec, std::string("Spvn_vals_") + std::to_string(nblk));
      nblk++;
    }

    ntot = sz;
    MPI_Status st;
    for (int rk = 1; rk < nnodes_per_TG; rk++)
    {
      bool done = false;
      do
      {
        // resize to receive
        ivec.resize(2 * nterms_per_blk);
        vvec.resize(nterms_per_blk);
#if defined(QMC_COMPLEX)
        MPI_Recv(vvec.data(), 2 * nterms_per_blk, MPI_DOUBLE, ranks[rk], MPI_ANY_TAG, TG.getTGCOMM(), &st);
#else
        MPI_Recv(vvec.data(), nterms_per_blk, MPI_DOUBLE, ranks[rk], MPI_ANY_TAG, TG.getTGCOMM(), &st);
#endif
        MPI_Recv(ivec.data(), 2 * nterms_per_blk, MPI_INT, ranks[rk], MPI_ANY_TAG, TG.getTGCOMM(), &st);

        int nt_;
        MPI_Get_count(&st, MPI_INT, &nt_);
        nterms = std::size_t(nt_ / 2);
        if (st.MPI_TAG == MORE)
        {
          assert(nterms == nterms_per_blk);
        }
        else if (st.MPI_TAG == LAST)
        {
          assert(nterms <= nterms_per_blk);
          done = true;
        }
        else
        {
          APP_ABORT(" Error: This should not happen. \n\n\n");
        }

        ntot += std::size_t(nterms);
        bsize.push_back(nterms);
        // resize to write
        ivec.resize(2 * nterms);
        vvec.resize(nterms);
        dump.write(ivec, std::string("Spvn_index_") + std::to_string(nblk));
        dump.write(vvec, std::string("Spvn_vals_") + std::to_string(nblk));
        nblk++;
      } while (!done);
    }

    dump.write(bsize, "Spvn_block_sizes");
  }
  else if (nnodes_per_TG > 1 && core_rank == 0)
  {
    std::size_t sz = SpM.size();
    int nt, nb_loc = int(sz / nterms_per_blk + std::min(sz % nterms_per_blk, std::size_t(1)));
    MPI_Reduce(&nb_loc, &nt, 1, MPI_INT, MPI_SUM, 0, TG.getTGCOMM());

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val = SpM.vals_begin();

    int cnt = 0, nterms;
    IndexType nblk = 0;
    for (int i = 0; i < nb_loc; i++)
    {
      if ((sz - std::size_t(cnt)) < std::size_t(nterms_per_blk))
        nterms = int(sz - std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for (int k = 0; k < nterms; k++, cnt++, row++, col++, val++)
      {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
#if defined(QMC_COMPLEX)
      MPI_Send(vvec.data(), 2 * nterms, MPI_DOUBLE, 0, (i == nb_loc - 1) ? (LAST) : (MORE), TG.getTGCOMM());
#else
      MPI_Send(vvec.data(), nterms, MPI_DOUBLE, 0, (i == nb_loc - 1) ? (LAST) : (MORE), TG.getTGCOMM());
#endif
      MPI_Send(ivec.data(), 2 * nterms, MPI_INT, 0, (i == nb_loc - 1) ? (LAST) : (MORE), TG.getTGCOMM());
    }
  }
  else if (nnodes_per_TG > 1)
  {
    int nb_loc = 0, nt;
    MPI_Reduce(&nb_loc, &nt, 1, MPI_INT, MPI_SUM, 0, TG.getTGCOMM());
  }
  return std::make_tuple(nblocks, ntot);
}

} // namespace afqmc

} // namespace qmcplusplus

#endif