1 ////////////////////////////////////////////////////////////////////////////////
2 // This file is distributed under the University of Illinois/NCSA Open Source
3 // License.  See LICENSE file in top directory for details.
4 //
5 // Copyright (c) 2016 Jeongnim Kim and QMCPACK developers.
6 //
7 // File developed by:
8 //
9 // File created by:
10 // Miguel Morales, moralessilva2@llnl.gov, Lawrence Livermore National Laboratory
11 ////////////////////////////////////////////////////////////////////////////////
12 
13 #ifndef SPARSE_CSR_MATRIX_CONSTRUCT_HPP
14 #define SPARSE_CSR_MATRIX_CONSTRUCT_HPP
15 
16 #include <complex>
17 #include <array>
18 #include <cassert>
19 #include <iostream>
20 #include <vector>
21 #include <tuple>
22 #include <numeric>
23 #include <memory>
24 #include <type_traits> // enable_if
25 #include <algorithm>
26 #include <mutex>
27 
28 #include "Configuration.h"
29 #include "Utilities/FairDivide.h"
30 #include "AFQMC/config.h"
31 #include "AFQMC/Utilities/Utils.hpp"
32 #include "mpi3/shared_communicator.hpp"
33 #include "mpi3/shared_window.hpp"
34 #include "mpi3/shm/mutex.hpp"
35 #include "AFQMC/Matrix/csr_matrix.hpp"
36 #include "AFQMC/Utilities/myTimer.h"
37 #include "AFQMC/Numerics/detail/utilities.hpp"
38 
39 #include "multi/array.hpp"
40 #include "multi/array_ref.hpp"
41 
42 namespace csr
43 {
44 namespace shm
45 {
46 /*
47  * Constructs a csr_matrix from the elements of the multi::array M applying a truncation.
48  * The input matrix is only meaningful in the core with rank 0
49  */
50 template<class CSR,
51          class MultiArray2D,
52          typename = typename std::enable_if<std::decay<MultiArray2D>::type::dimensionality == 2>::type>
53 CSR construct_csr_matrix_single_input(MultiArray2D&& M, double cutoff, char TA, boost::mpi3::shared_communicator& comm)
54 {
55   assert(TA == 'N' || TA == 'T' || TA == 'H');
56   std::vector<std::size_t> counts;
57   using int_type = typename CSR::index_type;
58   int_type nr, nc;
59   if (comm.rank() == 0)
60   {
61     if (TA == 'N')
62     {
63       nr = M.size(0);
64       nc = M.size(1);
65       counts.resize(nr);
66       for (int_type i = 0; i < nr; i++)
67         for (int_type j = 0; j < nc; j++)
68           if (std::abs(M[i][j]) > cutoff)
69             ++counts[i];
70     }
71     else
72     {
73       nr = M.size(1);
74       nc = M.size(0);
75       counts.resize(nr);
76       for (int_type i = 0; i < M.size(0); i++)
77         for (int_type j = 0; j < M.size(1); j++)
78           if (std::abs(M[i][j]) > cutoff)
79             ++counts[j];
80     }
81   }
82   comm.broadcast_value(nr);
83   comm.broadcast_value(nc);
84   if (comm.rank() != 0)
85     counts.resize(nr);
86   comm.broadcast_n(counts.begin(), counts.size());
87 
88   CSR csr_mat(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, counts,
89               qmcplusplus::afqmc::shared_allocator<typename CSR::value_type>(comm));
90 
91   if (comm.rank() == 0)
92   {
93     if (TA == 'N')
94     {
95       for (int_type i = 0; i < nr; i++)
96         for (int_type j = 0; j < nc; j++)
97           if (std::abs(M[i][j]) > cutoff)
98             csr_mat.emplace_back({i, j}, static_cast<typename CSR::value_type>(M[i][j]));
99     }
100     else if (TA == 'T')
101     {
102       for (int_type i = 0; i < M.size(1); i++)
103         for (int_type j = 0; j < M.size(0); j++)
104           if (std::abs(M[j][i]) > cutoff)
105             csr_mat.emplace_back({i, j}, static_cast<typename CSR::value_type>(M[j][i]));
106     }
107     else if (TA == 'H')
108     {
109       for (int_type i = 0; i < M.size(1); i++)
110         for (int_type j = 0; j < M.size(0); j++)
111           if (std::abs(M[j][i]) > cutoff)
112             csr_mat.emplace_back({i, j}, static_cast<typename CSR::value_type>(ma::conj(M[j][i])));
113     }
114   }
115   csr_mat.remove_empty_spaces();
116   comm.barrier();
117 
118   return csr_mat;
119 }
120 
121 /*
122  * Constructs a csr_matrix from the elements in the container (of tuples).
123  * All cores in the communicator contribute values.
124  */
125 template<class CSR, class Container>
126 CSR construct_csr_matrix_multiple_input(Container const& M,
127                                         std::size_t nr,
128                                         std::size_t nc,
129                                         char TA,
130                                         boost::mpi3::shared_communicator& comm)
131 {
132   myTimer Timer;
133   Timer.reset("G0");
134   Timer.start("G0");
135 
136   assert(TA == 'N' || TA == 'T' || TA == 'H');
137   using std::get;
138   using VType = typename CSR::value_type;
139   using IType = typename CSR::index_type;
140   using PType = typename CSR::int_type;
141   using UCSR  = typename CSR::base;
142 
143   std::vector<std::size_t> counts(nr);
144   if (TA == 'N')
145     for (auto& v : M)
146       ++counts[get<0>(v)];
147   else
148     for (auto& v : M)
149       ++counts[get<1>(v)];
150   comm.all_reduce_in_place_n(counts.begin(), counts.size(), std::plus<>());
151 
152   Timer.stop("G0");
153   qmcplusplus::app_log() << " In construct_csr_matrix_multiple_input allreduce: " << Timer.total("G0") << std::endl;
154   Timer.reset("G0");
155   Timer.start("G0");
156 
157   UCSR ucsr_mat(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, counts,
158                 qmcplusplus::afqmc::shared_allocator<VType>(comm));
159 
160   for (std::size_t r = 0; r < comm.size(); r++)
161   {
162     comm.barrier();
163     if (comm.rank() == r)
164     {
165       if (TA == 'N')
166         for (auto& v : M)
167           ucsr_mat.emplace({get<0>(v), get<1>(v)}, get<2>(v));
168       else if (TA == 'T')
169         for (auto& v : M)
170           ucsr_mat.emplace({get<1>(v), get<0>(v)}, get<2>(v));
171       else if (TA == 'H')
172         for (auto& v : M)
173           ucsr_mat.emplace({get<1>(v), get<0>(v)}, ma::conj(get<2>(v)));
174     }
175     comm.barrier();
176   }
177 
178   Timer.stop("G0");
179   qmcplusplus::app_log() << " In construct_csr_matrix_multiple_input emplace: " << Timer.total("G0") << std::endl;
180 
181   return CSR(std::move(ucsr_mat));
182 }
183 
184 /*
185  * Constructs a new csr_matrix by adding to the given csr_matrix::base all the elements
186  * on the other nodes in TG.Global().
187  * All nodes (including all TGs) contribute values.
188  * All nodes involved in the calculation obtain identical copies of the resulting csr matrix.
189  */
190 template<class CSR, class task_group>
191 CSR construct_csr_matrix_from_distributed_ucsr(typename CSR::base&& ucsr, task_group& TG)
192 {
193   if (ucsr.size(0) == 0 || ucsr.size(1) == 0)
194     return CSR(std::move(ucsr));
195   ;
196   int ncores = TG.getTotalCores(), coreid = TG.getCoreID();
197   int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID();
198 
199   std::size_t ak0, ak1;
200   std::tie(ak0, ak1) = FairDivideBoundary(std::size_t(coreid), ucsr.size(0), std::size_t(ncores));
201 
202   std::vector<std::size_t> counts_local(ak1 - ak0);
203   std::vector<std::size_t> counts_global(ucsr.size(0));
204   for (std::size_t r = ak0, r0 = 0; r < ak1; ++r, ++r0)
205   {
206     counts_local[r0] = std::size_t(*ucsr.pointers_end(r) - *ucsr.pointers_begin(r));
207     counts_global[r] = counts_local[r0];
208   }
209   TG.Global().all_reduce_in_place_n(counts_global.begin(), counts_global.size(), std::plus<>());
210 
211   if (std::accumulate(counts_global.begin(), counts_global.end(), std::size_t(0)) == 0)
212     return CSR(std::move(ucsr));
213   ;
214 
215   // if this has been previously "reserved", it will do nothing
216   ucsr.reserve(counts_global);
217 
218   std::size_t nterms = std::accumulate(counts_local.begin(), counts_local.end(), std::size_t(0));
219   auto sz_per_node   = TG.Cores().all_gather_value(nterms);
220   std::size_t maxnterms(*std::max_element(sz_per_node.begin(), sz_per_node.end()));
221   std::vector<typename CSR::value_type> vals(maxnterms);
222   std::vector<typename CSR::index_type> cols(maxnterms);
223   std::vector<std::size_t> counts(counts_local.size());
224   for (int ni = 0; ni < nnodes; ++ni)
225   {
226     if (nodeid == ni)
227     {
228       auto v0 = vals.begin();
229       auto c0 = cols.begin();
230       std::copy(counts_local.begin(), counts_local.end(), counts.begin());
231       for (std::size_t r = ak0, r0 = 0; r < ak1; ++r, ++r0)
232       {
233         v0 = std::copy_n(to_address(ucsr.non_zero_values_data(r)), counts[r0], v0);
234         c0 = std::copy_n(to_address(ucsr.non_zero_indices2_data(r)), counts[r0], c0);
235       }
236     }
237     TG.Cores().broadcast_n(vals.begin(), sz_per_node[ni], ni);
238     TG.Cores().broadcast_n(cols.begin(), sz_per_node[ni], ni);
239     TG.Cores().broadcast_n(counts.begin(), counts.size(), ni);
240     if (nodeid != ni)
241     {
242       auto itC                   = cols.begin();
243       auto itV                   = vals.begin();
244       typename CSR::index_type r = static_cast<typename CSR::index_type>(ak0);
245       for (auto i : counts)
246       {
247         while (i-- > 0)
248           ucsr.emplace({r, *itC++}, *itV++);
249         ++r;
250       }
251     }
252   }
253   TG.node_barrier();
254   return CSR(std::move(ucsr));
255 }
256 
257 /*
258  * Constructs a new csr_matrix from the elements in the container Q.
259  * All cores (including all TGs) contribute values.
260  * All nodes involved in the calculation obtain identical copies of the resulting csr matrix.
261  */
262 template<class CSR, class Container, class task_group>
263 CSR construct_csr_matrix_from_distributed_containers(Container const& Q,
264                                                      std::size_t nr,
265                                                      std::size_t nc,
266                                                      task_group& TG,
267                                                      bool needs_lock = true)
268 {
269   using std::get;
270   using Type = typename Container::value_type;
271   if (nr == 0 || nc == 0)
272     return CSR(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, 0,
273                typename CSR::alloc_type(TG.Node()));
274   int ncores = TG.getTotalCores(), coreid = TG.getCoreID();
275   int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID();
276 
277   std::vector<std::size_t> counts_global(nr);
278   for (auto& v : Q)
279     ++counts_global[get<0>(v)];
280   TG.Global().all_reduce_in_place_n(counts_global.begin(), counts_global.size(), std::plus<>());
281 
282   if (std::accumulate(counts_global.begin(), counts_global.end(), std::size_t(0)) == 0)
283     return CSR(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, 0,
284                typename CSR::alloc_type(TG.Node()));
285 
286   typename CSR::base ucsr(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0},
287                           counts_global, typename CSR::alloc_type(TG.Node()));
288 
289   std::size_t nterms = Q.size();
290   auto sz_per_node   = TG.Cores().all_gather_value(nterms);
291   std::size_t maxnterms(*std::max_element(sz_per_node.begin(), sz_per_node.end()));
292   std::vector<Type> Qc;
293   Qc.reserve(maxnterms);
294   boost::mpi3::shm::mutex m(TG.Node());
295   for (int ni = 0; ni < nnodes; ++ni)
296   {
297     Qc.resize(sz_per_node[ni]);
298     if (nodeid == ni)
299       std::copy_n(Q.begin(), sz_per_node[ni], Qc.begin());
300     //TG.Cores().broadcast_n(Qc.begin(),sz_per_node[ni],ni);
301     // replace when tuples can me communicated directly
302     MPI_Bcast(Qc.data(), sz_per_node[ni] * sizeof(Type), MPI_CHAR, ni, &(TG.Cores()));
303     if (needs_lock)
304     {
305       std::lock_guard<boost::mpi3::shm::mutex> guard(m);
306       for (auto& v : Qc)
307         ucsr.emplace({get<0>(v), get<1>(v)}, get<2>(v));
308     }
309     else
310     {
311       for (auto& v : Qc)
312         ucsr.emplace({get<0>(v), get<1>(v)}, get<2>(v));
313     }
314   }
315   TG.node_barrier();
316   return CSR(std::move(ucsr));
317 }
318 
319 /*
320  * Constructs a new csr_matrix from the elements in the container Q.
321  * The global matrix (including all elements in all cores) will be evenly distributed
322  * accross the nodes in every task group. No particular stucture will be followed in the
323  * partitioning, only strict distribution of non-zero elements.
324  * All TGs will have identical distributions among its nodes.
325  * This approach uses more memory (up to 2 copies of the submatrix), but avoids
326  * having to execute code to count the number of terms ahead of time.
327  * Current algorithm:
328  *   1. all_gather on equivalent nodes on a TG. This will leave every TG with the entire matrix.
329  *   2. Count elements and determine final nnz per node.
330  *   3. Exchange elements between nodes to achieve desired distribution
331  *        i. Cores with too many elements sends excess to head cores of nodes with too few
332  *   4. construct CSR
333  */
334 template<class CSR, class Container, class task_group>
335 CSR construct_distributed_csr_matrix_from_distributed_containers(Container& Q,
336                                                                  std::size_t nr,
337                                                                  std::size_t nc,
338                                                                  task_group& TG)
339 {
340   myTimer Timer;
341   Timer.reset("G0");
342   Timer.start("G0");
343 
344   using std::get;
345   using Type = typename Container::value_type;
346   if (nr == 0 || nc == 0)
347     return CSR(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, 0,
348                typename CSR::alloc_type(TG.Node()));
349   int ncores = TG.getTotalCores(), coreid = TG.getCoreID();
350   int nodeid        = TG.getNodeID();
351   int node_number   = TG.getLocalGroupNumber();
352   int nnodes_per_TG = TG.getNGroupsPerTG();
353 
354   // 1. Define new communicator for equivalent cores
355   boost::mpi3::communicator eq_cores(TG.Cores().split(node_number, TG.Cores().rank()));
356   // this comm is similar to TG.TG, but includes all the cores in the node in the same comm
357   // it is used to balance the distribution within the nodes on each TG.TG
358   boost::mpi3::communicator eq_node_group(TG.Global().split(nodeid / nnodes_per_TG, TG.Global().rank()));
359 
360   // 2. count elements
361   long nterms_total            = (TG.Global() += Q.size());
362   auto nterms_per_core         = eq_cores.all_gather_value(Q.size());
363   long nterms_in_local_core    = std::accumulate(nterms_per_core.begin(), nterms_per_core.end(), long(0));
364   auto nterms_per_core_in_node = TG.Node().all_gather_value(nterms_in_local_core);
365   long nterms_node_before_loadbalance =
366       std::accumulate(nterms_per_core_in_node.begin(), nterms_per_core_in_node.end(), long(0));
367   std::vector<long> bounds(nnodes_per_TG + 1);
368   FairDivide(size_t(nterms_total), nnodes_per_TG, bounds);
369 
370   // all extra terms are received by CoreID==0
371   if (coreid == 0)
372     Q.reserve(nterms_in_local_core +
373               std::max(long(0), bounds[node_number + 1] - bounds[node_number] - nterms_node_before_loadbalance));
374   else
375     Q.reserve(nterms_in_local_core);
376 
377   // 3. all_gather_v on eq_cores (use mpi3 when tested and ready)
378   //     i. move data in Q to allow use of MPI_IN_PLACE in Allgatherv
379   Q.resize(nterms_in_local_core);
380   if (eq_cores.rank() > 0)
381   {
382     size_t n = std::accumulate(nterms_per_core.begin(), nterms_per_core.begin() + eq_cores.rank() + 1, size_t(0));
383     std::copy_backward(Q.begin(), Q.begin() + nterms_per_core[eq_cores.rank()], Q.begin() + n);
384   }
385 
386   Timer.stop("G0");
387   qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers setup: "
388                          << Timer.total("G0") << std::endl;
389   Timer.reset("G0");
390   Timer.start("G0");
391 
392   std::vector<int> recvcounts(eq_cores.size());
393   std::vector<int> displ(eq_cores.size());
394   long cnt(0);
395   for (int p = 0; p < eq_cores.size(); p++)
396   {
397     if (nterms_per_core[p] * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
398       throw std::out_of_range("row size exceeded the maximum");
399     recvcounts[p] = int(nterms_per_core[p] * sizeof(Type));
400     displ[p]      = int(cnt * sizeof(Type));
401     cnt += nterms_per_core[p];
402     if (cnt * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
403       throw std::out_of_range("row size exceeded the maximum");
404   }
405   MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, Q.data(), recvcounts.data(), displ.data(), MPI_CHAR, eq_cores.get());
406 
407   Timer.stop("G0");
408   qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers Allgatherv: "
409                          << Timer.total("G0") << std::endl;
410   Timer.reset("G0");
411   Timer.start("G0");
412 
413   boost::multi::array<long, 2> counts_per_core({nnodes_per_TG, ncores});
414   std::fill_n(counts_per_core.origin(), ncores * nnodes_per_TG, long(0));
415   counts_per_core[node_number][coreid] = nterms_in_local_core;
416   eq_node_group.all_reduce_in_place_n(counts_per_core.origin(), ncores * nnodes_per_TG, std::plus<>());
417 
418   // calculate how many terms each core is potentially sending
419   // when algorithm works, make nplus a vector of pairs to save space, no need to store mostly zeros
420   boost::multi::array<long, 2> nplus({nnodes_per_TG, ncores});
421   std::fill_n(nplus.origin(), ncores * nnodes_per_TG, long(0));
422   auto tgrank = eq_node_group.rank();
423   // number of terms I'm sending
424   long deltaN = 0;
425   // excess terms before me
426   long nbefore = 0;
427   for (int ni = 0, ip = 0; ni < nnodes_per_TG; ++ni)
428   {
429     long nuptome = 0;
430     for (int ci = 0; ci < ncores; ++ci, ++ip)
431     {
432       nuptome += counts_per_core[ni][ci];
433       long dn = std::min(counts_per_core[ni][ci], nuptome - (bounds[ni + 1] - bounds[ni]));
434       if (dn > long(0))
435       {
436         if (ip < tgrank)
437           nbefore += long(dn);
438         if (ip == tgrank)
439           deltaN = long(dn);
440         nplus[ni][ci] = dn;
441       }
442     }
443   }
444 
445   // calculate how many terms each head core is expecting
446   std::vector<long> nminus(nnodes_per_TG);
447   for (int i = 0; i < nnodes_per_TG; i++)
448   {
449     long dn = (bounds[i + 1] - bounds[i]) -
450         std::accumulate(counts_per_core[i].origin(), counts_per_core[i].origin() + ncores, long(0));
451     if (dn > 0)
452       nminus[i] = long(dn);
453   }
454 
455   Timer.stop("G0");
456   qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers setup comm: "
457                          << Timer.total("G0") << std::endl;
458   Timer.reset("G0");
459   Timer.start("G0");
460 
461   /*
462 qmcplusplus::app_log()<<nnodes_per_TG <<" " <<ncores <<"\n";
463 for(int i=0; i<nnodes_per_TG; i++)
464  qmcplusplus::app_log()<<nminus[i] <<" ";
465 qmcplusplus::app_log()<<std::endl;
466 for(int i=0; i<nnodes_per_TG; i++) {
467  for(int j=0; j<ncores; j++)
468   qmcplusplus::app_log()<<nplus[i][j] <<" ";
469  qmcplusplus::app_log()<<std::endl;
470 }
471 
472 std::ofstream out((std::string("debug.")+std::to_string(TG.Global().rank())).c_str());
473 */
474   //
475   if (deltaN > long(0))
476   {
477     // I'm sending somewhere
478     long nsent(0);
479     for (int i = 0; i < nnodes_per_TG; i++)
480     {
481       if (nminus[i] == long(0))
482         continue;
483       if (nsent + nminus[i] > nbefore)
484       {
485         // send to i
486         long to_send = std::min(deltaN, nsent + nminus[i] - nbefore);
487         if (to_send * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
488           throw std::out_of_range("row size exceeded the maximum");
489         int to_send_int = int(to_send * sizeof(Type));
490         //out<<" sending " <<to_send <<" " <<deltaN <<" " <<i*ncores <<" " <<i <<std::endl;
491         MPI_Send(Q.data() + Q.size() - to_send, to_send_int, MPI_CHAR, i * ncores, tgrank, eq_node_group.get());
492         //out<<" done sending " <<to_send <<" " <<deltaN <<" " <<i*ncores <<" " <<i <<std::endl;
493         nbefore += to_send;
494         deltaN -= to_send;
495         while (to_send-- > long(0))
496           Q.pop_back();
497       }
498       nsent += nminus[i];
499       if (deltaN == long(0))
500         break;
501     }
502     if (deltaN > long(0))
503       throw std::out_of_range("detlaN > 0");
504   }
505   else if (coreid == 0 && nminus[node_number] > long(0))
506   {
507     deltaN  = nminus[node_number];
508     nbefore = std::accumulate(nminus.begin(), nminus.begin() + node_number, long(0));
509     long nsent(0);
510     MPI_Status st;
511     for (int ni = 0, ip = 0; ni < nnodes_per_TG; ni++)
512     {
513       for (int ci = 0; ci < ncores; ++ci, ++ip)
514       {
515         if (nplus[ni][ci] == long(0))
516           continue;
517         if (nsent + nplus[ni][ci] > nbefore)
518         {
519           // receive from ip
520           long to_recv = std::min(deltaN, nsent + nplus[ni][ci] - nbefore);
521           if (to_recv * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
522             throw std::out_of_range("row size exceeded the maximum");
523           int to_recv_int = int(to_recv * sizeof(Type));
524           long curr_sz    = Q.size();
525           Q.resize(curr_sz + to_recv);
526           //out<<" receiving " <<to_recv <<" " <<deltaN <<" " <<nbefore <<" " <<ip <<std::endl;
527           MPI_Recv(Q.data() + curr_sz, to_recv_int, MPI_CHAR, ip, ip, eq_node_group.get(), &st);
528           nbefore += to_recv;
529           deltaN -= to_recv;
530           //out<<" done receiving " <<to_recv <<" " <<deltaN <<" " <<nbefore <<" " <<ip <<std::endl;
531         }
532         nsent += nplus[ni][ci];
533         if (deltaN == long(0))
534           break;
535       }
536       if (deltaN == long(0))
537         break;
538     }
539     if (deltaN > long(0))
540       throw std::out_of_range("detlaN > 0");
541   }
542   else
543   {
544     //out<<" nothing to do. \n";
545   }
546 
547   //out.close();
548 
549   Timer.stop("G0");
550   qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers send/recv: "
551                          << Timer.total("G0") << std::endl;
552   Timer.reset("G0");
553   Timer.start("G0");
554 
555 
556   long final_nterms_node = (TG.Node() += Q.size());
557   std::vector<long> final_counts(TG.Cores().size());
558   TG.Cores().gather_n(&final_nterms_node, 1, final_counts.data(), 0);
559   if (TG.Global().root())
560   {
561     qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers: \n";
562     qmcplusplus::app_log() << " Partitioning of CSR matrix over TG (nnz per node): ";
563     for (int i = 0; i < nnodes_per_TG; i++)
564       qmcplusplus::app_log() << final_counts[i] << " ";
565     qmcplusplus::app_log() << std::endl;
566   }
567 
568   TG.node_barrier();
569   return construct_csr_matrix_multiple_input<CSR>(Q, nr, nc, 'N', TG.Node());
570 }
571 
572 // assume partition by row
573 template<class CSR, class task_group, typename integer = int>
local_balanced_partition(CSR & M,task_group & TG)574 typename CSR::template matrix_view<integer> local_balanced_partition(CSR& M, task_group& TG)
575 {
576   using std::size_t;
577   using array_ = std::array<size_t, 4>;
578   if (TG.getNCoresPerTG() == 1)
579   {
580     return M[array_{0, M.size(0), 0, M.size(1)}];
581   }
582   else
583   {
584     std::vector<size_t> bounds(TG.getNCoresPerTG() + 1);
585     if (TG.getCoreID() == 0)
586     {
587       std::vector<size_t> indx(M.size(0) + 1);
588       indx[0] = size_t(0);
589       for (size_t i = 0, cnt = 0; i < M.size(0); i++)
590       {
591         cnt += size_t(M.pointers_end()[i] - M.pointers_begin()[i]);
592         indx[i + 1] = cnt;
593       }
594       qmcplusplus::afqmc::balance_partition_ordered_set(indx, bounds);
595       if (TG.Global().root())
596       {
597         qmcplusplus::app_log() << std::endl << "Partition of csr matrix (rows) over cores in local TG: ";
598         for (int i = 0; i <= TG.getNCoresPerTG(); i++)
599           qmcplusplus::app_log() << bounds[i] << " ";
600         qmcplusplus::app_log() << std::endl;
601         qmcplusplus::app_log() << "Number of terms in csr matrix view per core: ";
602         for (int i = 0; i < TG.getNCoresPerTG(); i++)
603           qmcplusplus::app_log() << indx[bounds[i + 1]] - indx[bounds[i]] << " ";
604         qmcplusplus::app_log() << std::endl << std::endl;
605       }
606     }
607     TG.Node().broadcast_n(bounds.begin(), bounds.size());
608     return M[array_{bounds[TG.getLocalTGRank()], bounds[TG.getLocalTGRank() + 1], 0, M.size(1)}];
609     //return M[array_{0,M.size(0),0,M.size(1)}];
610   }
611 }
612 
613 /*
614  * Constructs a vector of csr_matrix as a copy from a given csr_matrix but casted to single precision
615  */
616 template<class CSRsp, class CSR>
CSRvector_to_single_precision(std::vector<CSR> const & A)617 std::vector<CSRsp> CSRvector_to_single_precision(std::vector<CSR> const& A)
618 {
619   using Alloc = typename CSRsp::alloc_type;
620   std::vector<CSRsp> B;
621   B.reserve(A.size());
622   for (auto& v : A)
623     B.emplace_back(CSRsp(v, Alloc{v.getAlloc()}));
624   return B;
625 }
626 
627 } // namespace shm
628 
629 } // namespace csr
630 
631 #endif
632