////////////////////////////////////////////////////////////////////////////////////////
//// This file is distributed under the University of Illinois/NCSA Open Source License.
//// See LICENSE file in top directory for details.
////
//// Copyright (c) 2016 Jeongnim Kim and QMCPACK developers.
////
//// File developed by:
////
//// File created by: Miguel Morales, moralessilva2@llnl.gov, Lawrence Livermore National Laboratory
////////////////////////////////////////////////////////////////////////////////////////

#ifndef QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP
#define QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP


#include <cassert>
#include <complex>
#include <cstdlib>
#include <algorithm>
#include <utility>
#include <vector>
#include <numeric>
#if defined(USE_MPI)
#include <mpi.h>
#endif

#include "Configuration.h"
#include "Utilities/FairDivide.h"
#include "hdf/hdf_archive.h"
#include "Message/CommOperators.h"

#include "AFQMC/config.0.h"

#if defined(ADD_TESTS_TIMERS)
#include "AFQMC/Utilities/myTimer.h"
#endif

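// MPI message tags used by the write_hdf5_SpMat routine (currently commented out below)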
#define MORE 1555
#define LAST 2777

namespace qmcplusplus
{
namespace afqmc
{
// Look for a dataset called ..._descriptor, which will contain integer types
// describing the properties of the datasets, e.g.
//  - transposed?
//  - sorted?
//  - etc.
//
// If this dataset is not found, assume a generic layout.
//
// SpMatrix_Partition both partitions the matrix into the local segments belonging to a
// given node and applies cutoffs to the values.
//
// For good efficiency, it is possible to partition the local block
// (obtained through split.local(i,j,val))
// among equivalent nodes (same node id within the TG, using split.local_split(i,j,val))
// and read/sort only this local segment.
// After the call to compress (which only operates on the local segment),
// assemble the full local partition with a call to Allgather over the corresponding
// nodes in COMM_WORLD.
//  -- minimizes sorting
//
// Algorithm 1:
// 1. Heads of nodes read a block from the hdf5 file in parallel and sequentially.
// 2. Loop over heads of nodes; each one
//     i.  broadcasts its block to the group,
//     ii. from each broadcast, adds an entry to the local list if split.local(i,j)==true.
//     *** overlap the calls to broadcast (with Ibcast) and the block processing
// 3. After the read, compress.
// 4. After compress, gather segments from the other equivalent nodes.
// 5. If memory is not a problem, a further speedup can be obtained by having
//    other cores also read/broadcast/process blocks.
//     -- possible downsides: 1) memory, 2) I/O bottleneck (e.g. on BGQ)
// (A usage sketch of the reader entry points follows below.)
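//
// Usage sketch (illustrative only; SpM, split, TG, nblk and n_working_cores are
// caller-supplied, and `dump` must be an hdf_archive already opened and positioned at
// the group holding the block_sizes / index_* / vals_* datasets):
//
//   std::vector<int> counts;
//   // count entries per row (byRow == true) to size the sparse matrix ...
//   count_entries_hdf5_SpMat(dump, split, nblk, true, counts, TG, true, n_working_cores);
//   // ... allocate/shape SpM from counts, then read the matrix in parallel
//   read_hdf5_SpMat(SpM, split, dump, nblk, TG, true, n_working_cores);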
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool multiple_reader_hdf5_SpMat(SpMatrix& SpM,
                                       SpMatrix_Partition& split,
                                       hdf_archive& dump,
                                       int nblk,
                                       task_group& TG,
                                       int n_working_cores)
{
  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();
#if defined(ADD_TESTS_TIMERS)
  myTimer Timer;
#endif

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    bool need_lock = n_working_cores > 1;

#if defined(ADD_TESTS_TIMERS)
    Timer.start("Gen");
#endif

    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, std::string("block_sizes")))
    {
      app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

#if defined(ADD_TESTS_TIMERS)
    Timer.stop("Gen");
    app_log() << " multiple_reader_hdf5_SpMat::read blocks " << Timer.total("Gen") << "\n";
#endif

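    // entries are staged in a local buffer and flushed into the sparse matrix in
    // batches of nmax, locking only when more than one core writes concurrently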
    int nmax = 100000;
    std::vector<std::tuple<int, int, SPValueType>> buff;
    buff.reserve(nmax);

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec, ivec2;
    std::vector<ValueType> vvec, vvec2;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);
    ivec2.reserve(2 * ntmax);
    vvec2.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block = pool_dist[coreid + 1];
    int nbtot = last_local_block - first_local_block;
    int niter = nbtot / nnodes + std::min(nbtot % nnodes, 1); // ceil(nbtot / nnodes)

#if defined(ADD_TESTS_TIMERS)
    Timer.reset("Gen");
    Timer.start("Gen");
#endif

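    // Read in rounds: in each round every node reads one of this core's blocks
    // (chosen round-robin by node id), and the blocks are then exchanged below.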
    for (int iter = 0; iter < niter; iter++)
    {
      int myblock_number = first_local_block + nnodes * iter + nodeid;
      int first_block = first_local_block + nnodes * iter;
      int last_block = std::min(first_block + nnodes, last_local_block);
      if (myblock_number < last_local_block)
      {
        ivec.resize(2 * block_size[myblock_number]);
        vvec.resize(block_size[myblock_number]);
        if (!dump.readEntry(ivec, std::string("index_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_index_" << myblock_number
                      << " dataset. \n";
          return false;
        }
        if (!dump.readEntry(vvec, std::string("vals_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_vals_" << myblock_number
                      << " dataset. \n";
          return false;
        }
      }
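      // broadcast every block read in this round over TG.Cores() (one broadcast per
      // reading node) so that each node can filter it against its local partition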
      for (int k = first_block, ipr = 0; k < last_block; k++, ipr++)
      {
        ivec2.resize(2 * block_size[k]);
        vvec2.resize(block_size[k]);
        if (ipr == nodeid)
        {
          assert(myblock_number == k);
          std::copy(ivec.begin(), ivec.end(), ivec2.begin());
          TG.Cores().broadcast_n(ivec2.begin(), ivec2.size(), ipr);
          std::copy(vvec.begin(), vvec.end(), vvec2.begin());
          TG.Cores().broadcast_n(vvec2.begin(), vvec2.size(), ipr);
        }
        else
        {
          TG.Cores().broadcast_n(ivec2.begin(), ivec2.size(), ipr);
          TG.Cores().broadcast_n(vvec2.begin(), vvec2.size(), ipr);
        }

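        // keep only the entries that belong to the local partition; flush the buffer
        // into SpM whenever it fills up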
        for (int ik = 0, ikend = block_size[k]; ik < ikend; ik++)
          if (split.local(ivec2[2 * ik], ivec2[2 * ik + 1], vvec2[ik]))
          {
            buff.emplace_back(ivec2[2 * ik], ivec2[2 * ik + 1], static_cast<SPValueType>(vvec2[ik]));
            if (buff.size() == static_cast<std::size_t>(nmax))
            {
              SpM.add(buff, need_lock);
              buff.clear();
            }
          }
      }
    }
    if (buff.size() > 0)
    {
      SpM.add(buff, need_lock);
      buff.clear();
    }
  }

#if defined(ADD_TESTS_TIMERS)
  MPI_Barrier(&(TG.Node()));
  Timer.stop("Gen");
  app_log() << " multiple_reader_hdf5_SpMat::read data " << Timer.total("Gen") << "\n";
  Timer.reset("Gen");
  Timer.start("Gen");
#endif

  // compress matrix
  SpM.compress(&(TG.Node()));

#if defined(ADD_TESTS_TIMERS)
  Timer.stop("Gen");
  app_log() << " multiple_reader_hdf5_SpMat::compress " << Timer.total("Gen") << "\n";
#endif

  // gather segment
  //std::vector<int> counts, displ;
  //if(coreid == 0) {
  //  counts.resize(nnodes);
  //  displ.resize(nnodes+1);
  //}

  return true;
}

template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool single_reader_hdf5_SpMat(SpMatrix& SpM,
                                     SpMatrix_Partition& split,
                                     hdf_archive& dump,
                                     int nblk,
                                     task_group& TG)
{
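  // serial read path: currently a no-op that simply reports success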
  return true;
}


template<class SpMatrix_Partition, class IVec, class task_group>
inline bool multiple_reader_count_entries(hdf_archive& dump,
                                          SpMatrix_Partition& split,
                                          int nblk,
                                          bool byRow,
                                          IVec& counts,
                                          task_group& TG,
                                          int n_working_cores)
{
  double cut = split.getCutoff();
  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);
  assert(sizeof(counts[0]) == sizeof(int));

  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, std::string("block_sizes")))
    {
      app_error() << " Error in multiple_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block = pool_dist[coreid + 1];

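    // blocks are assigned round-robin over nodes; this core only counts the entries
    // of the blocks that belong to its node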
    for (int ib = first_local_block; ib < last_local_block; ib++)
    {
      if (ib % nnodes != nodeid)
        continue;
      ivec.resize(2 * block_size[ib]);
      vvec.resize(block_size[ib]);
      if (!dump.readEntry(ivec, std::string("index_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
        return false;
      }
      if (!dump.readEntry(vvec, std::string("vals_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
        return false;
      }
      if (byRow)
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik] < dim);
            counts[ivec[2 * ik]]++;
          }
        }
      }
      else
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik + 1] < dim);
            counts[ivec[2 * ik + 1]]++;
          }
        }
      }
    }
  }

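  // sum the partial counts from all processes (non-working cores contribute zeros)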
  IVec a(counts);
  MPI_Allreduce(a.data(), counts.data(), static_cast<int>(counts.size()), MPI_INT, MPI_SUM, &(TG.Global()));
  return true;
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool single_reader_count_entries(hdf_archive& dump,
                                        SpMatrix_Partition& split,
                                        int nblk,
                                        bool byRow,
                                        IVec& counts,
                                        task_group& TG)
{
  if (TG.getCoreID() != 0)
    return true;

  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  double cut = split.getCutoff();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);

  std::vector<int> block_size(nblk);
  if (!dump.readEntry(block_size, std::string("block_sizes")))
  {
    app_error() << " Error in single_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
    return false;
  }

  int ntmax = *std::max_element(block_size.begin(), block_size.end());
  std::vector<IndexType> ivec;
  std::vector<ValueType> vvec;
  ivec.reserve(2 * ntmax);
  vvec.reserve(ntmax);

  for (int ib = 0; ib < nblk; ib++)
  {
    ivec.resize(2 * block_size[ib]);
    vvec.resize(block_size[ib]);
    if (!dump.readEntry(ivec, std::string("index_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
      return false;
    }
    if (!dump.readEntry(vvec, std::string("vals_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
      return false;
    }
    if (byRow)
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik] < dim);
          counts[ivec[2 * ik]]++;
        }
      }
    }
    else
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik + 1] < dim);
          counts[ivec[2 * ik + 1]]++;
        }
      }
    }
  }

  return true;
}

template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool read_hdf5_SpMat(SpMatrix& SpM,
                            SpMatrix_Partition& split,
                            hdf_archive& dump,
                            int nblk,
                            task_group& TG,
                            bool parallel_read = true,
                            int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_hdf5_SpMat(SpM, split, dump, nblk, TG, n_working_cores);
  else
    return single_reader_hdf5_SpMat(SpM, split, dump, nblk, TG);
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool count_entries_hdf5_SpMat(hdf_archive& dump,
                                     SpMatrix_Partition& split,
                                     int nblk,
                                     bool byRow,
                                     IVec& counts,
                                     task_group& TG,
                                     bool parallel_read = true,
                                     int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_count_entries(dump, split, nblk, byRow, counts, TG, n_working_cores);
  else
    return single_reader_count_entries(dump, split, nblk, byRow, counts, TG);
}

/*
template<class SpMatrix,
         class task_group
        >
inline std::tuple<int,int> write_hdf5_SpMat(SpMatrix& SpM, hdf_archive& dump, std::string name, int nterms_per_blk, task_group& TG)
{
  std::size_t nblocks=0,ntot=0;
  if(TG.getTGNumber() > 0)
    return std::make_tuple(nblocks,ntot);
  int nnodes_per_TG = TG.getNGroupsPerTG();
  int node_number = TG.getLocalGroupNumber();
  int core_rank = TG.getCoreRank();
  if(core_rank==0 && node_number==0) {

    // get ranks of node heads on TG
    std::vector<int> ranks;
    int pos=0;
    if(nnodes_per_TG>1) TG.getRanksOfRoots(ranks,pos);
    else {
      ranks.push_back(0);
    }

    std::size_t sz = SpM.size();
    int nb_loc = int( sz/std::size_t(nterms_per_blk) + std::min(sz%std::size_t(nterms_per_blk),std::size_t(1)));
    if(nnodes_per_TG>1) {
      int nt;
      MPI_Reduce(&nb_loc,&nt,1,MPI_INT,MPI_SUM,0,TG.getTGCOMM());
      nblocks = std::size_t(nt);
    } else {
      nblocks = nb_loc;
    }

    std::vector<int> bsize;
    bsize.reserve(nblocks);

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2*nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val = SpM.vals_begin();

    // do mine first
    int cnt=0, nterms;
    IndexType nblk=0;
    for(int i=0; i<nb_loc; i++) {
      if( (sz-std::size_t(cnt)) < std::size_t(nterms_per_blk) )
        nterms = int(sz-std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for(int k=0; k<nterms; k++, cnt++, row++, col++, val++) {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
      bsize.push_back(nterms);
      dump.write(ivec,std::string("Spvn_index_")+std::to_string(nblk));
      dump.write(vvec,std::string("Spvn_vals_")+std::to_string(nblk));
      nblk++;
    }

    ntot = sz;
    MPI_Status st;
    for(int rk=1; rk<nnodes_per_TG; rk++) {

      bool done = false;
      do {
        // resize to receive
        ivec.resize(2*nterms_per_blk);
        vvec.resize(nterms_per_blk);
#if defined(QMC_COMPLEX)
        MPI_Recv(vvec.data(),2*nterms_per_blk,MPI_DOUBLE,ranks[rk],MPI_ANY_TAG,TG.getTGCOMM(),&st);
#else
        MPI_Recv(vvec.data(),nterms_per_blk,MPI_DOUBLE,ranks[rk],MPI_ANY_TAG,TG.getTGCOMM(),&st);
#endif
        MPI_Recv(ivec.data(),2*nterms_per_blk,MPI_INT,ranks[rk],MPI_ANY_TAG,TG.getTGCOMM(),&st);

        int nt_;
        MPI_Get_count(&st,MPI_INT,&nt_);
        nterms = std::size_t(nt_/2);
        if(st.MPI_TAG == MORE) {
          assert(nterms == nterms_per_blk);
        } else if(st.MPI_TAG == LAST) {
          assert(nterms <= nterms_per_blk);
          done=true;
        } else {
          APP_ABORT(" Error: This should not happen. \n\n\n");
        }

        ntot += std::size_t(nterms);
        bsize.push_back(nterms);
        // resize to write
        ivec.resize(2*nterms);
        vvec.resize(nterms);
        dump.write(ivec,std::string("Spvn_index_")+std::to_string(nblk));
        dump.write(vvec,std::string("Spvn_vals_")+std::to_string(nblk));
        nblk++;
      } while(!done);

    }

    dump.write(bsize,"Spvn_block_sizes");

  } else if(nnodes_per_TG>1 && core_rank == 0) {

    std::size_t sz = SpM.size();
    int nt,nb_loc = int(sz/nterms_per_blk + std::min(sz%nterms_per_blk,std::size_t(1)));
    MPI_Reduce(&nb_loc,&nt,1,MPI_INT,MPI_SUM,0,TG.getTGCOMM());

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2*nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val = SpM.vals_begin();

    int cnt=0, nterms;
    IndexType nblk=0;
    for(int i=0; i<nb_loc; i++) {
      if( (sz-std::size_t(cnt)) < std::size_t(nterms_per_blk) )
        nterms = int(sz-std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for(int k=0; k<nterms; k++, cnt++, row++, col++, val++) {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
#if defined(QMC_COMPLEX)
      MPI_Send(vvec.data(),2*nterms,MPI_DOUBLE,0, (i==nb_loc-1)?(LAST):(MORE) ,TG.getTGCOMM());
#else
      MPI_Send(vvec.data(),nterms,MPI_DOUBLE,0, (i==nb_loc-1)?(LAST):(MORE) ,TG.getTGCOMM());
#endif
      MPI_Send(ivec.data(),2*nterms,MPI_INT,0, (i==nb_loc-1)?(LAST):(MORE) ,TG.getTGCOMM());
    }

  } else if(nnodes_per_TG>1) {

    int nb_loc=0,nt;
    MPI_Reduce(&nb_loc,&nt,1,MPI_INT,MPI_SUM,0,TG.getTGCOMM());

  }
  return std::make_tuple(nblocks,ntot);

}
*/

} // namespace afqmc

} // namespace qmcplusplus

#endif
