//////////////////////////////////////////////////////////////////////////////////////
//// This file is distributed under the University of Illinois/NCSA Open Source License.
//// See LICENSE file in top directory for details.
////
//// Copyright (c) 2016 Jeongnim Kim and QMCPACK developers.
////
//// File developed by:
////
//// File created by: Miguel Morales, moralessilva2@llnl.gov, Lawrence Livermore National Laboratory
//////////////////////////////////////////////////////////////////////////////////////

#ifndef QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP
#define QMCPLUSPLUS_AFQMC_HDF5_READERS_HPP


#include <cassert>
#include <complex>
#include <cstdlib>
#include <algorithm>
#include <utility>
#include <vector>
#include <numeric>
#if defined(USE_MPI)
#include <mpi.h>
#endif

#include "Configuration.h"
#include "Utilities/FairDivide.h"
#include "hdf/hdf_archive.h"
#include "Message/CommOperators.h"

#include "AFQMC/config.0.h"

// MPI message tags used by write_hdf5_SpMat below: MORE means further blocks follow,
// LAST marks the final block from a sender.
#define MORE 1555
#define LAST 2777

namespace qmcplusplus
{
namespace afqmc
{
// Look for a dataset called ..._descriptor, which contains integer flags
// describing the properties of the datasets, e.g.
// - transposed?
// - sorted?
// - etc.
//
// If this dataset is not found, assume a generic layout.
//
// SpMatrix_Partition both partitions the matrix into the local segments belonging to a
// given node and applies cutoffs to the values.
//
// For good efficiency, it is possible to partition the local block
// (obtained through split.local(i,j,val))
// among equivalent nodes (same node id within the TG, using split.local_split(i,j,val))
// and read/sort only this local segment.
// After the call to compress (which only operates on the local segment),
// assemble the full local partition with a call to Allgather over the corresponding
// nodes in COMM_WORLD.
//   -- minimizes sorting
//
// Algorithm 1:
//  1. head_of_nodes read blocks from the hdf5 file in parallel (across nodes) and sequentially
//  2. loop over head_of_nodes, each one
//      i. bcasts its block to the group
//      ii. from each bcast, adds to the local list if split.local(i,j)==true
//  *** overlap the calls to bcast (with Ibcast) and the block processing
//  3. after the read, compress
//  4. after compress, gather segments from the other equivalent nodes
//  5. If memory is not a problem, further speedup can be obtained by having
//     other cores also read/bcast/process blocks.
//       --  possible downsides: 1) memory, 2) I/O bottleneck (e.g. BGQ)
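//
// A minimal usage sketch (illustrative only): `SpM`, `split`, `my_TG`, `nblk`, and
// `n_working_cores` are hypothetical placeholders for a concrete SpMatrix,
// SpMatrix_Partition, task_group, block count, and core count supplied by the caller,
// and "afqmc.h5" is an assumed file name; the "Spvn" prefix matches the datasets
// produced by write_hdf5_SpMat below.
//
//   hdf_archive dump;
//   dump.open("afqmc.h5");
//   std::vector<int> counts;
//   // count entries per row surviving the cutoff, so the caller can size/partition SpM
//   count_entries_hdf5_SpMat(dump, split, "Spvn", nblk, true, counts, my_TG,
//                            /*parallel_read=*/true, n_working_cores);
//   // ... allocate SpM and set up `split` using `counts` ...
//   read_hdf5_SpMat(SpM, split, dump, "Spvn", nblk, my_TG,
//                   /*parallel_read=*/true, n_working_cores);
//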
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool multiple_reader_hdf5_SpMat(SpMatrix& SpM,
                                       SpMatrix_Partition& split,
                                       hdf_archive& dump,
                                       std::string name,
                                       int nblk,
                                       task_group& TG,
                                       int n_working_cores)
{
  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();
  int node_rank;

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    bool need_lock = n_working_cores > 1;
    MPI_Comm_rank(TG.getHeadOfNodesComm(), &node_rank); // this should be equal to nodeid
    int fct = 1;
#if defined(QMC_COMPLEX)
    fct = 2;
#endif

    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, name + std::string("_block_sizes")))
    {
      app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec, ivec2;
    std::vector<ValueType> vvec, vvec2;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);
    ivec2.reserve(2 * ntmax);
    vvec2.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block  = pool_dist[coreid + 1];
    int nbtot             = last_local_block - first_local_block;
    int niter             = nbtot / nnodes + std::min(nbtot % nnodes, 1);

    // Blocks are distributed round-robin over nodes: on each iteration this node reads
    // block myblock_number (if it exists) and receives the remaining blocks via bcast.
    for (int iter = 0; iter < niter; iter++)
    {
      int myblock_number = first_local_block + nnodes * iter + nodeid;
      int first_block    = first_local_block + nnodes * iter;
      int last_block     = std::min(first_block + nnodes, last_local_block);
      if (myblock_number < last_local_block)
      {
        ivec.resize(2 * block_size[myblock_number]);
        vvec.resize(block_size[myblock_number]);
        if (!dump.readEntry(ivec, name + std::string("_index_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_index_" << myblock_number
                      << " dataset. \n";
          return false;
        }
        if (!dump.readEntry(vvec, name + std::string("_vals_") + std::to_string(myblock_number)))
        {
          app_error() << " Error in multiple_reader_hdf5_SpMat: Problems reading ***_vals_" << myblock_number
                      << " dataset. \n";
          return false;
        }
      }
      for (int k = first_block, ipr = 0; k < last_block; k++, ipr++)
      {
        ivec2.resize(2 * block_size[k]);
        vvec2.resize(block_size[k]);
        if (ipr == node_rank)
        {
          assert(myblock_number == k);
          std::copy(ivec.begin(), ivec.end(), ivec2.begin());
          MPI_Bcast(ivec2.data(), ivec2.size(), MPI_INT, ipr, TG.getHeadOfNodesComm());
          std::copy(vvec.begin(), vvec.end(), vvec2.begin());
          MPI_Bcast(vvec2.data(), fct * vvec2.size(), MPI_DOUBLE, ipr, TG.getHeadOfNodesComm());
        }
        else
        {
          MPI_Bcast(ivec2.data(), ivec2.size(), MPI_INT, ipr, TG.getHeadOfNodesComm());
          MPI_Bcast(vvec2.data(), fct * vvec2.size(), MPI_DOUBLE, ipr, TG.getHeadOfNodesComm());
        }

        for (int ik = 0, ikend = block_size[k]; ik < ikend; ik++)
          if (split.local(ivec2[2 * ik], ivec2[2 * ik + 1], vvec2[ik]))
            SpM.add(ivec2[2 * ik], ivec2[2 * ik + 1], static_cast<SPValueType>(vvec2[ik]), need_lock);
      }
    }
  }

  // compress matrix
  SpM.compress(TG.getNodeCommLocal());

  // gather segment
  //std::vector<int> counts, displ;
  //if(coreid == 0) {
  //  counts.resize(nnodes);
  //  displ.resize(nnodes+1);
  //}

  return true;
}

// Serial (single-reader) path: currently unimplemented; this is a no-op that reports success.
template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool single_reader_hdf5_SpMat(SpMatrix& SpM,
                                     SpMatrix_Partition& split,
                                     hdf_archive& dump,
                                     std::string name,
                                     int nblk,
                                     task_group& TG)
{
  return true;
}


template<class SpMatrix_Partition, class IVec, class task_group>
inline bool multiple_reader_count_entries(hdf_archive& dump,
                                          SpMatrix_Partition& split,
                                          std::string name,
                                          int nblk,
                                          bool byRow,
                                          IVec& counts,
                                          task_group& TG,
                                          int n_working_cores)
{
  double cut = split.getCutoff();
  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);
  assert(sizeof(counts[0]) == sizeof(int));

  int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID(), coreid = TG.getCoreID();
  int node_rank;

  // process hdf5 file
  if (coreid < n_working_cores)
  {
    std::vector<int> block_size(nblk);
    if (!dump.readEntry(block_size, name + std::string("_block_sizes")))
    {
      app_error() << " Error in multiple_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
      return false;
    }

    int ntmax = *std::max_element(block_size.begin(), block_size.end());
    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * ntmax);
    vvec.reserve(ntmax);

    std::vector<int> pool_dist;
    // divide blocks over n_working_cores groups
    FairDivide(nblk, n_working_cores, pool_dist);

    int first_local_block = pool_dist[coreid];
    int last_local_block  = pool_dist[coreid + 1];

    for (int ib = first_local_block; ib < last_local_block; ib++)
    {
      if (ib % nnodes != nodeid)
        continue;
      ivec.resize(2 * block_size[ib]);
      vvec.resize(block_size[ib]);
      if (!dump.readEntry(ivec, name + std::string("_index_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
        return false;
      }
      if (!dump.readEntry(vvec, name + std::string("_vals_") + std::to_string(ib)))
      {
        app_error() << " Error in multiple_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
        return false;
      }
      if (byRow)
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik] < dim);
            counts[ivec[2 * ik]]++;
          }
        }
      }
      else
      {
        for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
        {
          if (std::abs(vvec[ik]) > cut)
          {
            assert(ivec[2 * ik + 1] < dim);
            counts[ivec[2 * ik + 1]]++;
          }
        }
      }
    }
  }

  IVec a(counts);
  //  FIX FIX FIX: use image communicator, not COMM_WORLD
  MPI_Allreduce(a.data(), counts.data(), counts.size(), MPI_INT, MPI_SUM, MPI_COMM_WORLD);
  return true;
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool single_reader_count_entries(hdf_archive& dump,
                                        SpMatrix_Partition& split,
                                        std::string name,
                                        int nblk,
                                        bool byRow,
                                        IVec& counts,
                                        task_group& TG)
{
  if (TG.getCoreID() != 0)
    return true;

  int dim;
  if (byRow)
    std::tie(dim, std::ignore) = split.getDimensions();
  else
    std::tie(std::ignore, dim) = split.getDimensions();
  double cut = split.getCutoff();
  counts.resize(dim);
  std::fill(counts.begin(), counts.end(), 0);

  std::vector<int> block_size(nblk);
  if (!dump.readEntry(block_size, name + std::string("_block_sizes")))
  {
    app_error() << " Error in single_reader_count_entries: Problems reading ***_block_sizes dataset. \n";
    return false;
  }

  int ntmax = *std::max_element(block_size.begin(), block_size.end());
  std::vector<IndexType> ivec;
  std::vector<ValueType> vvec;
  ivec.reserve(2 * ntmax);
  vvec.reserve(ntmax);

  for (int ib = 0; ib < nblk; ib++)
  {
    ivec.resize(2 * block_size[ib]);
    vvec.resize(block_size[ib]);
    if (!dump.readEntry(ivec, name + std::string("_index_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_index_" << ib << " dataset. \n";
      return false;
    }
    if (!dump.readEntry(vvec, name + std::string("_vals_") + std::to_string(ib)))
    {
      app_error() << " Error in single_reader_count_entries: Problems reading ***_vals_" << ib << " dataset. \n";
      return false;
    }
    if (byRow)
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik] < dim);
          counts[ivec[2 * ik]]++;
        }
      }
    }
    else
    {
      for (int ik = 0, ikend = block_size[ib]; ik < ikend; ik++)
      {
        if (std::abs(vvec[ik]) > cut)
        {
          assert(ivec[2 * ik + 1] < dim);
          counts[ivec[2 * ik + 1]]++;
        }
      }
    }
  }

  return true;
}

template<class SpMatrix, class SpMatrix_Partition, class task_group>
inline bool read_hdf5_SpMat(SpMatrix& SpM,
                            SpMatrix_Partition& split,
                            hdf_archive& dump,
                            std::string name,
                            int nblk,
                            task_group& TG,
                            bool parallel_read  = true,
                            int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_hdf5_SpMat(SpM, split, dump, name, nblk, TG, n_working_cores);
  else
    return single_reader_hdf5_SpMat(SpM, split, dump, name, nblk, TG);
}

template<class SpMatrix_Partition, class IVec, class task_group>
inline bool count_entries_hdf5_SpMat(hdf_archive& dump,
                                     SpMatrix_Partition& split,
                                     std::string name,
                                     int nblk,
                                     bool byRow,
                                     IVec& counts,
                                     task_group& TG,
                                     bool parallel_read  = true,
                                     int n_working_cores = 1)
{
  assert(n_working_cores >= 1);
  if (parallel_read)
    return multiple_reader_count_entries(dump, split, name, nblk, byRow, counts, TG, n_working_cores);
  else
    return single_reader_count_entries(dump, split, name, nblk, byRow, counts, TG);
}

// Gather the distributed sparse matrix over the TG and write it to the archive in blocked
// form (Spvn_block_sizes, Spvn_index_n, Spvn_vals_n). Note that the `name` argument is
// currently unused: datasets are written with the fixed "Spvn_" prefix. Only the first TG
// participates; returns (number of blocks written, total number of terms).
template<class SpMatrix, class task_group>
inline std::tuple<int, int> write_hdf5_SpMat(SpMatrix& SpM,
                                             hdf_archive& dump,
                                             std::string name,
                                             int nterms_per_blk,
                                             task_group& TG)
{
  std::size_t nblocks = 0, ntot = 0;
  if (TG.getTGNumber() > 0)
    return std::make_tuple(nblocks, ntot);
  int nnodes_per_TG = TG.getNGroupsPerTG();
  int node_number   = TG.getLocalGroupNumber();
  int core_rank     = TG.getCoreRank();
  if (core_rank == 0 && node_number == 0)
  {
    // get ranks of node heads on TG
    std::vector<int> ranks;
    int pos = 0;
    if (nnodes_per_TG > 1)
      TG.getRanksOfRoots(ranks, pos);
    else
    {
      ranks.push_back(0);
    }

    std::size_t sz = SpM.size();
    int nb_loc     = int(sz / std::size_t(nterms_per_blk) + std::min(sz % std::size_t(nterms_per_blk), std::size_t(1)));
    if (nnodes_per_TG > 1)
    {
      int nt;
      MPI_Reduce(&nb_loc, &nt, 1, MPI_INT, MPI_SUM, 0, TG.getTGCOMM());
      nblocks = std::size_t(nt);
    }
    else
    {
      nblocks = nb_loc;
    }

    std::vector<int> bsize;
    bsize.reserve(nblocks);

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val     = SpM.vals_begin();

    // do mine first
    int cnt        = 0, nterms;
    IndexType nblk = 0;
    for (int i = 0; i < nb_loc; i++)
    {
      if ((sz - std::size_t(cnt)) < std::size_t(nterms_per_blk))
        nterms = int(sz - std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for (int k = 0; k < nterms; k++, cnt++, row++, col++, val++)
      {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
      bsize.push_back(nterms);
      dump.write(ivec, std::string("Spvn_index_") + std::to_string(nblk));
      dump.write(vvec, std::string("Spvn_vals_") + std::to_string(nblk));
      nblk++;
    }

    ntot = sz;
    MPI_Status st;
    for (int rk = 1; rk < nnodes_per_TG; rk++)
    {
      bool done = false;
      do
      {
        // resize to receive
        ivec.resize(2 * nterms_per_blk);
        vvec.resize(nterms_per_blk);
#if defined(QMC_COMPLEX)
        MPI_Recv(vvec.data(), 2 * nterms_per_blk, MPI_DOUBLE, ranks[rk], MPI_ANY_TAG, TG.getTGCOMM(), &st);
#else
        MPI_Recv(vvec.data(), nterms_per_blk, MPI_DOUBLE, ranks[rk], MPI_ANY_TAG, TG.getTGCOMM(), &st);
#endif
        MPI_Recv(ivec.data(), 2 * nterms_per_blk, MPI_INT, ranks[rk], MPI_ANY_TAG, TG.getTGCOMM(), &st);

        int nt_;
        MPI_Get_count(&st, MPI_INT, &nt_);
        nterms = std::size_t(nt_ / 2);
        if (st.MPI_TAG == MORE)
        {
          assert(nterms == nterms_per_blk);
        }
        else if (st.MPI_TAG == LAST)
        {
          assert(nterms <= nterms_per_blk);
          done = true;
        }
        else
        {
          APP_ABORT(" Error: This should not happen. \n\n\n");
        }

        ntot += std::size_t(nterms);
        bsize.push_back(nterms);
        // resize to write
        ivec.resize(2 * nterms);
        vvec.resize(nterms);
        dump.write(ivec, std::string("Spvn_index_") + std::to_string(nblk));
        dump.write(vvec, std::string("Spvn_vals_") + std::to_string(nblk));
        nblk++;
      } while (!done);
    }

    dump.write(bsize, "Spvn_block_sizes");
  }
  else if (nnodes_per_TG > 1 && core_rank == 0)
  {
    std::size_t sz = SpM.size();
    int nt, nb_loc = int(sz / nterms_per_blk + std::min(sz % nterms_per_blk, std::size_t(1)));
    MPI_Reduce(&nb_loc, &nt, 1, MPI_INT, MPI_SUM, 0, TG.getTGCOMM());

    std::vector<IndexType> ivec;
    std::vector<ValueType> vvec;
    ivec.reserve(2 * nterms_per_blk);
    vvec.reserve(nterms_per_blk);

    typename SpMatrix::int_iterator col = SpM.cols_begin();
    typename SpMatrix::int_iterator row = SpM.rows_begin();
    typename SpMatrix::iterator val     = SpM.vals_begin();

    int cnt        = 0, nterms;
    IndexType nblk = 0;
    for (int i = 0; i < nb_loc; i++)
    {
      if ((sz - std::size_t(cnt)) < std::size_t(nterms_per_blk))
        nterms = int(sz - std::size_t(cnt));
      else
        nterms = nterms_per_blk;
      ivec.clear();
      vvec.clear();
      for (int k = 0; k < nterms; k++, cnt++, row++, col++, val++)
      {
        ivec.push_back(*row);
        ivec.push_back(*col);
        vvec.push_back(static_cast<ValueType>(*val));
      }
#if defined(QMC_COMPLEX)
      MPI_Send(vvec.data(), 2 * nterms, MPI_DOUBLE, 0, (i == nb_loc - 1) ? (LAST) : (MORE), TG.getTGCOMM());
#else
      MPI_Send(vvec.data(), nterms, MPI_DOUBLE, 0, (i == nb_loc - 1) ? (LAST) : (MORE), TG.getTGCOMM());
#endif
      MPI_Send(ivec.data(), 2 * nterms, MPI_INT, 0, (i == nb_loc - 1) ? (LAST) : (MORE), TG.getTGCOMM());
    }
  }
  else if (nnodes_per_TG > 1)
  {
    int nb_loc = 0, nt;
    MPI_Reduce(&nb_loc, &nt, 1, MPI_INT, MPI_SUM, 0, TG.getTGCOMM());
  }
  return std::make_tuple(nblocks, ntot);
}

} // namespace afqmc

} // namespace qmcplusplus

#endif