//////////////////////////////////////////////////////////////////////////////////////
//// This file is distributed under the University of Illinois/NCSA Open Source License.
//// See LICENSE file in top directory for details.
////
//// Copyright (c) 2016 Jeongnim Kim and QMCPACK developers.
////
//// File developed by:
////
//// File created by: Miguel Morales, moralessilva2@llnl.gov, Lawrence Livermore National Laboratory
////////////////////////////////////////////////////////////////////////////////////////

#ifndef QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP
#define QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP


#include <cassert>
#include <complex>
#include <cstdlib>
#include <algorithm>
#include <utility>
#include <vector>
#include <numeric>
#if defined(USE_MPI)
#include <mpi.h>
#endif

#include "Configuration.h"
#include "Utilities/FairDivide.h"
#include "hdf/hdf_archive.h"
#include "Message/CommOperators.h"

#include "AFQMC/config.0.h"

#if defined(ADD_TESTS_TIMERS)
#include "AFQMC/Utilities/myTimer.h"
#endif

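// MPI message tags used by the (currently commented-out) write_hdf5_SpMat routine below.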
#define MORE 1555
#define LAST 2777

namespace qmcplusplus
{
namespace afqmc
{
// Look for a dataset called ..._descriptor, which will contain integer types
// describing the properties of the datasets, e.g.
// - transposed?
// - sorted?
// - etc.
//
// If this dataset is not found, assume a generic layout.
//
// SpMatrix_Partition both partitions the matrix into local segments belonging to a
// given node and applies cutoffs to the values.
//
// For good efficiency, it is possible to partition the local block
// (obtained through split.local(i,j,val))
// among equivalent nodes (same node id within the TG, using split.local_split(i,j,val))
// and read/sort only this local segment.
// After the call to compress (which only compresses the local segment),
// assemble the full local partition with a call to Allgather over the corresponding
// nodes in COMM_WORLD.
//   -- minimizes sorting
//
// Algorithm 1:
//  1. head_of_nodes read blocks from the hdf5 file (in parallel across nodes, sequentially within each node)
//  2. loop over head_of_nodes; each one
//      i. bcasts its block to the group
//      ii. from each bcast, adds to the local list if split.local(i,j)==true
//  *** overlap the calls to bcast (with Ibcast) and the block processing
//  3. after the read, compress
//  4. after compress, gather segments from the other equivalent nodes
//  5. If memory is not a problem, further speedup can be obtained by having
//     other cores also read/bcast/process blocks.
//       --  possible downsides: 1) memory, 2) I/O bottleneck (e.g. on BGQ)
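//
// The readers below only rely on a small SpMatrix_Partition interface. A rough sketch of the
// expected shape (names taken from the calls made in this file; exact signatures and return
// types are assumptions of the sketch, not a binding specification):
//
//   double getCutoff();                    // magnitude cutoff applied to matrix values
//   std::tuple<int, int> getDimensions();  // global (rows, cols)
//   bool local(IndexType i, IndexType j, ValueType v);        // does (i,j,v) belong to this node's segment?
//   bool local_split(IndexType i, IndexType j, ValueType v);  // optional further split among equivalent nodes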
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool multiple_reader_hdf5_SpMat(SpMatrix& SpM,
                                       SpMatrix_Partition& split,
                                       hdf_archive& dump,
                                       int nblk,
                                       task_group& TG,
                                       int n_working_cores)
{
  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();
#if defined(ADD_TESTS_TIMERS)
  myTimer Timer;
#endif

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    bool need_lock = n_working_cores > 1;

#if defined(ADD_TESTS_TIMERS)
    Timer.start("Gen");
#endif

    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, std::string("block_sizes")))
    {
      app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

#if defined(ADD_TESTS_TIMERS)
    Timer.stop("Gen");
    app_log() << " multiple_reader_hdf5_SpMat::read blocks " << Timer.total("Gen") << "\n";
#endif

    int nmax = 100000;
    std::vector<std::tuple<int, int, SPValueType>> buff;
    buff.reserve(nmax);

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec, ivec2;
    std::vector<ValueType> vvec, vvec2;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);
    ivec2.reserve(2 * ntmax);
    vvec2.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block  = pool_dist[coreid + 1];
    int nbtot             = last_local_block - first_local_block;
    int niter             = nbtot / nnodes + std::min(nbtot % nnodes, 1);

#if defined(ADD_TESTS_TIMERS)
    Timer.reset("Gen");
    Timer.start("Gen");
#endif
    for (int iter = 0; iter < niter; iter++)
    {
      int myblock_number = first_local_block + nnodes * iter + nodeid;
      int first_block    = first_local_block + nnodes * iter;
      int last_block     = std::min(first_block + nnodes, last_local_block);
      if (myblock_number < last_local_block)
      {
        ivec.resize(2 * block_size[myblock_number]);
        vvec.resize(block_size[myblock_number]);
        if (!dump.readEntry(ivec, std::string("index_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_index_" << myblock_number
                      << " dataset. \n";
          return false;
        }
        if (!dump.readEntry(vvec, std::string("vals_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_vals_" << myblock_number
                      << " dataset. \n";
          return false;
        }
      }
      for (int k = first_block, ipr = 0; k < last_block; k++, ipr++)
      {
        ivec2.resize(2 * block_size[k]);
        vvec2.resize(block_size[k]);
        if (ipr == nodeid)
        {
          assert(myblock_number == k);
          std::copy(ivec.begin(), ivec.end(), ivec2.begin());
          TG.Cores().broadcast_n(ivec2.begin(), ivec2.size(), ipr);
          std::copy(vvec.begin(), vvec.end(), vvec2.begin());
          TG.Cores().broadcast_n(vvec2.begin(), vvec2.size(), ipr);
        }
        else
        {
          TG.Cores().broadcast_n(ivec2.begin(), ivec2.size(), ipr);
          TG.Cores().broadcast_n(vvec2.begin(), vvec2.size(), ipr);
        }

        for (int ik = 0, ikend = block_size[k]; ik < ikend; ik++)
          if (split.local(ivec2[2 * ik], ivec2[2 * ik + 1], vvec2[ik]))
          {
            buff.emplace_back(std::make_tuple(ivec2[2 * ik], ivec2[2 * ik + 1], static_cast<SPValueType>(vvec2[ik])));
            if (buff.size() == nmax)
            {
              SpM.add(buff, need_lock);
              buff.clear();
            }
          }
      }
    }
    if (buff.size() > 0)
    {
      SpM.add(buff, need_lock);
      buff.clear();
    }
  }

#if defined(ADD_TESTS_TIMERS)
  MPI_Barrier(&(TG.Node()));
  Timer.stop("Gen");
  app_log() << " multiple_reader_hdf5_SpMat::read data " << Timer.total("Gen") << "\n";
  Timer.reset("Gen");
  Timer.start("Gen");
#endif

  // compress matrix
  SpM.compress(&(TG.Node()));

#if defined(ADD_TESTS_TIMERS)
  Timer.stop("Gen");
  app_log() << " multiple_reader_hdf5_SpMat::compress " << Timer.total("Gen") << "\n";
#endif

  // gather segment
  //std::vector<int> counts, displ;
  //if(coreid == 0) {
  //  counts.resize(nnodes);
  //  displ.resize(nnodes+1);
  //}

  return true;
}

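// Serial counterpart of multiple_reader_hdf5_SpMat; currently a stub that reports success
// without reading anything (read_hdf5_SpMat selects the parallel path by default).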
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool single_reader_hdf5_SpMat(SpMatrix& SpM,
                                     SpMatrix_Partition& split,
                                     hdf_archive& dump,
                                     int nblk,
                                     task_group& TG)
{
  return true;
}


template<class SpMatrix_Partition, class IVec, class task_group>
inline bool multiple_reader_count_entries(hdf_archive& dump,
                                          SpMatrix_Partition& split,
                                          int nblk,
                                          bool byRow,
                                          IVec& counts,
                                          task_group& TG,
                                          int n_working_cores)
{
  double cut = split.getCutoff();
  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);
  assert(sizeof(counts[0]) == sizeof(int));

  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, std::string("block_sizes")))
    {
      app_error() << " Error in multiple_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block  = pool_dist[coreid + 1];

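    // Round-robin block assignment: this node only counts blocks with ib % nnodes == nodeid;
    // the partial counts are summed over all ranks after the loop.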
    for (int ib = first_local_block; ib < last_local_block; ib++)
    {
      if (ib % nnodes != nodeid)
        continue;
      ivec.resize(2 * block_size[ib]);
      vvec.resize(block_size[ib]);
      if (!dump.readEntry(ivec, std::string("index_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
        return false;
      }
      if (!dump.readEntry(vvec, std::string("vals_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
        return false;
      }
      if (byRow)
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik] < dim);
            counts[ivec[2 * ik]]++;
          }
        }
      }
      else
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik + 1] < dim);
            counts[ivec[2 * ik + 1]]++;
          }
        }
      }
    }
  }

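  // Combine the per-rank partial counts into the global count vector.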
  IVec a(counts);
  MPI_Allreduce(a.data(), counts.data(), counts.size(), MPI_INT, MPI_SUM, &(TG.Global()));
  return true;
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool single_reader_count_entries(hdf_archive& dump,
                                        SpMatrix_Partition& split,
                                        int nblk,
                                        bool byRow,
                                        IVec& counts,
                                        task_group& TG)
{
  if (TG.getCoreID() != 0)
    return true;

  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  double cut = split.getCutoff();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);

  std::vector<int> block_size(nblk);
  if (!dump.readEntry(block_size, std::string("block_sizes")))
  {
    app_error() << " Error in single_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
    return false;
  }

  int ntmax = *std::max_element(block_size.begin(), block_size.end());
  std::vector<IndexType> ivec;
  std::vector<ValueType> vvec;
  ivec.reserve(2 * ntmax);
  vvec.reserve(ntmax);

  for (int ib = 0; ib < nblk; ib++)
  {
    ivec.resize(2 * block_size[ib]);
    vvec.resize(block_size[ib]);
    if (!dump.readEntry(ivec, std::string("index_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
      return false;
    }
    if (!dump.readEntry(vvec, std::string("vals_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
      return false;
    }
    if (byRow)
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik] < dim);
          counts[ivec[2 * ik]]++;
        }
      }
    }
    else
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik + 1] < dim);
          counts[ivec[2 * ik + 1]]++;
        }
      }
    }
  }

  return true;
}

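// Top-level entry points: dispatch to the parallel multiple-reader path or the serial
// single-reader fallback.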
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool read_hdf5_SpMat(SpMatrix& SpM,
                            SpMatrix_Partition& split,
                            hdf_archive& dump,
                            int nblk,
                            task_group& TG,
                            bool parallel_read  = true,
                            int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_hdf5_SpMat(SpM, split, dump, nblk, TG, n_working_cores);
  else
    return single_reader_hdf5_SpMat(SpM, split, dump, nblk, TG);
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool count_entries_hdf5_SpMat(hdf_archive& dump,
                                     SpMatrix_Partition& split,
                                     int nblk,
                                     bool byRow,
                                     IVec& counts,
                                     task_group& TG,
                                     bool parallel_read  = true,
                                     int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_count_entries(dump, split, nblk, byRow, counts, TG, n_working_cores);
  else
    return single_reader_count_entries(dump, split, nblk, byRow, counts, TG);
}
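
// A typical two-phase use, as a rough sketch (the hdf_archive position, the partition object
// `split`, and the task_group `TG` are assumed to be set up by the caller; the surrounding
// names are illustrative, not prescribed by this header):
//
//   std::vector<int> counts;
//   if (!count_entries_hdf5_SpMat(dump, split, nblk, /*byRow=*/true, counts, TG))
//     APP_ABORT(" Problems counting sparse matrix entries.\n");
//   // ... size/partition SpM using counts, then fill it ...
//   if (!read_hdf5_SpMat(SpM, split, dump, nblk, TG))
//     APP_ABORT(" Problems reading sparse matrix.\n");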

/*
template<class SpMatrix,
         class task_group
         >
inline std::tuple<int,int> write_hdf5_SpMat(SpMatrix& SpM, hdf_archive& dump, std::string name, int nterms_per_blk, task_group& TG)
{
  std::size_t nblocks=0,ntot=0;
  if(TG.getTGNumber() > 0)
    return std::make_tuple(nblocks,ntot);
  int nnodes_per_TG = TG.getNGroupsPerTG();
  int node_number = TG.getLocalGroupNumber();
  int core_rank = TG.getCoreRank();
  if(core_rank==0 && node_number==0) {

    // get ranks of node heads on TG
    std::vector<int> ranks;
    int pos=0;
    if(nnodes_per_TG>1) TG.getRanksOfRoots(ranks,pos);
    else {
      ranks.push_back(0);
    }

    std::size_t sz = SpM.size();
    int nb_loc = int( sz/std::size_t(nterms_per_blk) + std::min(sz%std::size_t(nterms_per_blk),std::size_t(1)));
    if(nnodes_per_TG>1) {
      int nt;
      MPI_Reduce(&nb_loc,&nt,1,MPI_INT,MPI_SUM,0,TG.getTGCOMM());
      nblocks = std::size_t(nt);
    } else {
      nblocks = nb_loc;
    }

    std::vector<int> bsize;
    bsize.reserve(nblocks);

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2*nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val = SpM.vals_begin();

    // do mine first
    int cnt=0, nterms;
    IndexType nblk=0;
    for(int i=0; i<nb_loc; i++) {
      if( (sz-std::size_t(cnt)) < std::size_t(nterms_per_blk) )
        nterms = int(sz-std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for(int k=0; k<nterms; k++, cnt++, row++, col++, val++) {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
      bsize.push_back(nterms);
      dump.write(ivec,std::string("Spvn_index_")+std::to_string(nblk));
      dump.write(vvec,std::string("Spvn_vals_")+std::to_string(nblk));
      nblk++;
    }

    ntot = sz;
    MPI_Status st;
    for(int rk=1; rk<nnodes_per_TG; rk++) {

      bool done = false;
      do {
        // resize to receive
        ivec.resize(2*nterms_per_blk);
        vvec.resize(nterms_per_blk);
#if defined(QMC_COMPLEX)
        MPI_Recv(vvec.data(),2*nterms_per_blk,MPI_DOUBLE,ranks[rk],MPI_ANY_TAG,TG.getTGCOMM(),&st);
#else
        MPI_Recv(vvec.data(),nterms_per_blk,MPI_DOUBLE,ranks[rk],MPI_ANY_TAG,TG.getTGCOMM(),&st);
#endif
        MPI_Recv(ivec.data(),2*nterms_per_blk,MPI_INT,ranks[rk],MPI_ANY_TAG,TG.getTGCOMM(),&st);

        int nt_;
        MPI_Get_count(&st,MPI_INT,&nt_);
        nterms = std::size_t(nt_/2);
        if(st.MPI_TAG == MORE) {
          assert(nterms == nterms_per_blk);
        } else if(st.MPI_TAG == LAST) {
          assert(nterms <= nterms_per_blk);
          done=true;
        } else {
          APP_ABORT(" Error: This should not happen. \n\n\n");
        }

        ntot += std::size_t(nterms);
        bsize.push_back(nterms);
        // resize to write
        ivec.resize(2*nterms);
        vvec.resize(nterms);
        dump.write(ivec,std::string("Spvn_index_")+std::to_string(nblk));
        dump.write(vvec,std::string("Spvn_vals_")+std::to_string(nblk));
        nblk++;
      } while(!done);

    }

    dump.write(bsize,"Spvn_block_sizes");

  } else if(nnodes_per_TG>1 && core_rank == 0) {

    std::size_t sz = SpM.size();
    int nt,nb_loc = int(sz/nterms_per_blk + std::min(sz%nterms_per_blk,std::size_t(1)));
    MPI_Reduce(&nb_loc,&nt,1,MPI_INT,MPI_SUM,0,TG.getTGCOMM());

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2*nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val = SpM.vals_begin();

    int cnt=0, nterms;
    IndexType nblk=0;
    for(int i=0; i<nb_loc; i++) {
      if( (sz-std::size_t(cnt)) < std::size_t(nterms_per_blk) )
        nterms = int(sz-std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for(int k=0; k<nterms; k++, cnt++, row++, col++, val++) {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
#if defined(QMC_COMPLEX)
      MPI_Send(vvec.data(),2*nterms,MPI_DOUBLE,0, (i==nb_loc-1)?(LAST):(MORE) ,TG.getTGCOMM());
#else
      MPI_Send(vvec.data(),nterms,MPI_DOUBLE,0, (i==nb_loc-1)?(LAST):(MORE) ,TG.getTGCOMM());
#endif
      MPI_Send(ivec.data(),2*nterms,MPI_INT,0, (i==nb_loc-1)?(LAST):(MORE) ,TG.getTGCOMM());
    }

  } else if(nnodes_per_TG>1) {

    int nb_loc=0,nt;
    MPI_Reduce(&nb_loc,&nt,1,MPI_INT,MPI_SUM,0,TG.getTGCOMM());

  }
  return std::make_tuple(nblocks,ntot);

}
*/

} // namespace afqmc

} // namespace qmcplusplus

#endif