1 ////////////////////////////////////////////////////////////////////////////////
2 // This file is distributed under the University of Illinois/NCSA Open Source
3 // License. See LICENSE file in top directory for details.
4 //
5 // Copyright (c) 2016 Jeongnim Kim and QMCPACK developers.
6 //
7 // File developed by:
8 //
9 // File created by:
10 // Miguel Morales, moralessilva2@llnl.gov, Lawrence Livermore National Laboratory
11 ////////////////////////////////////////////////////////////////////////////////
12
13 #ifndef SPARSE_CSR_MATRIX_CONSTRUCT_HPP
14 #define SPARSE_CSR_MATRIX_CONSTRUCT_HPP
15
16 #include <complex>
17 #include <array>
18 #include <cassert>
19 #include <iostream>
20 #include <vector>
21 #include <tuple>
22 #include <numeric>
23 #include <memory>
24 #include <type_traits> // enable_if
25 #include <algorithm>
26 #include <mutex>
27
28 #include "Configuration.h"
29 #include "Utilities/FairDivide.h"
30 #include "AFQMC/config.h"
31 #include "AFQMC/Utilities/Utils.hpp"
32 #include "mpi3/shared_communicator.hpp"
33 #include "mpi3/shared_window.hpp"
34 #include "mpi3/shm/mutex.hpp"
35 #include "AFQMC/Matrix/csr_matrix.hpp"
36 #include "AFQMC/Utilities/myTimer.h"
37 #include "AFQMC/Numerics/detail/utilities.hpp"
38
39 #include "multi/array.hpp"
40 #include "multi/array_ref.hpp"
41
42 namespace csr
43 {
44 namespace shm
45 {
46 /*
47 * Constructs a csr_matrix from the elements of the multi::array M applying a truncation.
48 * The input matrix is only meaningful in the core with rank 0
49 */
50 template<class CSR,
51 class MultiArray2D,
52 typename = typename std::enable_if<std::decay<MultiArray2D>::type::dimensionality == 2>::type>
53 CSR construct_csr_matrix_single_input(MultiArray2D&& M, double cutoff, char TA, boost::mpi3::shared_communicator& comm)
54 {
55 assert(TA == 'N' || TA == 'T' || TA == 'H');
56 std::vector<std::size_t> counts;
57 using int_type = typename CSR::index_type;
58 int_type nr, nc;
59 if (comm.rank() == 0)
60 {
61 if (TA == 'N')
62 {
63 nr = M.size(0);
64 nc = M.size(1);
65 counts.resize(nr);
66 for (int_type i = 0; i < nr; i++)
67 for (int_type j = 0; j < nc; j++)
68 if (std::abs(M[i][j]) > cutoff)
69 ++counts[i];
70 }
71 else
72 {
73 nr = M.size(1);
74 nc = M.size(0);
75 counts.resize(nr);
76 for (int_type i = 0; i < M.size(0); i++)
77 for (int_type j = 0; j < M.size(1); j++)
78 if (std::abs(M[i][j]) > cutoff)
79 ++counts[j];
80 }
81 }
82 comm.broadcast_value(nr);
83 comm.broadcast_value(nc);
84 if (comm.rank() != 0)
85 counts.resize(nr);
86 comm.broadcast_n(counts.begin(), counts.size());
87
88 CSR csr_mat(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, counts,
89 qmcplusplus::afqmc::shared_allocator<typename CSR::value_type>(comm));
90
91 if (comm.rank() == 0)
92 {
93 if (TA == 'N')
94 {
95 for (int_type i = 0; i < nr; i++)
96 for (int_type j = 0; j < nc; j++)
97 if (std::abs(M[i][j]) > cutoff)
98 csr_mat.emplace_back({i, j}, static_cast<typename CSR::value_type>(M[i][j]));
99 }
100 else if (TA == 'T')
101 {
102 for (int_type i = 0; i < M.size(1); i++)
103 for (int_type j = 0; j < M.size(0); j++)
104 if (std::abs(M[j][i]) > cutoff)
105 csr_mat.emplace_back({i, j}, static_cast<typename CSR::value_type>(M[j][i]));
106 }
107 else if (TA == 'H')
108 {
109 for (int_type i = 0; i < M.size(1); i++)
110 for (int_type j = 0; j < M.size(0); j++)
111 if (std::abs(M[j][i]) > cutoff)
112 csr_mat.emplace_back({i, j}, static_cast<typename CSR::value_type>(ma::conj(M[j][i])));
113 }
114 }
115 csr_mat.remove_empty_spaces();
116 comm.barrier();
117
118 return csr_mat;
119 }
120
121 /*
122 * Constructs a csr_matrix from the elements in the container (of tuples).
123 * All cores in the communicator contribute values.
124 */
125 template<class CSR, class Container>
126 CSR construct_csr_matrix_multiple_input(Container const& M,
127 std::size_t nr,
128 std::size_t nc,
129 char TA,
130 boost::mpi3::shared_communicator& comm)
131 {
132 myTimer Timer;
133 Timer.reset("G0");
134 Timer.start("G0");
135
136 assert(TA == 'N' || TA == 'T' || TA == 'H');
137 using std::get;
138 using VType = typename CSR::value_type;
139 using IType = typename CSR::index_type;
140 using PType = typename CSR::int_type;
141 using UCSR = typename CSR::base;
142
143 std::vector<std::size_t> counts(nr);
144 if (TA == 'N')
145 for (auto& v : M)
146 ++counts[get<0>(v)];
147 else
148 for (auto& v : M)
149 ++counts[get<1>(v)];
150 comm.all_reduce_in_place_n(counts.begin(), counts.size(), std::plus<>());
151
152 Timer.stop("G0");
153 qmcplusplus::app_log() << " In construct_csr_matrix_multiple_input allreduce: " << Timer.total("G0") << std::endl;
154 Timer.reset("G0");
155 Timer.start("G0");
156
157 UCSR ucsr_mat(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, counts,
158 qmcplusplus::afqmc::shared_allocator<VType>(comm));
159
160 for (std::size_t r = 0; r < comm.size(); r++)
161 {
162 comm.barrier();
163 if (comm.rank() == r)
164 {
165 if (TA == 'N')
166 for (auto& v : M)
167 ucsr_mat.emplace({get<0>(v), get<1>(v)}, get<2>(v));
168 else if (TA == 'T')
169 for (auto& v : M)
170 ucsr_mat.emplace({get<1>(v), get<0>(v)}, get<2>(v));
171 else if (TA == 'H')
172 for (auto& v : M)
173 ucsr_mat.emplace({get<1>(v), get<0>(v)}, ma::conj(get<2>(v)));
174 }
175 comm.barrier();
176 }
177
178 Timer.stop("G0");
179 qmcplusplus::app_log() << " In construct_csr_matrix_multiple_input emplace: " << Timer.total("G0") << std::endl;
180
181 return CSR(std::move(ucsr_mat));
182 }
183
184 /*
185 * Constructs a new csr_matrix by adding to the given csr_matrix::base all the elements
186 * on the other nodes in TG.Global().
187 * All nodes (including all TGs) contribute values.
188 * All nodes involved in the calculation obtain identical copies of the resulting csr matrix.
189 */
190 template<class CSR, class task_group>
191 CSR construct_csr_matrix_from_distributed_ucsr(typename CSR::base&& ucsr, task_group& TG)
192 {
193 if (ucsr.size(0) == 0 || ucsr.size(1) == 0)
194 return CSR(std::move(ucsr));
195 ;
196 int ncores = TG.getTotalCores(), coreid = TG.getCoreID();
197 int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID();
198
199 std::size_t ak0, ak1;
200 std::tie(ak0, ak1) = FairDivideBoundary(std::size_t(coreid), ucsr.size(0), std::size_t(ncores));
201
202 std::vector<std::size_t> counts_local(ak1 - ak0);
203 std::vector<std::size_t> counts_global(ucsr.size(0));
204 for (std::size_t r = ak0, r0 = 0; r < ak1; ++r, ++r0)
205 {
206 counts_local[r0] = std::size_t(*ucsr.pointers_end(r) - *ucsr.pointers_begin(r));
207 counts_global[r] = counts_local[r0];
208 }
209 TG.Global().all_reduce_in_place_n(counts_global.begin(), counts_global.size(), std::plus<>());
210
211 if (std::accumulate(counts_global.begin(), counts_global.end(), std::size_t(0)) == 0)
212 return CSR(std::move(ucsr));
213 ;
214
215 // if this has been previously "reserved", it will do nothing
216 ucsr.reserve(counts_global);
217
218 std::size_t nterms = std::accumulate(counts_local.begin(), counts_local.end(), std::size_t(0));
219 auto sz_per_node = TG.Cores().all_gather_value(nterms);
220 std::size_t maxnterms(*std::max_element(sz_per_node.begin(), sz_per_node.end()));
221 std::vector<typename CSR::value_type> vals(maxnterms);
222 std::vector<typename CSR::index_type> cols(maxnterms);
223 std::vector<std::size_t> counts(counts_local.size());
224 for (int ni = 0; ni < nnodes; ++ni)
225 {
226 if (nodeid == ni)
227 {
228 auto v0 = vals.begin();
229 auto c0 = cols.begin();
230 std::copy(counts_local.begin(), counts_local.end(), counts.begin());
231 for (std::size_t r = ak0, r0 = 0; r < ak1; ++r, ++r0)
232 {
233 v0 = std::copy_n(to_address(ucsr.non_zero_values_data(r)), counts[r0], v0);
234 c0 = std::copy_n(to_address(ucsr.non_zero_indices2_data(r)), counts[r0], c0);
235 }
236 }
237 TG.Cores().broadcast_n(vals.begin(), sz_per_node[ni], ni);
238 TG.Cores().broadcast_n(cols.begin(), sz_per_node[ni], ni);
239 TG.Cores().broadcast_n(counts.begin(), counts.size(), ni);
240 if (nodeid != ni)
241 {
242 auto itC = cols.begin();
243 auto itV = vals.begin();
244 typename CSR::index_type r = static_cast<typename CSR::index_type>(ak0);
245 for (auto i : counts)
246 {
247 while (i-- > 0)
248 ucsr.emplace({r, *itC++}, *itV++);
249 ++r;
250 }
251 }
252 }
253 TG.node_barrier();
254 return CSR(std::move(ucsr));
255 }
256
257 /*
258 * Constructs a new csr_matrix from the elements in the container Q.
259 * All cores (including all TGs) contribute values.
260 * All nodes involved in the calculation obtain identical copies of the resulting csr matrix.
261 */
262 template<class CSR, class Container, class task_group>
263 CSR construct_csr_matrix_from_distributed_containers(Container const& Q,
264 std::size_t nr,
265 std::size_t nc,
266 task_group& TG,
267 bool needs_lock = true)
268 {
269 using std::get;
270 using Type = typename Container::value_type;
271 if (nr == 0 || nc == 0)
272 return CSR(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, 0,
273 typename CSR::alloc_type(TG.Node()));
274 int ncores = TG.getTotalCores(), coreid = TG.getCoreID();
275 int nnodes = TG.getTotalNodes(), nodeid = TG.getNodeID();
276
277 std::vector<std::size_t> counts_global(nr);
278 for (auto& v : Q)
279 ++counts_global[get<0>(v)];
280 TG.Global().all_reduce_in_place_n(counts_global.begin(), counts_global.size(), std::plus<>());
281
282 if (std::accumulate(counts_global.begin(), counts_global.end(), std::size_t(0)) == 0)
283 return CSR(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, 0,
284 typename CSR::alloc_type(TG.Node()));
285
286 typename CSR::base ucsr(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0},
287 counts_global, typename CSR::alloc_type(TG.Node()));
288
289 std::size_t nterms = Q.size();
290 auto sz_per_node = TG.Cores().all_gather_value(nterms);
291 std::size_t maxnterms(*std::max_element(sz_per_node.begin(), sz_per_node.end()));
292 std::vector<Type> Qc;
293 Qc.reserve(maxnterms);
294 boost::mpi3::shm::mutex m(TG.Node());
295 for (int ni = 0; ni < nnodes; ++ni)
296 {
297 Qc.resize(sz_per_node[ni]);
298 if (nodeid == ni)
299 std::copy_n(Q.begin(), sz_per_node[ni], Qc.begin());
300 //TG.Cores().broadcast_n(Qc.begin(),sz_per_node[ni],ni);
301 // replace when tuples can me communicated directly
302 MPI_Bcast(Qc.data(), sz_per_node[ni] * sizeof(Type), MPI_CHAR, ni, &(TG.Cores()));
303 if (needs_lock)
304 {
305 std::lock_guard<boost::mpi3::shm::mutex> guard(m);
306 for (auto& v : Qc)
307 ucsr.emplace({get<0>(v), get<1>(v)}, get<2>(v));
308 }
309 else
310 {
311 for (auto& v : Qc)
312 ucsr.emplace({get<0>(v), get<1>(v)}, get<2>(v));
313 }
314 }
315 TG.node_barrier();
316 return CSR(std::move(ucsr));
317 }
318
319 /*
320 * Constructs a new csr_matrix from the elements in the container Q.
321 * The global matrix (including all elements in all cores) will be evenly distributed
322 * accross the nodes in every task group. No particular stucture will be followed in the
323 * partitioning, only strict distribution of non-zero elements.
324 * All TGs will have identical distributions among its nodes.
325 * This approach uses more memory (up to 2 copies of the submatrix), but avoids
326 * having to execute code to count the number of terms ahead of time.
327 * Current algorithm:
328 * 1. all_gather on equivalent nodes on a TG. This will leave every TG with the entire matrix.
329 * 2. Count elements and determine final nnz per node.
330 * 3. Exchange elements between nodes to achieve desired distribution
331 * i. Cores with too many elements sends excess to head cores of nodes with too few
332 * 4. construct CSR
333 */
334 template<class CSR, class Container, class task_group>
335 CSR construct_distributed_csr_matrix_from_distributed_containers(Container& Q,
336 std::size_t nr,
337 std::size_t nc,
338 task_group& TG)
339 {
340 myTimer Timer;
341 Timer.reset("G0");
342 Timer.start("G0");
343
344 using std::get;
345 using Type = typename Container::value_type;
346 if (nr == 0 || nc == 0)
347 return CSR(std::tuple<std::size_t, std::size_t>{nr, nc}, std::tuple<std::size_t, std::size_t>{0, 0}, 0,
348 typename CSR::alloc_type(TG.Node()));
349 int ncores = TG.getTotalCores(), coreid = TG.getCoreID();
350 int nodeid = TG.getNodeID();
351 int node_number = TG.getLocalGroupNumber();
352 int nnodes_per_TG = TG.getNGroupsPerTG();
353
354 // 1. Define new communicator for equivalent cores
355 boost::mpi3::communicator eq_cores(TG.Cores().split(node_number, TG.Cores().rank()));
356 // this comm is similar to TG.TG, but includes all the cores in the node in the same comm
357 // it is used to balance the distribution within the nodes on each TG.TG
358 boost::mpi3::communicator eq_node_group(TG.Global().split(nodeid / nnodes_per_TG, TG.Global().rank()));
359
360 // 2. count elements
361 long nterms_total = (TG.Global() += Q.size());
362 auto nterms_per_core = eq_cores.all_gather_value(Q.size());
363 long nterms_in_local_core = std::accumulate(nterms_per_core.begin(), nterms_per_core.end(), long(0));
364 auto nterms_per_core_in_node = TG.Node().all_gather_value(nterms_in_local_core);
365 long nterms_node_before_loadbalance =
366 std::accumulate(nterms_per_core_in_node.begin(), nterms_per_core_in_node.end(), long(0));
367 std::vector<long> bounds(nnodes_per_TG + 1);
368 FairDivide(size_t(nterms_total), nnodes_per_TG, bounds);
369
370 // all extra terms are received by CoreID==0
371 if (coreid == 0)
372 Q.reserve(nterms_in_local_core +
373 std::max(long(0), bounds[node_number + 1] - bounds[node_number] - nterms_node_before_loadbalance));
374 else
375 Q.reserve(nterms_in_local_core);
376
377 // 3. all_gather_v on eq_cores (use mpi3 when tested and ready)
378 // i. move data in Q to allow use of MPI_IN_PLACE in Allgatherv
379 Q.resize(nterms_in_local_core);
380 if (eq_cores.rank() > 0)
381 {
382 size_t n = std::accumulate(nterms_per_core.begin(), nterms_per_core.begin() + eq_cores.rank() + 1, size_t(0));
383 std::copy_backward(Q.begin(), Q.begin() + nterms_per_core[eq_cores.rank()], Q.begin() + n);
384 }
385
386 Timer.stop("G0");
387 qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers setup: "
388 << Timer.total("G0") << std::endl;
389 Timer.reset("G0");
390 Timer.start("G0");
391
392 std::vector<int> recvcounts(eq_cores.size());
393 std::vector<int> displ(eq_cores.size());
394 long cnt(0);
395 for (int p = 0; p < eq_cores.size(); p++)
396 {
397 if (nterms_per_core[p] * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
398 throw std::out_of_range("row size exceeded the maximum");
399 recvcounts[p] = int(nterms_per_core[p] * sizeof(Type));
400 displ[p] = int(cnt * sizeof(Type));
401 cnt += nterms_per_core[p];
402 if (cnt * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
403 throw std::out_of_range("row size exceeded the maximum");
404 }
405 MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, Q.data(), recvcounts.data(), displ.data(), MPI_CHAR, eq_cores.get());
406
407 Timer.stop("G0");
408 qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers Allgatherv: "
409 << Timer.total("G0") << std::endl;
410 Timer.reset("G0");
411 Timer.start("G0");
412
413 boost::multi::array<long, 2> counts_per_core({nnodes_per_TG, ncores});
414 std::fill_n(counts_per_core.origin(), ncores * nnodes_per_TG, long(0));
415 counts_per_core[node_number][coreid] = nterms_in_local_core;
416 eq_node_group.all_reduce_in_place_n(counts_per_core.origin(), ncores * nnodes_per_TG, std::plus<>());
417
418 // calculate how many terms each core is potentially sending
419 // when algorithm works, make nplus a vector of pairs to save space, no need to store mostly zeros
420 boost::multi::array<long, 2> nplus({nnodes_per_TG, ncores});
421 std::fill_n(nplus.origin(), ncores * nnodes_per_TG, long(0));
422 auto tgrank = eq_node_group.rank();
423 // number of terms I'm sending
424 long deltaN = 0;
425 // excess terms before me
426 long nbefore = 0;
427 for (int ni = 0, ip = 0; ni < nnodes_per_TG; ++ni)
428 {
429 long nuptome = 0;
430 for (int ci = 0; ci < ncores; ++ci, ++ip)
431 {
432 nuptome += counts_per_core[ni][ci];
433 long dn = std::min(counts_per_core[ni][ci], nuptome - (bounds[ni + 1] - bounds[ni]));
434 if (dn > long(0))
435 {
436 if (ip < tgrank)
437 nbefore += long(dn);
438 if (ip == tgrank)
439 deltaN = long(dn);
440 nplus[ni][ci] = dn;
441 }
442 }
443 }
444
445 // calculate how many terms each head core is expecting
446 std::vector<long> nminus(nnodes_per_TG);
447 for (int i = 0; i < nnodes_per_TG; i++)
448 {
449 long dn = (bounds[i + 1] - bounds[i]) -
450 std::accumulate(counts_per_core[i].origin(), counts_per_core[i].origin() + ncores, long(0));
451 if (dn > 0)
452 nminus[i] = long(dn);
453 }
454
455 Timer.stop("G0");
456 qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers setup comm: "
457 << Timer.total("G0") << std::endl;
458 Timer.reset("G0");
459 Timer.start("G0");
460
461 /*
462 qmcplusplus::app_log()<<nnodes_per_TG <<" " <<ncores <<"\n";
463 for(int i=0; i<nnodes_per_TG; i++)
464 qmcplusplus::app_log()<<nminus[i] <<" ";
465 qmcplusplus::app_log()<<std::endl;
466 for(int i=0; i<nnodes_per_TG; i++) {
467 for(int j=0; j<ncores; j++)
468 qmcplusplus::app_log()<<nplus[i][j] <<" ";
469 qmcplusplus::app_log()<<std::endl;
470 }
471
472 std::ofstream out((std::string("debug.")+std::to_string(TG.Global().rank())).c_str());
473 */
474 //
475 if (deltaN > long(0))
476 {
477 // I'm sending somewhere
478 long nsent(0);
479 for (int i = 0; i < nnodes_per_TG; i++)
480 {
481 if (nminus[i] == long(0))
482 continue;
483 if (nsent + nminus[i] > nbefore)
484 {
485 // send to i
486 long to_send = std::min(deltaN, nsent + nminus[i] - nbefore);
487 if (to_send * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
488 throw std::out_of_range("row size exceeded the maximum");
489 int to_send_int = int(to_send * sizeof(Type));
490 //out<<" sending " <<to_send <<" " <<deltaN <<" " <<i*ncores <<" " <<i <<std::endl;
491 MPI_Send(Q.data() + Q.size() - to_send, to_send_int, MPI_CHAR, i * ncores, tgrank, eq_node_group.get());
492 //out<<" done sending " <<to_send <<" " <<deltaN <<" " <<i*ncores <<" " <<i <<std::endl;
493 nbefore += to_send;
494 deltaN -= to_send;
495 while (to_send-- > long(0))
496 Q.pop_back();
497 }
498 nsent += nminus[i];
499 if (deltaN == long(0))
500 break;
501 }
502 if (deltaN > long(0))
503 throw std::out_of_range("detlaN > 0");
504 }
505 else if (coreid == 0 && nminus[node_number] > long(0))
506 {
507 deltaN = nminus[node_number];
508 nbefore = std::accumulate(nminus.begin(), nminus.begin() + node_number, long(0));
509 long nsent(0);
510 MPI_Status st;
511 for (int ni = 0, ip = 0; ni < nnodes_per_TG; ni++)
512 {
513 for (int ci = 0; ci < ncores; ++ci, ++ip)
514 {
515 if (nplus[ni][ci] == long(0))
516 continue;
517 if (nsent + nplus[ni][ci] > nbefore)
518 {
519 // receive from ip
520 long to_recv = std::min(deltaN, nsent + nplus[ni][ci] - nbefore);
521 if (to_recv * sizeof(Type) >= static_cast<long>(std::numeric_limits<int>::max()))
522 throw std::out_of_range("row size exceeded the maximum");
523 int to_recv_int = int(to_recv * sizeof(Type));
524 long curr_sz = Q.size();
525 Q.resize(curr_sz + to_recv);
526 //out<<" receiving " <<to_recv <<" " <<deltaN <<" " <<nbefore <<" " <<ip <<std::endl;
527 MPI_Recv(Q.data() + curr_sz, to_recv_int, MPI_CHAR, ip, ip, eq_node_group.get(), &st);
528 nbefore += to_recv;
529 deltaN -= to_recv;
530 //out<<" done receiving " <<to_recv <<" " <<deltaN <<" " <<nbefore <<" " <<ip <<std::endl;
531 }
532 nsent += nplus[ni][ci];
533 if (deltaN == long(0))
534 break;
535 }
536 if (deltaN == long(0))
537 break;
538 }
539 if (deltaN > long(0))
540 throw std::out_of_range("detlaN > 0");
541 }
542 else
543 {
544 //out<<" nothing to do. \n";
545 }
546
547 //out.close();
548
549 Timer.stop("G0");
550 qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers send/recv: "
551 << Timer.total("G0") << std::endl;
552 Timer.reset("G0");
553 Timer.start("G0");
554
555
556 long final_nterms_node = (TG.Node() += Q.size());
557 std::vector<long> final_counts(TG.Cores().size());
558 TG.Cores().gather_n(&final_nterms_node, 1, final_counts.data(), 0);
559 if (TG.Global().root())
560 {
561 qmcplusplus::app_log() << " In construct_distributed_csr_matrix_from_distributed_containers: \n";
562 qmcplusplus::app_log() << " Partitioning of CSR matrix over TG (nnz per node): ";
563 for (int i = 0; i < nnodes_per_TG; i++)
564 qmcplusplus::app_log() << final_counts[i] << " ";
565 qmcplusplus::app_log() << std::endl;
566 }
567
568 TG.node_barrier();
569 return construct_csr_matrix_multiple_input<CSR>(Q, nr, nc, 'N', TG.Node());
570 }
571
572 // assume partition by row
573 template<class CSR, class task_group, typename integer = int>
local_balanced_partition(CSR & M,task_group & TG)574 typename CSR::template matrix_view<integer> local_balanced_partition(CSR& M, task_group& TG)
575 {
576 using std::size_t;
577 using array_ = std::array<size_t, 4>;
578 if (TG.getNCoresPerTG() == 1)
579 {
580 return M[array_{0, M.size(0), 0, M.size(1)}];
581 }
582 else
583 {
584 std::vector<size_t> bounds(TG.getNCoresPerTG() + 1);
585 if (TG.getCoreID() == 0)
586 {
587 std::vector<size_t> indx(M.size(0) + 1);
588 indx[0] = size_t(0);
589 for (size_t i = 0, cnt = 0; i < M.size(0); i++)
590 {
591 cnt += size_t(M.pointers_end()[i] - M.pointers_begin()[i]);
592 indx[i + 1] = cnt;
593 }
594 qmcplusplus::afqmc::balance_partition_ordered_set(indx, bounds);
595 if (TG.Global().root())
596 {
597 qmcplusplus::app_log() << std::endl << "Partition of csr matrix (rows) over cores in local TG: ";
598 for (int i = 0; i <= TG.getNCoresPerTG(); i++)
599 qmcplusplus::app_log() << bounds[i] << " ";
600 qmcplusplus::app_log() << std::endl;
601 qmcplusplus::app_log() << "Number of terms in csr matrix view per core: ";
602 for (int i = 0; i < TG.getNCoresPerTG(); i++)
603 qmcplusplus::app_log() << indx[bounds[i + 1]] - indx[bounds[i]] << " ";
604 qmcplusplus::app_log() << std::endl << std::endl;
605 }
606 }
607 TG.Node().broadcast_n(bounds.begin(), bounds.size());
608 return M[array_{bounds[TG.getLocalTGRank()], bounds[TG.getLocalTGRank() + 1], 0, M.size(1)}];
609 //return M[array_{0,M.size(0),0,M.size(1)}];
610 }
611 }
612
613 /*
614 * Constructs a vector of csr_matrix as a copy from a given csr_matrix but casted to single precision
615 */
616 template<class CSRsp, class CSR>
CSRvector_to_single_precision(std::vector<CSR> const & A)617 std::vector<CSRsp> CSRvector_to_single_precision(std::vector<CSR> const& A)
618 {
619 using Alloc = typename CSRsp::alloc_type;
620 std::vector<CSRsp> B;
621 B.reserve(A.size());
622 for (auto& v : A)
623 B.emplace_back(CSRsp(v, Alloc{v.getAlloc()}));
624 return B;
625 }
626
627 } // namespace shm
628
629 } // namespace csr
630
631 #endif
632