1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8 Copyright (c) 2010-2017, The Regents of the University of California
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the Software is
15 furnished to do so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in
18 all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 THE SOFTWARE.
27 */
28
29
30
31 #include "SpParMat.h"
32 #include "ParFriends.h"
33 #include "Operations.h"
34 #include "FileHeader.h"
35 extern "C" {
36 #include "mmio.h"
37 }
38 #include <sys/types.h>
39 #include <sys/stat.h>
40
41 #include <mpi.h>
42 #include <fstream>
43 #include <algorithm>
44 #include <set>
45 #include <stdexcept>
46
47 namespace combblas {
48
49 /**
50  * If every processor has a distinct triples file such as {A_0, A_1, A_2, ..., A_{p-1}} for p processors
51 **/
52 template <class IT, class NT, class DER>
53 SpParMat< IT,NT,DER >::SpParMat (std::ifstream & input, MPI_Comm & world)
54 {
55 assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
56 if(!input.is_open())
57 {
58 perror("Input file doesn't exist\n");
59 exit(-1);
60 }
61 commGrid.reset(new CommGrid(world, 0, 0));
62 input >> (*spSeq);
63 }
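/* Usage sketch (illustrative only, not part of the library): with one triples file
 * per rank, every process opens its own piece and calls this constructor. The
 * derived type SpDCCols and the file naming scheme below are assumptions.
 *
 *   MPI_Comm world = MPI_COMM_WORLD;
 *   int myrank;
 *   MPI_Comm_rank(world, &myrank);
 *   std::ifstream input("A_" + std::to_string(myrank) + ".txt");
 *   SpParMat< int64_t, double, SpDCCols<int64_t,double> > A(input, world);
 */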
64
65 template <class IT, class NT, class DER>
66 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, MPI_Comm & world): spSeq(myseq)
67 {
68 assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
69 commGrid.reset(new CommGrid(world, 0, 0));
70 }
71
72 template <class IT, class NT, class DER>
73 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, std::shared_ptr<CommGrid> grid): spSeq(myseq)
74 {
75 assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
76 commGrid = grid;
77 }
78
79 template <class IT, class NT, class DER>
80 SpParMat< IT,NT,DER >::SpParMat (std::shared_ptr<CommGrid> grid)
81 {
82 assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
83 spSeq = new DER();
84 commGrid = grid;
85 }
86
87 //! Deprecated. Don't call the default constructor
88 template <class IT, class NT, class DER>
89 SpParMat< IT,NT,DER >::SpParMat ()
90 {
91 SpParHelper::Print("COMBBLAS Warning: It is dangerous to create (matrix) objects without specifying the communicator, are you sure you want to create this object in MPI_COMM_WORLD?\n");
92 assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
93 spSeq = new DER();
94 commGrid.reset(new CommGrid(MPI_COMM_WORLD, 0, 0));
95 }
96
97 /**
98 * If there is a single file read by the master process only, use this and then call ReadDistribute()
99 **/
100 template <class IT, class NT, class DER>
101 SpParMat< IT,NT,DER >::SpParMat (MPI_Comm world)
102 {
103
104 assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
105 spSeq = new DER();
106 commGrid.reset(new CommGrid(world, 0, 0));
107 }
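/* Usage sketch (illustrative): construct an empty matrix over the communicator, then
 * let the master process read and scatter the file. The exact ReadDistribute overload
 * (filename plus master rank) is an assumption based on the comment above.
 *
 *   SpParMat< int64_t, double, SpDCCols<int64_t,double> > A(MPI_COMM_WORLD);
 *   A.ReadDistribute("matrix.mtx", 0);   // rank 0 reads, triples are distributed to the grid
 */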
108
109 template <class IT, class NT, class DER>
110 SpParMat< IT,NT,DER >::~SpParMat ()
111 {
112 if(spSeq != NULL) delete spSeq;
113 }
114
115 template <class IT, class NT, class DER>
116 void SpParMat< IT,NT,DER >::FreeMemory ()
117 {
118 if(spSeq != NULL) delete spSeq;
119 spSeq = NULL;
120 }
121
122
123 /**
124 * Private function to guide Select2 communication and avoid code duplication due to loop ends
125  * @param[in, out] klimits {per-column k limit, updated for the next iteration}
126  * @param[out] toretain {indices to retain in actcolsmap for the next iteration}
127 **/
128 template <class IT, class NT, class DER>
129 template <typename VT, typename GIT> // GIT: global index type of vector
130 void SpParMat<IT,NT,DER>::TopKGather(std::vector<NT> & all_medians, std::vector<IT> & nnz_per_col, int & thischunk, int & chunksize,
131 const std::vector<NT> & activemedians, const std::vector<IT> & activennzperc, int itersuntil,
132 std::vector< std::vector<NT> > & localmat, const std::vector<IT> & actcolsmap, std::vector<IT> & klimits,
133 std::vector<IT> & toretain, std::vector<std::vector<std::pair<IT,NT>>> & tmppair, IT coffset, const FullyDistVec<GIT,VT> & rvec) const
134 {
135 int rankincol = commGrid->GetRankInProcCol();
136 int colneighs = commGrid->GetGridRows();
137 int nprocs = commGrid->GetSize();
138 std::vector<double> finalWeightedMedians(thischunk, 0.0);
139
140 MPI_Gather(activemedians.data() + itersuntil*chunksize, thischunk, MPIType<NT>(), all_medians.data(), thischunk, MPIType<NT>(), 0, commGrid->GetColWorld());
141 MPI_Gather(activennzperc.data() + itersuntil*chunksize, thischunk, MPIType<IT>(), nnz_per_col.data(), thischunk, MPIType<IT>(), 0, commGrid->GetColWorld());
142
143 if(rankincol == 0)
144 {
145 std::vector<double> columnCounts(thischunk, 0.0);
146 std::vector< std::pair<NT, double> > mediansNweights(colneighs); // (median,weight) pairs [to be reused at each iteration]
147
148 for(int j = 0; j < thischunk; ++j) // for each column
149 {
150 for(int k = 0; k<colneighs; ++k)
151 {
152 size_t fetchindex = k*thischunk+j;
153 columnCounts[j] += static_cast<double>(nnz_per_col[fetchindex]);
154 }
155 for(int k = 0; k<colneighs; ++k)
156 {
157 size_t fetchindex = k*thischunk+j;
158 mediansNweights[k] = std::make_pair(all_medians[fetchindex], static_cast<double>(nnz_per_col[fetchindex]) / columnCounts[j]);
159 }
160 sort(mediansNweights.begin(), mediansNweights.end()); // sort by median
161
162 double sumofweights = 0;
163 int k = 0;
164 while( k<colneighs && sumofweights < 0.5)
165 {
166 sumofweights += mediansNweights[k++].second;
167 }
168 finalWeightedMedians[j] = mediansNweights[k-1].first;
169 }
170 }
171 MPI_Bcast(finalWeightedMedians.data(), thischunk, MPIType<double>(), 0, commGrid->GetColWorld());
172
173 std::vector<IT> larger(thischunk, 0);
174 std::vector<IT> smaller(thischunk, 0);
175 std::vector<IT> equal(thischunk, 0);
176
177 #ifdef THREADED
178 #pragma omp parallel for
179 #endif
180 for(int j = 0; j < thischunk; ++j) // for each active column
181 {
182 size_t fetchindex = actcolsmap[j+itersuntil*chunksize];
183 for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
184 {
185 // count those above/below/equal to the median
186 if(localmat[fetchindex][k] > finalWeightedMedians[j])
187 larger[j]++;
188 else if(localmat[fetchindex][k] < finalWeightedMedians[j])
189 smaller[j]++;
190 else
191 equal[j]++;
192 }
193 }
194 MPI_Allreduce(MPI_IN_PLACE, larger.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
195 MPI_Allreduce(MPI_IN_PLACE, smaller.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
196 MPI_Allreduce(MPI_IN_PLACE, equal.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
197
198 int numThreads = 1; // default case
199 #ifdef THREADED
200 omp_lock_t lock[nprocs]; // a lock per recipient
201 for (int i=0; i<nprocs; i++)
202 omp_init_lock(&(lock[i]));
203 #pragma omp parallel
204 {
205 numThreads = omp_get_num_threads();
206 }
207 #endif
208
209 std::vector < std::vector<IT> > perthread2retain(numThreads);
210
211 #ifdef THREADED
212 #pragma omp parallel for
213 #endif
214 for(int j = 0; j < thischunk; ++j) // for each active column
215 {
216 #ifdef THREADED
217 int myThread = omp_get_thread_num();
218 #else
219 int myThread = 0;
220 #endif
221
222 // both clmapindex and fetchindex are unique for a given j (hence not shared among threads)
223 size_t clmapindex = j+itersuntil*chunksize; // klimits is of the same length as actcolsmap
224 size_t fetchindex = actcolsmap[clmapindex]; // localmat can only be dereferenced using the original indices.
225
226 // these following if/else checks are the same (because klimits/large/equal vectors are mirrored) on every processor along ColWorld
227 if(klimits[clmapindex] <= larger[j]) // the entries larger than Weighted-Median are plentiful, we can discard all the smaller/equal guys
228 {
229 std::vector<NT> survivors;
230 for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
231 {
232 if(localmat[fetchindex][k] > finalWeightedMedians[j]) // keep only the large guys (even equal guys go)
233 survivors.push_back(localmat[fetchindex][k]);
234 }
235 localmat[fetchindex].swap(survivors);
236 perthread2retain[myThread].push_back(clmapindex); // items to retain in actcolsmap
237 }
238 else if (klimits[clmapindex] > larger[j] + equal[j]) // the elements that are either larger or equal-to are surely keepers, no need to reprocess them
239 {
240 std::vector<NT> survivors;
241 for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
242 {
243 if(localmat[fetchindex][k] < finalWeightedMedians[j]) // keep only the small guys (even equal guys go)
244 survivors.push_back(localmat[fetchindex][k]);
245 }
246 localmat[fetchindex].swap(survivors);
247
248 klimits[clmapindex] -= (larger[j] + equal[j]); // update the k limit for this column only
249 perthread2retain[myThread].push_back(clmapindex); // items to retain in actcolsmap
250 }
251 else // larger[j] < klimits[clmapindex] && klimits[clmapindex] <= larger[j] + equal[j]
252 { // we will always have equal[j] > 0 because the weighted median is part of the dataset so it has to be equal to itself.
253 std::vector<NT> survivors;
254 for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
255 {
256 if(localmat[fetchindex][k] >= finalWeightedMedians[j]) // keep the larger and equal to guys (might exceed k-limit but that's fine according to MCL)
257 survivors.push_back(localmat[fetchindex][k]);
258 }
259 localmat[fetchindex].swap(survivors);
260
261 // We found it: the kth largest element in column (coffset + fetchindex) is finalWeightedMedians[j]
262 // But everyone in the same processor column has the information, only one of them should send it
263 IT n_perproc = getlocalcols() / colneighs; // find a typical processor's share
264 int assigned = std::min(static_cast<int>(fetchindex/n_perproc), colneighs-1); // cap at the last processor in the column
265 if( assigned == rankincol)
266 {
267 IT locid;
268 int owner = rvec.Owner(coffset + fetchindex, locid);
269
270 #ifdef THREADED
271 omp_set_lock(&(lock[owner]));
272 #endif
273 tmppair[owner].emplace_back(std::make_pair(locid, finalWeightedMedians[j]));
274 #ifdef THREADED
275 omp_unset_lock(&(lock[owner]));
276 #endif
277 }
278 } // end_else
279 } // end_for
280 // ------ concatenate toretain "indices" processed by threads ------
281 std::vector<IT> tdisp(numThreads+1);
282 tdisp[0] = 0;
283 for(int i=0; i<numThreads; ++i)
284 {
285 tdisp[i+1] = tdisp[i] + perthread2retain[i].size();
286 }
287 toretain.resize(tdisp[numThreads]);
288
289 #pragma omp parallel for
290 for(int i=0; i< numThreads; i++)
291 {
292 std::copy(perthread2retain[i].data() , perthread2retain[i].data()+ perthread2retain[i].size(), toretain.data() + tdisp[i]);
293 }
294
295 #ifdef THREADED
296 for (int i=0; i<nprocs; i++) // destroy the locks
297 omp_destroy_lock(&(lock[i]));
298 #endif
299 }
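/* The weighted-median computation performed above by the rank-0 processor of each
 * processor column, as a small serial sketch (illustrative only; names are made up).
 * Given one (median, nonzero count) pair per processor for a single column, the
 * weighted median is the first median at which the cumulative weight reaches 1/2.
 *
 *   double WeightedMedian(std::vector< std::pair<double,double> > medNcnt)  // (median, nnz) per processor
 *   {
 *       double total = 0.0;
 *       for(auto & mc : medNcnt) total += mc.second;
 *       for(auto & mc : medNcnt) mc.second /= total;        // counts -> weights
 *       std::sort(medNcnt.begin(), medNcnt.end());          // sort by median value
 *       double sumofweights = 0.0;
 *       size_t i = 0;
 *       while(i < medNcnt.size() && sumofweights < 0.5)
 *           sumofweights += medNcnt[i++].second;
 *       return medNcnt[i-1].first;
 *   }
 */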
300
301
302 //! identify the k-th maximum element in each column of a matrix
303 //! if the number of nonzeros in a column is less than k, return the numeric_limits<NT>::min()
304 //! This is an efficient implementation of the Saukas/Song algorithm
305 //! http://www.ime.usp.br/~einar/select/INDEX.HTM
306 //! Preferred for large k values
307 template <class IT, class NT, class DER>
308 template <typename VT, typename GIT> // GIT: global index type of vector
309 bool SpParMat<IT,NT,DER>::Kselect2(FullyDistVec<GIT,VT> & rvec, IT k_limit) const
310 {
311 if(*rvec.commGrid != *commGrid)
312 {
313 SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
314 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
315 }
316
317
318 int rankincol = commGrid->GetRankInProcCol();
319 int rankinrow = commGrid->GetRankInProcRow();
320 int rowneighs = commGrid->GetGridCols(); // get # of processors on the row
321 int colneighs = commGrid->GetGridRows();
322 int myrank = commGrid->GetRank();
323 int nprocs = commGrid->GetSize();
324
325 FullyDistVec<GIT,IT> colcnt(commGrid);
326 Reduce(colcnt, Column, std::plus<IT>(), (IT) 0, [](NT i){ return (IT) 1;});
327
328 // <begin> Gather vector along columns (Logic copied from DimApply)
329 int xsize = (int) colcnt.LocArrSize();
330 int trxsize = 0;
331 int diagneigh = colcnt.commGrid->GetComplementRank();
332 MPI_Status status;
333 MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, commGrid->GetWorld(), &status);
334
335 IT * trxnums = new IT[trxsize];
336 MPI_Sendrecv(const_cast<IT*>(SpHelper::p2a(colcnt.arr)), xsize, MPIType<IT>(), diagneigh, TRX, trxnums, trxsize, MPIType<IT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
337
338 int * colsize = new int[colneighs];
339 colsize[rankincol] = trxsize;
340 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, commGrid->GetColWorld());
341 int * dpls = new int[colneighs](); // displacements (zero initialized pid)
342 std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
343 int accsize = std::accumulate(colsize, colsize+colneighs, 0);
344 std::vector<IT> percsum(accsize); // per column sum of the number of entries
345
346 MPI_Allgatherv(trxnums, trxsize, MPIType<IT>(), percsum.data(), colsize, dpls, MPIType<IT>(), commGrid->GetColWorld()); // percsum holds IT entries, so the receive type must match
347 DeleteAll(trxnums,colsize, dpls);
348 // <end> Gather vector along columns
349
350 IT locm = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
351 std::vector< std::vector<NT> > localmat(locm); // some sort of minimal local copy of matrix
352
353 #ifdef COMBBLAS_DEBUG
354 if(accsize != locm) std::cout << "Gather vector along columns logic is wrong" << std::endl;
355 #endif
356
357 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
358 {
359 if(percsum[colit.colid()] >= k_limit) // don't make a copy of an inactive column
360 {
361 localmat[colit.colid()].reserve(colit.nnz());
362 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
363 {
364 localmat[colit.colid()].push_back(nzit.value());
365 }
366 }
367 }
368
369 int64_t activecols = std::count_if(percsum.begin(), percsum.end(), [k_limit](IT i){ return i >= k_limit;});
370 int64_t activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);
371
372 int64_t totactcols, totactnnzs;
373 MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
374 if(myrank == 0) std::cout << "Number of initial active columns is " << totactcols << std::endl;
375
376 MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
377 if(myrank == 0) std::cout << "Number of initial nonzeros is " << totactnnzs << std::endl;
378
379 #ifdef COMBBLAS_DEBUG
380 IT glactcols = colcnt.Count([k_limit](IT i){ return i >= k_limit;});
381 if(myrank == 0) std::cout << "Number of initial active columns is " << glactcols << std::endl;
382 if(glactcols != totactcols) if(myrank == 0) std::cout << "Wrong number of active columns computed" << std::endl;
383 #endif
384
385
386 rvec = FullyDistVec<GIT,VT> ( rvec.getcommgrid(), getncol(), std::numeric_limits<NT>::min()); // set length of rvec correctly
387
388 #ifdef COMBBLAS_DEBUG
389 PrintInfo();
390 rvec.PrintInfo("rvector");
391 #endif
392
393 if(totactcols == 0)
394 {
395 std::ostringstream ss;
396 ss << "TopK: k_limit (" << k_limit <<")" << " >= maxNnzInColumn. Returning the result of Reduce(Column, minimum<NT>()) instead..." << std::endl;
397 SpParHelper::Print(ss.str());
398 return false; // no prune needed
399 }
400
401
402 std::vector<IT> actcolsmap(activecols); // the map that gives the original index of that active column (this map will shrink over iterations)
403 for (IT i=0, j=0; i< locm; ++i) {
404 if(percsum[i] >= k_limit)
405 actcolsmap[j++] = i;
406 }
407
408 std::vector<NT> all_medians;
409 std::vector<IT> nnz_per_col;
410 std::vector<IT> klimits(activecols, k_limit); // is distributed management of this vector needed?
411 int activecols_lowerbound = 10*colneighs;
412
413
414 IT * locncols = new IT[rowneighs];
415 locncols[rankinrow] = locm;
416 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
417 IT coffset = std::accumulate(locncols, locncols+rankinrow, static_cast<IT>(0));
418 delete [] locncols;
419
420 /* Create/allocate variables for vector assignment */
421 MPI_Datatype MPI_pair;
422 MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair);
423 MPI_Type_commit(&MPI_pair);
424
425 int * sendcnt = new int[nprocs];
426 int * recvcnt = new int[nprocs];
427 int * sdispls = new int[nprocs]();
428 int * rdispls = new int[nprocs]();
429
430 while(totactcols > 0)
431 {
432 int chunksize, iterations, lastchunk;
433 if(activecols > activecols_lowerbound)
434 {
435 // two reasons for chunking:
436 // (1) keep memory limited to activecols (<= n/sqrt(p))
437 // (2) avoid overflow in sentcount
438 chunksize = static_cast<int>(activecols/colneighs); // invariant chunksize >= 10 (by activecols_lowerbound)
439 iterations = std::max(static_cast<int>(activecols/chunksize), 1);
440 lastchunk = activecols - (iterations-1)*chunksize; // lastchunk >= chunksize by construction
441 }
442 else
443 {
444 chunksize = activecols;
445 iterations = 1;
446 lastchunk = activecols;
447 }
448 std::vector<NT> activemedians(activecols); // one per "active" column
449 std::vector<IT> activennzperc(activecols);
450
451 #ifdef THREADED
452 #pragma omp parallel for
453 #endif
454 for(IT i=0; i< activecols; ++i) // recompute the medians and nnzperc
455 {
456 size_t orgindex = actcolsmap[i]; // assert: no two threads will share the same "orgindex"
457 if(localmat[orgindex].empty())
458 {
459 activemedians[i] = (NT) 0;
460 activennzperc[i] = 0;
461 }
462 else
463 {
464 // this actually orders in increasing fashion, but that doesn't matter as long as we only care about the median as opposed to a general nth element
465 auto entriesincol(localmat[orgindex]); // create a temporary vector as nth_element modifies the vector
466 std::nth_element(entriesincol.begin(), entriesincol.begin() + entriesincol.size()/2, entriesincol.end());
467 activemedians[i] = entriesincol[entriesincol.size()/2];
468 activennzperc[i] = entriesincol.size();
469 }
470 }
471
472 percsum.resize(activecols, 0);
473 MPI_Allreduce(activennzperc.data(), percsum.data(), activecols, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
474 activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);
475
476 #ifdef COMBBLAS_DEBUG
477 MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
478 if(myrank == 0) std::cout << "Number of active nonzeros is " << totactnnzs << std::endl;
479 #endif
480
481 std::vector<IT> toretain;
482 if(rankincol == 0)
483 {
484 all_medians.resize(lastchunk*colneighs);
485 nnz_per_col.resize(lastchunk*colneighs);
486 }
487 std::vector< std::vector< std::pair<IT,NT> > > tmppair(nprocs);
488 for(int i=0; i< iterations-1; ++i) // this loop should not be parallelized if we want to keep storage small
489 {
490 TopKGather(all_medians, nnz_per_col, chunksize, chunksize, activemedians, activennzperc, i, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
491 }
492 TopKGather(all_medians, nnz_per_col, lastchunk, chunksize, activemedians, activennzperc, iterations-1, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
493
494 /* Set the newly found vector entries */
495 IT totsend = 0;
496 for(IT i=0; i<nprocs; ++i)
497 {
498 sendcnt[i] = tmppair[i].size();
499 totsend += tmppair[i].size();
500 }
501
502 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());
503
504 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
505 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
506 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
507 assert((totsend < std::numeric_limits<int>::max()));
508 assert((totrecv < std::numeric_limits<int>::max()));
509
510 std::pair<IT,NT> * sendpair = new std::pair<IT,NT>[totsend];
511 for(int i=0; i<nprocs; ++i)
512 {
513 std::copy(tmppair[i].begin(), tmppair[i].end(), sendpair+sdispls[i]);
514 std::vector< std::pair<IT,NT> >().swap(tmppair[i]); // clear memory
515 }
516 std::vector< std::pair<IT,NT> > recvpair(totrecv);
517 MPI_Alltoallv(sendpair, sendcnt, sdispls, MPI_pair, recvpair.data(), recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
518 delete [] sendpair;
519
520 IT updated = 0;
521 for(auto & update : recvpair ) // Now, write these to rvec
522 {
523 updated++;
524 rvec.arr[update.first] = update.second;
525 }
526 #ifdef COMBBLAS_DEBUG
527 MPI_Allreduce(MPI_IN_PLACE, &updated, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
528 if(myrank == 0) std::cout << "Total vector entries updated " << updated << std::endl;
529 #endif
530
531 /* End of setting up the newly found vector entries */
532
533 std::vector<IT> newactivecols(toretain.size());
534 std::vector<IT> newklimits(toretain.size());
535 IT newindex = 0;
536 for(auto & retind : toretain )
537 {
538 newactivecols[newindex] = actcolsmap[retind];
539 newklimits[newindex++] = klimits[retind];
540 }
541 actcolsmap.swap(newactivecols);
542 klimits.swap(newklimits);
543 activecols = actcolsmap.size();
544
545 MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
546 #ifdef COMBBLAS_DEBUG
547 if(myrank == 0) std::cout << "Number of active columns is " << totactcols << std::endl;
548 #endif
549 }
550 DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
551 MPI_Type_free(&MPI_pair);
552
553 #ifdef COMBBLAS_DEBUG
554 if(myrank == 0) std::cout << "Exiting kselect2"<< std::endl;
555 #endif
556 return true; // prune needed
557 }
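/* Serial reference for what Kselect2 computes per column (illustrative only):
 * the k-th largest nonzero of a column, or numeric_limits<NT>::min() when the
 * column holds fewer than k nonzeros.
 *
 *   template <typename NT>
 *   NT KthLargest(std::vector<NT> colvals, size_t k)   // copies on purpose: nth_element reorders
 *   {
 *       if(colvals.size() < k) return std::numeric_limits<NT>::min();
 *       std::nth_element(colvals.begin(), colvals.begin() + (k-1), colvals.end(), std::greater<NT>());
 *       return colvals[k-1];
 *   }
 */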
558
559
560
561 template <class IT, class NT, class DER>
562 void SpParMat< IT,NT,DER >::Dump(std::string filename) const
563 {
564 MPI_Comm World = commGrid->GetWorld();
565 int rank = commGrid->GetRank();
566 int nprocs = commGrid->GetSize();
567
568 MPI_File thefile;
569 char * _fn = const_cast<char*>(filename.c_str());
570 MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
571
572 int rankinrow = commGrid->GetRankInProcRow();
573 int rankincol = commGrid->GetRankInProcCol();
574 int rowneighs = commGrid->GetGridCols(); // get # of processors on the row
575 int colneighs = commGrid->GetGridRows();
576
577 IT * colcnts = new IT[rowneighs];
578 IT * rowcnts = new IT[colneighs];
579 rowcnts[rankincol] = getlocalrows();
580 colcnts[rankinrow] = getlocalcols();
581
582 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), colcnts, 1, MPIType<IT>(), commGrid->GetRowWorld());
583 IT coloffset = std::accumulate(colcnts, colcnts+rankinrow, static_cast<IT>(0));
584
585 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), rowcnts, 1, MPIType<IT>(), commGrid->GetColWorld());
586 IT rowoffset = std::accumulate(rowcnts, rowcnts+rankincol, static_cast<IT>(0));
587 DeleteAll(colcnts, rowcnts);
588
589 IT * prelens = new IT[nprocs];
590 prelens[rank] = 2*getlocalnnz();
591 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
592 IT lengthuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
593
594 // The disp displacement argument specifies the position
595 // (absolute offset in bytes from the beginning of the file)
596 MPI_Offset disp = lengthuntil * sizeof(uint32_t);
597 char native[] = "native";
598 MPI_File_set_view(thefile, disp, MPI_UNSIGNED, MPI_UNSIGNED, native, MPI_INFO_NULL); // AL: the second-to-last argument is a non-const char* (looks like poor MPI standardization, the C++ bindings list it as const), C++ string literals MUST be const (especially in c++11).
599 uint32_t * gen_edges = new uint32_t[prelens[rank]];
600
601 IT k = 0;
602 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
603 {
604 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
605 {
606 gen_edges[k++] = (uint32_t) (nzit.rowid() + rowoffset);
607 gen_edges[k++] = (uint32_t) (colit.colid() + coloffset);
608 }
609 }
610 assert(k == prelens[rank]);
611 MPI_File_write(thefile, gen_edges, prelens[rank], MPI_UNSIGNED, NULL);
612 MPI_File_close(&thefile);
613
614 delete [] prelens;
615 delete [] gen_edges;
616 }
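/* Reading back what Dump() writes (a sketch, not part of the library): the file is a
 * flat stream of uint32_t pairs, global row id followed by global column id.
 *
 *   std::ifstream in("edges.bin", std::ios::binary);
 *   uint32_t edge[2];
 *   while(in.read(reinterpret_cast<char*>(edge), sizeof(edge)))
 *       std::cout << edge[0] << "\t" << edge[1] << "\n";   // row <tab> column
 */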
617
618 template <class IT, class NT, class DER>
619 SpParMat< IT,NT,DER >::SpParMat (const SpParMat< IT,NT,DER > & rhs)
620 {
621 if(rhs.spSeq != NULL)
622 spSeq = new DER(*(rhs.spSeq)); // Deep copy of local block
623
624 commGrid = rhs.commGrid;
625 }
626
627 template <class IT, class NT, class DER>
628 SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator=(const SpParMat< IT,NT,DER > & rhs)
629 {
630 if(this != &rhs)
631 {
632 //! The check against NULL is probably unnecessary; delete won't fail on NULL
633 //! But it is useful in the presence of a user-defined "operator delete" that fails to check for NULL
634 if(spSeq != NULL) delete spSeq;
635 if(rhs.spSeq != NULL)
636 spSeq = new DER(*(rhs.spSeq)); // Deep copy of local block
637
638 commGrid = rhs.commGrid;
639 }
640 return *this;
641 }
642
643 template <class IT, class NT, class DER>
644 SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator+=(const SpParMat< IT,NT,DER > & rhs)
645 {
646 if(this != &rhs)
647 {
648 if(*commGrid == *rhs.commGrid)
649 {
650 (*spSeq) += (*(rhs.spSeq));
651 }
652 else
653 {
654 std::cout << "Grids are not comparable for parallel addition (A+B)" << std::endl;
655 }
656 }
657 else
658 {
659 std::cout<< "Missing feature (A+A): Use multiply with 2 instead !"<<std::endl;
660 }
661 return *this;
662 }
663
664 template <class IT, class NT, class DER>
665 float SpParMat< IT,NT,DER >::LoadImbalance() const
666 {
667 IT totnnz = getnnz(); // collective call
668 IT maxnnz = 0;
669 IT localnnz = (IT) spSeq->getnnz();
670 MPI_Allreduce( &localnnz, &maxnnz, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
671 if(totnnz == 0) return 1;
672 return static_cast<float>((commGrid->GetSize() * maxnnz)) / static_cast<float>(totnnz);
673 }
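/* Worked example of the metric above: on a 2x2 grid (4 processes) with local nonzero
 * counts {10, 10, 10, 70}, totnnz = 100 and maxnnz = 70, so the returned imbalance is
 * 4 * 70 / 100 = 2.8; a perfectly balanced distribution returns 1.0. */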
674
675 template <class IT, class NT, class DER>
676 IT SpParMat< IT,NT,DER >::getnnz() const
677 {
678 IT totalnnz = 0;
679 IT localnnz = spSeq->getnnz();
680 MPI_Allreduce( &localnnz, &totalnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
681 return totalnnz;
682 }
683
684 template <class IT, class NT, class DER>
685 IT SpParMat< IT,NT,DER >::getnrow() const
686 {
687 IT totalrows = 0;
688 IT localrows = spSeq->getnrow();
689 MPI_Allreduce( &localrows, &totalrows, 1, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
690 return totalrows;
691 }
692
693 template <class IT, class NT, class DER>
694 IT SpParMat< IT,NT,DER >::getncol() const
695 {
696 IT totalcols = 0;
697 IT localcols = spSeq->getncol();
698 MPI_Allreduce( &localcols, &totalcols, 1, MPIType<IT>(), MPI_SUM, commGrid->GetRowWorld());
699 return totalcols;
700 }
701
702 template <class IT, class NT, class DER>
703 template <typename _BinaryOperation>
704 void SpParMat<IT,NT,DER>::DimApply(Dim dim, const FullyDistVec<IT, NT>& x, _BinaryOperation __binary_op)
705 {
706
707 if(!(*commGrid == *(x.commGrid)))
708 {
709 std::cout << "Grids are not comparable for SpParMat::DimApply" << std::endl;
710 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
711 }
712
713 MPI_Comm World = x.commGrid->GetWorld();
714 MPI_Comm ColWorld = x.commGrid->GetColWorld();
715 MPI_Comm RowWorld = x.commGrid->GetRowWorld();
716 switch(dim)
717 {
718 case Column: // scale each column
719 {
720 int xsize = (int) x.LocArrSize();
721 int trxsize = 0;
722 int diagneigh = x.commGrid->GetComplementRank();
723 MPI_Status status;
724 MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
725
726 NT * trxnums = new NT[trxsize];
727 MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
728
729 int colneighs, colrank;
730 MPI_Comm_size(ColWorld, &colneighs);
731 MPI_Comm_rank(ColWorld, &colrank);
732 int * colsize = new int[colneighs];
733 colsize[colrank] = trxsize;
734
735 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
736 int * dpls = new int[colneighs](); // displacements (zero initialized pid)
737 std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
738 int accsize = std::accumulate(colsize, colsize+colneighs, 0);
739 NT * scaler = new NT[accsize];
740
741 MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), scaler, colsize, dpls, MPIType<NT>(), ColWorld);
742 DeleteAll(trxnums,colsize, dpls);
743
744 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
745 {
746 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
747 {
748 nzit.value() = __binary_op(nzit.value(), scaler[colit.colid()]);
749 }
750 }
751 delete [] scaler;
752 break;
753 }
754 case Row:
755 {
756 int xsize = (int) x.LocArrSize();
757 int rowneighs, rowrank;
758 MPI_Comm_size(RowWorld, &rowneighs);
759 MPI_Comm_rank(RowWorld, &rowrank);
760 int * rowsize = new int[rowneighs];
761 rowsize[rowrank] = xsize;
762 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rowsize, 1, MPI_INT, RowWorld);
763 int * dpls = new int[rowneighs](); // displacements (zero initialized pid)
764 std::partial_sum(rowsize, rowsize+rowneighs-1, dpls+1);
765 int accsize = std::accumulate(rowsize, rowsize+rowneighs, 0);
766 NT * scaler = new NT[accsize];
767
768 MPI_Allgatherv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), scaler, rowsize, dpls, MPIType<NT>(), RowWorld);
769 DeleteAll(rowsize, dpls);
770
771 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
772 {
773 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
774 {
775 nzit.value() = __binary_op(nzit.value(), scaler[nzit.rowid()]);
776 }
777 }
778 delete [] scaler;
779 break;
780 }
781 default:
782 {
783 std::cout << "Unknown scaling dimension, returning..." << std::endl;
784 break;
785 }
786 }
787 }
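/* Usage sketch (illustrative): make the matrix column-stochastic, e.g. for MCL-style
 * normalization. Column sums come from the Reduce overload defined below; each nonzero
 * is then multiplied by the inverse sum of its column. FullyDistVec::Apply is assumed
 * to be available for the element-wise inversion.
 *
 *   FullyDistVec<int64_t,double> colsums = A.Reduce(Column, std::plus<double>(), 0.0);
 *   colsums.Apply([](double sum){ return (sum == 0.0) ? 0.0 : 1.0/sum; });
 *   A.DimApply(Column, colsums, std::multiplies<double>());
 */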
788
789 template <class IT, class NT, class DER>
790 template <typename _BinaryOperation, typename _UnaryOperation >
791 FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
792 {
793 IT length;
794 switch(dim)
795 {
796 case Column:
797 {
798 length = getncol();
799 break;
800 }
801 case Row:
802 {
803 length = getnrow();
804 break;
805 }
806 default:
807 {
808 std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
809 break;
810 }
811 }
812 FullyDistVec<IT,NT> parvec(commGrid, length, id);
813 Reduce(parvec, dim, __binary_op, id, __unary_op);
814 return parvec;
815 }
816
817 template <class IT, class NT, class DER>
818 template <typename _BinaryOperation>
819 FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id) const
820 {
821 IT length;
822 switch(dim)
823 {
824 case Column:
825 {
826 length = getncol();
827 break;
828 }
829 case Row:
830 {
831 length = getnrow();
832 break;
833 }
834 default:
835 {
836 std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
837 break;
838 }
839 }
840 FullyDistVec<IT,NT> parvec(commGrid, length, id);
841 Reduce(parvec, dim, __binary_op, id, myidentity<NT>()); // myidentity<NT>() is a no-op function
842 return parvec;
843 }
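/* Usage sketch (illustrative) for the two wrappers above, assuming a matrix A of type
 * SpParMat<int64_t,double,...>: per-row maxima, and nonzero counts per column obtained
 * by mapping every entry to 1 before summing.
 *
 *   FullyDistVec<int64_t,double> rowmax = A.Reduce(Row, maximum<double>(), std::numeric_limits<double>::lowest());
 *   FullyDistVec<int64_t,double> coldeg = A.Reduce(Column, std::plus<double>(), 0.0,
 *                                                  [](double){ return 1.0; });
 */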
844
845
846 template <class IT, class NT, class DER>
847 template <typename VT, typename GIT, typename _BinaryOperation>
848 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
849 {
850 Reduce(rvec, dim, __binary_op, id, myidentity<NT>() );
851 }
852
853
854 template <class IT, class NT, class DER>
855 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
856 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
857 {
858 Reduce(rvec, dim, __binary_op, id, __unary_op, MPIOp<_BinaryOperation, VT>::op() );
859 }
860
861
862 template <class IT, class NT, class DER>
863 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
864 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, MPI_Op mympiop) const
865 {
866 if(*rvec.commGrid != *commGrid)
867 {
868 SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails!", commGrid->GetWorld());
869 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
870 }
871 switch(dim)
872 {
873 case Column: // pack along the columns, result is a vector of size n
874 {
875 // We can't use rvec's distribution (rows first, columns later) here
876 // because the ownership model of the vector has the order P(0,0), P(0,1),...
877 // column reduction will first generate vector distribution in P(0,0), P(1,0),... order.
878
879 IT n_thiscol = getlocalcols(); // length assigned to this processor column
880 int colneighs = commGrid->GetGridRows(); // including oneself
881 int colrank = commGrid->GetRankInProcCol();
882
883 GIT * loclens = new GIT[colneighs];
884 GIT * lensums = new GIT[colneighs+1](); // begin/end points of local lengths
885
886 GIT n_perproc = n_thiscol / colneighs; // length on a typical processor
887 if(colrank == colneighs-1)
888 loclens[colrank] = n_thiscol - (n_perproc*colrank);
889 else
890 loclens[colrank] = n_perproc;
891
892 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
893 std::partial_sum(loclens, loclens+colneighs, lensums+1); // loclens and lensums are different, but both would fit in 32-bits
894
895 std::vector<VT> trarr;
896 typename DER::SpColIter colit = spSeq->begcol();
897 for(int i=0; i< colneighs; ++i)
898 {
899 VT * sendbuf = new VT[loclens[i]];
900 std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
901
902 for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit) // iterate over a portion of columns
903 {
904 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit) // all nonzeros in this column
905 {
906 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
907 }
908 }
909
910 VT * recvbuf = NULL;
911 if(colrank == i)
912 {
913 trarr.resize(loclens[i]);
914 recvbuf = SpHelper::p2a(trarr);
915 }
916 MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetColWorld()); // root = i
917 delete [] sendbuf;
918 }
919 DeleteAll(loclens, lensums);
920
921 GIT reallen; // Now we have to transpose the vector
922 GIT trlen = trarr.size();
923 int diagneigh = commGrid->GetComplementRank();
924 MPI_Status status;
925 MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
926
927 rvec.arr.resize(reallen);
928 MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
929 rvec.glen = getncol(); // ABAB: Put a sanity check here
930 break;
931
932 }
933 case Row: // pack along the rows, result is a vector of size m
934 {
935 rvec.glen = getnrow();
936 int rowneighs = commGrid->GetGridCols();
937 int rowrank = commGrid->GetRankInProcRow();
938 GIT * loclens = new GIT[rowneighs];
939 GIT * lensums = new GIT[rowneighs+1](); // begin/end points of local lengths
940 loclens[rowrank] = rvec.MyLocLength();
941 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetRowWorld());
942 std::partial_sum(loclens, loclens+rowneighs, lensums+1);
943 try
944 {
945 rvec.arr.resize(loclens[rowrank], id);
946
947 // keeping track of all nonzero iterators within columns at once is unscalable w.r.t. memory (due to sqrt(p) scaling)
948 // thus we'll do batches of column as opposed to all columns at once. 5 million columns take 80MB (two pointers per column)
949 #define MAXCOLUMNBATCH 5 * 1024 * 1024
950 typename DER::SpColIter begfinger = spSeq->begcol(); // beginning finger to columns
951
952 // Each processor on the same processor row should execute the SAME number of reduce calls
953 int numreducecalls = (int) ceil(static_cast<float>(spSeq->getnzc()) / static_cast<float>(MAXCOLUMNBATCH));
954 int maxreducecalls;
955 MPI_Allreduce( &numreducecalls, &maxreducecalls, 1, MPI_INT, MPI_MAX, commGrid->GetRowWorld());
956
957 for(int k=0; k< maxreducecalls; ++k)
958 {
959 std::vector<typename DER::SpColIter::NzIter> nziters;
960 typename DER::SpColIter curfinger = begfinger;
961 for(; curfinger != spSeq->endcol() && nziters.size() < MAXCOLUMNBATCH ; ++curfinger)
962 {
963 nziters.push_back(spSeq->begnz(curfinger));
964 }
965 for(int i=0; i< rowneighs; ++i) // step by step to save memory
966 {
967 VT * sendbuf = new VT[loclens[i]];
968 std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
969
970 typename DER::SpColIter colit = begfinger;
971 IT colcnt = 0; // "processed column" counter
972 for(; colit != curfinger; ++colit, ++colcnt) // iterate over this batch of columns until curfinger
973 {
974 typename DER::SpColIter::NzIter nzit = nziters[colcnt];
975 for(; nzit != spSeq->endnz(colit) && nzit.rowid() < lensums[i+1]; ++nzit) // a portion of nonzeros in this column
976 {
977 sendbuf[nzit.rowid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()-lensums[i]]);
978 }
979 nziters[colcnt] = nzit; // set the new finger
980 }
981
982 VT * recvbuf = NULL;
983 if(rowrank == i)
984 {
985 for(int j=0; j< loclens[i]; ++j)
986 {
987 sendbuf[j] = __binary_op(rvec.arr[j], sendbuf[j]); // rvec.arr will be overriden with MPI_Reduce, save its contents
988 }
989 recvbuf = SpHelper::p2a(rvec.arr);
990 }
991 MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetRowWorld()); // root = i
992 delete [] sendbuf;
993 }
994 begfinger = curfinger; // set the next begfinger
995 }
996 DeleteAll(loclens, lensums);
997 }
998 catch (std::length_error& le)
999 {
1000 std::cerr << "Length error: " << le.what() << std::endl;
1001 }
1002 break;
1003 }
1004 default:
1005 {
1006 std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
1007 break;
1008 }
1009 }
1010 }
1011
1012 #ifndef KSELECTLIMIT
1013 #define KSELECTLIMIT 10000
1014 #endif
1015
1016 //! Kselect wrapper for a selection of columns of the matrix
1017 //! Indices of the input sparse vector kth denote the queried columns of the matrix
1018 //! Upon return, values of kth stores the kth entries of the queried columns
1019 //! Returns true if Kselect algorithm is invoked for at least one column
1020 //! Otherwise, returns false
1021 template <class IT, class NT, class DER>
1022 template <typename VT, typename GIT>
1023 bool SpParMat<IT,NT,DER>::Kselect(FullyDistSpVec<GIT,VT> & kth, IT k_limit, int kselectVersion) const
1024 {
1025 #ifdef COMBBLAS_DEBUG
1026 FullyDistVec<GIT,VT> test1(kth.getcommgrid());
1027 FullyDistVec<GIT,VT> test2(kth.getcommgrid());
1028 Kselect1(test1, k_limit, myidentity<NT>());
1029 Kselect2(test2, k_limit);
1030 if(test1 == test2)
1031 SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
1032 else
1033 {
1034 SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
1035 test1.PrintToFile("test1");
1036 test2.PrintToFile("test2");
1037 }
1038 #endif
1039
1040 if(kselectVersion==1 || k_limit < KSELECTLIMIT)
1041 {
1042 return Kselect1(kth, k_limit, myidentity<NT>());
1043 }
1044 else
1045 {
1046 FullyDistVec<GIT,VT> kthAll ( getcommgrid());
1047 bool ret = Kselect2(kthAll, k_limit);
1048 FullyDistSpVec<GIT,VT> temp = EWiseApply<VT>(kth, kthAll,
1049 [](VT spval, VT dval){return dval;},
1050 [](VT spval, VT dval){return true;},
1051 false, NT());
1052 kth = temp;
1053 return ret;
1054 }
1055 }
1056
1057
1058 //! Returns true if Kselect algorithm is invoked for at least one column
1059 //! Otherwise, returns false
1060 //! if false, rvec contains either the minimum entry in each column or zero
1061 template <class IT, class NT, class DER>
1062 template <typename VT, typename GIT>
1063 bool SpParMat<IT,NT,DER>::Kselect(FullyDistVec<GIT,VT> & rvec, IT k_limit, int kselectVersion) const
1064 {
1065 #ifdef COMBBLAS_DEBUG
1066 FullyDistVec<GIT,VT> test1(rvec.getcommgrid());
1067 FullyDistVec<GIT,VT> test2(rvec.getcommgrid());
1068 Kselect1(test1, k_limit, myidentity<NT>());
1069 Kselect2(test2, k_limit);
1070 if(test1 == test2)
1071 SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
1072 else
1073 {
1074 SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
1075 //test1.PrintToFile("test1");
1076 //test2.PrintToFile("test2");
1077 }
1078 #endif
1079
1080 if(kselectVersion==1 || k_limit < KSELECTLIMIT)
1081 return Kselect1(rvec, k_limit, myidentity<NT>());
1082 else
1083 return Kselect2(rvec, k_limit);
1084
1085 }
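/* Usage sketch (illustrative): the 5th largest entry of every column of A, written into
 * a dense vector. kselectVersion == 1 forces Kselect1; otherwise Kselect2 is used once
 * k_limit reaches KSELECTLIMIT. Columns with fewer than k nonzeros come back as
 * numeric_limits<double>::min() (see Kselect1 below).
 *
 *   FullyDistVec<int64_t,double> kth(A.getcommgrid());
 *   bool pruned = A.Kselect(kth, 5, 1);
 */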
1086
1087 /* identify the k-th maximum element in each column of a matrix
1088 ** if the number of nonzeros in a column is less than or equal to k, return minimum entry
1089 ** Caution: this is a preliminary implementation: needs 3*(n/sqrt(p))*k memory per processor
1090 ** this memory requirement is too high for larger k
1091 */
1092 template <class IT, class NT, class DER>
1093 template <typename VT, typename GIT, typename _UnaryOperation> // GIT: global index type of vector
1094 bool SpParMat<IT,NT,DER>::Kselect1(FullyDistVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1095 {
1096 if(*rvec.commGrid != *commGrid)
1097 {
1098 SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1099 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1100 }
1101
1102 FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1103 Reduce(nnzPerColumn, Column, std::plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1104 IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1105 if(k>maxnnzPerColumn)
1106 {
1107 SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
1108 Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1109 return false;
1110 }
1111
1112 IT n_thiscol = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
1113
1114 // check, memory should be min(n_thiscol*k, local nnz)
1115 // hence we will not overflow for very large k
1116 std::vector<VT> sendbuf(n_thiscol*k);
1117 std::vector<IT> send_coldisp(n_thiscol+1,0);
1118 std::vector<IT> local_coldisp(n_thiscol+1,0);
1119
1120
1121 //displacement of local columns
1122 //local_coldisp is the displacement of all nonzeros per column
1123 //send_coldisp is the displacement of k nonzeros per column
1124 IT nzc = 0;
1125 if(spSeq->getnnz()>0)
1126 {
1127 typename DER::SpColIter colit = spSeq->begcol();
1128 for(IT i=0; i<n_thiscol; ++i)
1129 {
1130 local_coldisp[i+1] = local_coldisp[i];
1131 send_coldisp[i+1] = send_coldisp[i];
1132 if(i==colit.colid())
1133 {
1134 local_coldisp[i+1] += colit.nnz();
1135 if(colit.nnz()>=k)
1136 send_coldisp[i+1] += k;
1137 else
1138 send_coldisp[i+1] += colit.nnz();
1139 colit++;
1140 nzc++;
1141 }
1142 }
1143 }
1144 assert(local_coldisp[n_thiscol] == spSeq->getnnz());
1145
1146 // a copy of local part of the matrix
1147 // this can be avoided if we write our own local kselect function instead of using partial_sort
1148 std::vector<VT> localmat(spSeq->getnnz());
1149
1150
1151 #ifdef THREADED
1152 #pragma omp parallel for
1153 #endif
1154 for(IT i=0; i<nzc; i++)
1155 //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1156 {
1157 typename DER::SpColIter colit = spSeq->begcol() + i;
1158 IT colid = colit.colid();
1159 IT idx = local_coldisp[colid];
1160 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1161 {
1162 localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1163 }
1164
1165 if(colit.nnz()<=k)
1166 {
1167 std::sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
1168 std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], sendbuf.begin()+send_coldisp[colid]);
1169 }
1170 else
1171 {
1172 std::partial_sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
1173 std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, sendbuf.begin()+send_coldisp[colid]);
1174 }
1175 }
1176
1177 std::vector<VT>().swap(localmat);
1178 std::vector<IT>().swap(local_coldisp);
1179
1180 std::vector<VT> recvbuf(n_thiscol*k);
1181 std::vector<VT> tempbuf(n_thiscol*k);
1182 std::vector<IT> recv_coldisp(n_thiscol+1);
1183 std::vector<IT> templen(n_thiscol);
1184
1185 int colneighs = commGrid->GetGridRows();
1186 int colrank = commGrid->GetRankInProcCol();
1187
1188 for(int p=2; p <= colneighs; p*=2)
1189 {
1190
1191 if(colrank%p == p/2) // this processor is a sender in this round
1192 {
1193 int receiver = colrank - ceil(p/2);
1194 MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1195 MPI_Send(sendbuf.data(), send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1196 //break;
1197 }
1198 else if(colrank%p == 0) // this processor is a receiver in this round
1199 {
1200 int sender = colrank + ceil(p/2);
1201 if(sender < colneighs)
1202 {
1203
1204 MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1205 MPI_Recv(recvbuf.data(), recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1206
1207
1208
1209 #ifdef THREADED
1210 #pragma omp parallel for
1211 #endif
1212 for(IT i=0; i<n_thiscol; ++i)
1213 {
1214 // partial merge until first k elements
1215 IT j=send_coldisp[i], l=recv_coldisp[i];
1216 //IT templen[i] = k*i;
1217 IT offset = k*i;
1218 IT lid = 0;
1219 for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1220 {
1221 if(sendbuf[j] > recvbuf[l]) // decision
1222 tempbuf[offset+lid++] = sendbuf[j++];
1223 else
1224 tempbuf[offset+lid++] = recvbuf[l++];
1225 }
1226 while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1227 while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1228 templen[i] = lid;
1229 }
1230
1231 send_coldisp[0] = 0;
1232 for(IT i=0; i<n_thiscol; i++)
1233 {
1234 send_coldisp[i+1] = send_coldisp[i] + templen[i];
1235 }
1236
1237
1238 #ifdef THREADED
1239 #pragma omp parallel for
1240 #endif
1241 for(IT i=0; i<n_thiscol; i++) // direct copy
1242 {
1243 IT offset = k*i;
1244 std::copy(tempbuf.begin()+offset, tempbuf.begin()+offset+templen[i], sendbuf.begin() + send_coldisp[i]);
1245 }
1246 }
1247 }
1248 }
1249 MPI_Barrier(commGrid->GetWorld());
1250 std::vector<VT> kthItem(n_thiscol);
1251
1252 int root = commGrid->GetDiagOfProcCol();
1253 if(root==0 && colrank==0) // rank 0
1254 {
1255 #ifdef THREADED
1256 #pragma omp parallel for
1257 #endif
1258 for(IT i=0; i<n_thiscol; i++)
1259 {
1260 IT nitems = send_coldisp[i+1]-send_coldisp[i];
1261 if(nitems >= k)
1262 kthItem[i] = sendbuf[send_coldisp[i]+k-1];
1263 else
1264 kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty or has less than k elements
1265 }
1266 }
1267 else if(root>0 && colrank==0) // send to the diagonal processor of this processor column
1268 {
1269 #ifdef THREADED
1270 #pragma omp parallel for
1271 #endif
1272 for(IT i=0; i<n_thiscol; i++)
1273 {
1274 IT nitems = send_coldisp[i+1]-send_coldisp[i];
1275 if(nitems >= k)
1276 kthItem[i] = sendbuf[send_coldisp[i]+k-1];
1277 else
1278 kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty or has less than k elements
1279 }
1280 MPI_Send(kthItem.data(), n_thiscol, MPIType<VT>(), root, 0, commGrid->GetColWorld());
1281 }
1282 else if(root>0 && colrank==root)
1283 {
1284 MPI_Recv(kthItem.data(), n_thiscol, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1285 }
1286
1287 std::vector <int> sendcnts;
1288 std::vector <int> dpls;
1289 if(colrank==root)
1290 {
1291 int proccols = commGrid->GetGridCols();
1292 IT n_perproc = n_thiscol / proccols;
1293 sendcnts.resize(proccols);
1294 std::fill(sendcnts.data(), sendcnts.data()+proccols-1, n_perproc);
1295 sendcnts[proccols-1] = n_thiscol - (n_perproc * (proccols-1));
1296 dpls.resize(proccols,0); // displacements (zero initialized pid)
1297 std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1298 }
1299
1300 int rowroot = commGrid->GetDiagOfProcRow();
1301 int recvcnts = 0;
1302 // scatter received data size
1303 MPI_Scatter(sendcnts.data(),1, MPI_INT, & recvcnts, 1, MPI_INT, rowroot, commGrid->GetRowWorld());
1304
1305 rvec.arr.resize(recvcnts);
1306 MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.arr.data(), rvec.arr.size(), MPIType<VT>(),rowroot, commGrid->GetRowWorld());
1307 rvec.glen = getncol();
1308 return true;
1309 }
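/* The pairwise combining step used in the tournament above, as a small serial sketch
 * (illustrative only): two per-column buffers, each already sorted in decreasing order,
 * are merged while keeping only the first k entries of the result.
 *
 *   template <typename VT>
 *   std::vector<VT> MergeTopK(const std::vector<VT> & a, const std::vector<VT> & b, size_t k)
 *   {
 *       std::vector<VT> out;
 *       size_t i = 0, j = 0;
 *       while(out.size() < k && (i < a.size() || j < b.size()))
 *       {
 *           if(j == b.size() || (i < a.size() && a[i] > b[j]))
 *               out.push_back(a[i++]);
 *           else
 *               out.push_back(b[j++]);
 *       }
 *       return out;   // at most k entries, still in decreasing order
 *   }
 */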
1310
1311
1312
1313 template <class IT, class NT, class DER>
1314 template <typename VT, typename GIT, typename _UnaryOperation> // GIT: global index type of vector
1315 bool SpParMat<IT,NT,DER>::Kselect1(FullyDistSpVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1316 {
1317 if(*rvec.commGrid != *commGrid)
1318 {
1319 SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1320 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1321 }
1322
1323 /*
1324 FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1325 Reduce(nnzPerColumn, Column, plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1326 IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1327 if(k>maxnnzPerColumn)
1328 {
1329 SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
1330 Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1331 return false;
1332 }
1333 */
1334
1335 IT n_thiscol = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
1336 MPI_Comm World = rvec.commGrid->GetWorld();
1337 MPI_Comm ColWorld = rvec.commGrid->GetColWorld();
1338 MPI_Comm RowWorld = rvec.commGrid->GetRowWorld();
1339 int colneighs = commGrid->GetGridRows();
1340 int colrank = commGrid->GetRankInProcCol();
1341 int coldiagrank = commGrid->GetDiagOfProcCol();
1342
1343
1344 //replicate sparse indices along processor column
1345 int accnz;
1346 int32_t trxlocnz;
1347 GIT lenuntil;
1348 int32_t *trxinds, *activeCols;
1349 VT *trxnums, *numacc;
1350 TransposeVector(World, rvec, trxlocnz, lenuntil, trxinds, trxnums, true);
1351
1352 if(rvec.commGrid->GetGridRows() > 1)
1353 {
1354 //TODO: we only need to communicate indices
1355 AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, activeCols, numacc, accnz, true); // trxindS/trxnums deallocated, indacc/numacc allocated, accnz set
1356 }
1357 else
1358 {
1359 accnz = trxlocnz;
1360 activeCols = trxinds; //aliasing ptr
1361 numacc = trxnums; //aliasing ptr
1362 }
1363
1364
1365 std::vector<bool> isactive(n_thiscol,false);
1366 for(int i=0; i<accnz ; i++)
1367 {
1368 isactive[activeCols[i]] = true;
1369 //cout << indacc[i] << " ";
1370 }
1371 IT nActiveCols = accnz;//count_if(isactive.begin(), isactive.end(), [](bool ac){return ac;});
1372 // check, memory should be min(n_thiscol*k, local nnz)
1373 // hence we will not overflow for very large k
1374
1375 std::vector<IT> send_coldisp(n_thiscol+1,0);
1376 std::vector<IT> local_coldisp(n_thiscol+1,0);
1377 //vector<VT> sendbuf(nActiveCols*k);
1378 VT * sendbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1379
1380
1381 //displacement of local columns
1382 //local_coldisp is the displacement of all nonzeros per column
1383 //send_coldisp is the displacement of k nonzeros per column
1384 IT nzc = 0;
1385 if(spSeq->getnnz()>0)
1386 {
1387 typename DER::SpColIter colit = spSeq->begcol();
1388 for(IT i=0; i<n_thiscol; ++i)
1389 {
1390 local_coldisp[i+1] = local_coldisp[i];
1391 send_coldisp[i+1] = send_coldisp[i];
1392 if(i==colit.colid())
1393 {
1394 if(isactive[i])
1395 {
1396 local_coldisp[i+1] += colit.nnz();
1397 if(colit.nnz()>=k)
1398 send_coldisp[i+1] += k;
1399 else
1400 send_coldisp[i+1] += colit.nnz();
1401 }
1402 colit++;
1403 nzc++;
1404 }
1405 }
1406 }
1407
1408 // a copy of local part of the matrix
1409 // this can be avoided if we write our own local kselect function instead of using partial_sort
1410 //vector<VT> localmat(local_coldisp[n_thiscol]);
1411 VT * localmat = static_cast<VT *> (::operator new (local_coldisp[n_thiscol]*sizeof(VT)));
1412
1413
1414 #ifdef THREADED
1415 #pragma omp parallel for
1416 #endif
1417 for(IT i=0; i<nzc; i++)
1418 //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1419 {
1420 typename DER::SpColIter colit = spSeq->begcol() + i;
1421 IT colid = colit.colid();
1422 if(isactive[colid])
1423 {
1424 IT idx = local_coldisp[colid];
1425 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1426 {
1427 localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1428 }
1429
1430 if(colit.nnz()<=k)
1431 {
1432 sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], std::greater<VT>());
1433 std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], sendbuf+send_coldisp[colid]);
1434 }
1435 else
1436 {
1437 partial_sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, localmat+local_coldisp[colid+1], std::greater<VT>());
1438 std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, sendbuf+send_coldisp[colid]);
1439 }
1440 }
1441 }
1442
1443
1444 //vector<VT>().swap(localmat);
1445 ::operator delete(localmat);
1446 std::vector<IT>().swap(local_coldisp);
1447
1448 VT * recvbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1449 VT * tempbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1450 //vector<VT> recvbuf(n_thiscol*k);
1451 //vector<VT> tempbuf(n_thiscol*k);
1452 std::vector<IT> recv_coldisp(n_thiscol+1);
1453 std::vector<IT> templen(n_thiscol);
1454
1455
1456
1457 for(int p=2; p <= colneighs; p*=2)
1458 {
1459
1460 if(colrank%p == p/2) // this processor is a sender in this round
1461 {
1462             int receiver = colrank - p/2; // p is always a power of two here, so p/2 is exact
1463 MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1464 MPI_Send(sendbuf, send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1465 //break;
1466 }
1467 else if(colrank%p == 0) // this processor is a receiver in this round
1468 {
1469             int sender = colrank + p/2; // p is always a power of two here, so p/2 is exact
1470 if(sender < colneighs)
1471 {
1472
1473 MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1474 MPI_Recv(recvbuf, recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1475
1476
1477
1478 #ifdef THREADED
1479 #pragma omp parallel for
1480 #endif
1481 for(IT i=0; i<n_thiscol; ++i)
1482 {
1483 // partial merge until first k elements
1484 IT j=send_coldisp[i], l=recv_coldisp[i];
1485 //IT templen[i] = k*i;
1486 IT offset = k*i;
1487 IT lid = 0;
1488 for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1489 {
1490 if(sendbuf[j] > recvbuf[l]) // decision
1491 tempbuf[offset+lid++] = sendbuf[j++];
1492 else
1493 tempbuf[offset+lid++] = recvbuf[l++];
1494 }
1495 while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1496 while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1497 templen[i] = lid;
1498 }
1499
1500 send_coldisp[0] = 0;
1501 for(IT i=0; i<n_thiscol; i++)
1502 {
1503 send_coldisp[i+1] = send_coldisp[i] + templen[i];
1504 }
1505
1506
1507 #ifdef THREADED
1508 #pragma omp parallel for
1509 #endif
1510 for(IT i=0; i<n_thiscol; i++) // direct copy
1511 {
1512 IT offset = k*i;
1513 std::copy(tempbuf+offset, tempbuf+offset+templen[i], sendbuf + send_coldisp[i]);
1514 }
1515 }
1516 }
1517 }
1518 MPI_Barrier(commGrid->GetWorld());
1519
1520
1521
1522
1523 /*--------------------------------------------------------
1524 At this point, top k elements in every active column
1525 are gathered on the first processor row, P(0,:).
1526
1527 Next step: At P(0,i) find the kth largest element in
1528 active columns belonging to P(0,i).
1529 If nnz in a column is less than k, keep the largest nonzero.
1530 If a column is empty, keep the lowest numeric value.
1531 --------------------------------------------------------*/
1532
1533 std::vector<VT> kthItem(nActiveCols); // kth elements of local active columns
1534 if(colrank==0)
1535 {
1536 #ifdef THREADED
1537 #pragma omp parallel for
1538 #endif
1539 for(IT i=0; i<nActiveCols; i++)
1540 {
1541 IT ai = activeCols[i]; // active column index
1542 IT nitems = send_coldisp[ai+1]-send_coldisp[ai];
1543 if(nitems >= k)
1544 kthItem[i] = sendbuf[send_coldisp[ai]+k-1];
1545 else if (nitems==0)
1546 kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty
1547 else
1548 kthItem[i] = sendbuf[send_coldisp[ai+1]-1]; // returning the last entry if nnz in this column is less than k
1549
1550 }
1551 }
1552
1553 /*--------------------------------------------------------
1554 At this point, kth largest elements in every active column
1555 are gathered on the first processor row, P(0,:).
1556
1557 Next step: Send the kth largest elements from P(0,i) to P(i,i)
1558 Nothing to do for P(0,0)
1559 --------------------------------------------------------*/
1560 if(coldiagrank>0 && colrank==0)
1561 {
1562 MPI_Send(kthItem.data(), nActiveCols, MPIType<VT>(), coldiagrank, 0, commGrid->GetColWorld());
1563 }
1564 else if(coldiagrank>0 && colrank==coldiagrank) // receive in the diagonal processor
1565 {
1566 MPI_Recv(kthItem.data(), nActiveCols, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1567 }
1568
1569 /*--------------------------------------------------------
1570 At this point, kth largest elements in every active column
1571 are gathered on the diagonal processors P(i,i).
1572
1573 Next step: Scatter the kth largest elements from P(i,i)
1574 to all processors in the ith row, P(i,:).
1575      Each processor receives exactly the local nnz of rvec entries
1576 so that the received data can be directly put in rvec.
1577 --------------------------------------------------------*/
1578 int rowroot = commGrid->GetDiagOfProcRow();
1579 int proccols = commGrid->GetGridCols();
1580 std::vector <int> sendcnts(proccols,0);
1581 std::vector <int> dpls(proccols,0);
1582 int lsize = rvec.ind.size();
1583     // local sizes of the input vector will be sent from the diagonal processor
1584 MPI_Gather(&lsize,1, MPI_INT, sendcnts.data(), 1, MPI_INT, rowroot, RowWorld);
1585 std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1586 MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.num.data(), rvec.num.size(), MPIType<VT>(),rowroot, RowWorld);
1587
1588 ::operator delete(sendbuf);
1589 ::operator delete(recvbuf);
1590 ::operator delete(tempbuf);
1591
1592
1593 return true;
1594 }
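// Illustrative sketch (not part of the library): the per-column merge step used in the
// binary reduction tree above. Each receiver merges its own top-k list with the sender's;
// both inputs are assumed sorted in descending order and the output keeps at most the k
// largest values. The helper name mergeTopK and the use of std::vector are assumptions,
// mirroring the j/l/lid loop over sendbuf and recvbuf in the routine above.
#if 0
template <typename VT>
std::vector<VT> mergeTopK(const std::vector<VT> & mine, const std::vector<VT> & theirs, size_t k)
{
    std::vector<VT> merged;
    merged.reserve(std::min(k, mine.size() + theirs.size()));
    size_t j = 0, l = 0;
    while(j < mine.size() && l < theirs.size() && merged.size() < k)
    {
        if(mine[j] > theirs[l])  merged.push_back(mine[j++]);   // take the larger head element
        else                     merged.push_back(theirs[l++]);
    }
    while(j < mine.size()  && merged.size() < k) merged.push_back(mine[j++]);   // drain leftovers
    while(l < theirs.size() && merged.size() < k) merged.push_back(theirs[l++]);
    return merged;
}
#endif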
1595
1596 // only defined for symmetric matrix
1597 template <class IT, class NT, class DER>
1598 IT SpParMat<IT,NT,DER>::Bandwidth() const
1599 {
1600 IT upperlBW = -1;
1601 IT lowerlBW = -1;
1602 IT m_perproc = getnrow() / commGrid->GetGridRows();
1603 IT n_perproc = getncol() / commGrid->GetGridCols();
1604 IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1605 IT noffset = commGrid->GetRankInProcRow() * n_perproc;
1606
1607 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1608 {
1609 IT diagrow = colit.colid() + noffset;
1610 typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1611 if(nzit != spSeq->endnz(colit)) // nonempty column
1612 {
1613 IT firstrow = nzit.rowid() + moffset;
1614 IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1615
1616 if(firstrow <= diagrow) // upper diagonal
1617 {
1618 IT dev = diagrow - firstrow;
1619 if(upperlBW < dev) upperlBW = dev;
1620 }
1621 if(lastrow >= diagrow) // lower diagonal
1622 {
1623 IT dev = lastrow - diagrow;
1624 if(lowerlBW < dev) lowerlBW = dev;
1625 }
1626 }
1627 }
1628 IT upperBW;
1629 //IT lowerBW;
1630 MPI_Allreduce( &upperlBW, &upperBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1631 //MPI_Allreduce( &lowerlBW, &lowerBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1632
1633 //return (upperBW + lowerBW + 1);
1634 return (upperBW);
1635 }
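// Serial reference for the computation above (illustrative only, not used by the library):
// for a symmetric matrix given as (row, col) pairs, the returned value corresponds to the
// maximum distance of a nonzero from the diagonal, i.e. max |i - j| over all nonzeros.
// The parallel version computes the same quantity via MPI_Allreduce with MPI_MAX.
#if 0
template <typename IT>
IT SerialBandwidth(const std::vector<std::pair<IT,IT>> & nonzeros)
{
    IT bw = 0;
    for(const auto & nz : nonzeros)
    {
        IT dev = (nz.first > nz.second) ? (nz.first - nz.second) : (nz.second - nz.first);
        if(dev > bw) bw = dev;   // track the farthest nonzero from the diagonal
    }
    return bw;
}
#endif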
1636
1637
1638
1639 // only defined for symmetric matrix
1640 template <class IT, class NT, class DER>
1641 IT SpParMat<IT,NT,DER>::Profile() const
1642 {
1643 int colrank = commGrid->GetRankInProcRow();
1644 IT cols = getncol();
1645 IT rows = getnrow();
1646 IT m_perproc = cols / commGrid->GetGridRows();
1647 IT n_perproc = rows / commGrid->GetGridCols();
1648 IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1649 IT noffset = colrank * n_perproc;
1650
1651
1652 int pc = commGrid->GetGridCols();
1653 IT n_thisproc;
1654 if(colrank!=pc-1 ) n_thisproc = n_perproc;
1655 else n_thisproc = cols - (pc-1)*n_perproc;
1656
1657
1658 std::vector<IT> firstRowInCol(n_thisproc,getnrow());
1659 std::vector<IT> lastRowInCol(n_thisproc,-1);
1660
1661 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1662 {
1663 IT diagrow = colit.colid() + noffset;
1664 typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1665 if(nzit != spSeq->endnz(colit)) // nonempty column
1666 {
1667 IT firstrow = nzit.rowid() + moffset;
1668 IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1669 if(firstrow <= diagrow) // upper diagonal
1670 {
1671 firstRowInCol[colit.colid()] = firstrow;
1672 }
1673 if(lastrow >= diagrow) // lower diagonal
1674 {
1675 lastRowInCol[colit.colid()] = lastrow;
1676 }
1677 }
1678 }
1679
1680 std::vector<IT> firstRowInCol_global(n_thisproc,getnrow());
1681 //vector<IT> lastRowInCol_global(n_thisproc,-1);
1682 MPI_Allreduce( firstRowInCol.data(), firstRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MIN, commGrid->colWorld);
1683 //MPI_Allreduce( lastRowInCol.data(), lastRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MAX, commGrid->GetColWorld());
1684
1685 IT profile = 0;
1686 for(IT i=0; i<n_thisproc; i++)
1687 {
1688 if(firstRowInCol_global[i]==rows) // empty column
1689 profile++;
1690 else
1691 profile += (i + noffset - firstRowInCol_global[i]);
1692 }
1693
1694 IT profile_global = 0;
1695 MPI_Allreduce( &profile, &profile_global, 1, MPIType<IT>(), MPI_SUM, commGrid->rowWorld);
1696
1697 return (profile_global);
1698 }
1699
1700
1701
1702 template <class IT, class NT, class DER>
1703 template <typename VT, typename GIT, typename _BinaryOperation>
1704 void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, bool exclude) const
1705 {
1706 if (dim!=Column)
1707 {
1708         SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Column\n");
1709 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1710 }
1711 MaskedReduce(rvec, mask, dim, __binary_op, id, myidentity<NT>(), exclude);
1712 }
1713
1714 /**
1715 * Reduce along the column into a vector
1716 * @param[in] mask {A sparse vector indicating row indices included/excluded (based on exclude argument) in the reduction }
1717 * @param[in] __binary_op {the operation used for reduction; examples: max, min, plus, multiply, and, or. Its parameters and return type are all VT}
1718 * @param[in] id {scalar that is used as the identity for __binary_op; examples: zero, infinity}
1719 * @param[in] __unary_op {optional unary operation applied to nonzeros *before* the __binary_op; examples: 1/x, x^2}
1720  * @param[in] exclude {if true, masked row indices are excluded from the reduction; if false, only masked row indices are included}
1721 * @param[out] rvec {the return vector, specified as an output parameter to allow arbitrary return types via VT}
1722 **/
1723 template <class IT, class NT, class DER>
1724 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
1725 void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, bool exclude) const
1726 {
1727 MPI_Comm World = commGrid->GetWorld();
1728 MPI_Comm ColWorld = commGrid->GetColWorld();
1729 MPI_Comm RowWorld = commGrid->GetRowWorld();
1730
1731 if (dim!=Column)
1732 {
1733         SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Column\n");
1734 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1735 }
1736 if(*rvec.commGrid != *commGrid)
1737 {
1738 SpParHelper::Print("Grids are not comparable, SpParMat::MaskedReduce() fails!", commGrid->GetWorld());
1739 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1740 }
1741
1742 int rowneighs = commGrid->GetGridCols();
1743 int rowrank = commGrid->GetRankInProcRow();
1744 std::vector<int> rownz(rowneighs);
1745 int locnnzMask = static_cast<int> (mask.getlocnnz());
1746 rownz[rowrank] = locnnzMask;
1747 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rownz.data(), 1, MPI_INT, RowWorld);
1748 std::vector<int> dpls(rowneighs+1,0);
1749 std::partial_sum(rownz.begin(), rownz.end(), dpls.data()+1);
1750 int accnz = std::accumulate(rownz.begin(), rownz.end(), 0);
1751 std::vector<GIT> sendInd(locnnzMask);
1752     std::transform(mask.ind.begin(), mask.ind.end(), sendInd.begin(), std::bind2nd(std::plus<GIT>(), mask.RowLenUntil()));
1753
1754 std::vector<GIT> indMask(accnz);
1755 MPI_Allgatherv(sendInd.data(), rownz[rowrank], MPIType<GIT>(), indMask.data(), rownz.data(), dpls.data(), MPIType<GIT>(), RowWorld);
1756
1757
1758 // We can't use rvec's distribution (rows first, columns later) here
1759 IT n_thiscol = getlocalcols(); // length assigned to this processor column
1760 int colneighs = commGrid->GetGridRows(); // including oneself
1761 int colrank = commGrid->GetRankInProcCol();
1762
1763 GIT * loclens = new GIT[colneighs];
1764 GIT * lensums = new GIT[colneighs+1](); // begin/end points of local lengths
1765
1766 GIT n_perproc = n_thiscol / colneighs; // length on a typical processor
1767 if(colrank == colneighs-1)
1768 loclens[colrank] = n_thiscol - (n_perproc*colrank);
1769 else
1770 loclens[colrank] = n_perproc;
1771
1772 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
1773 std::partial_sum(loclens, loclens+colneighs, lensums+1); // loclens and lensums are different, but both would fit in 32-bits
1774
1775 std::vector<VT> trarr;
1776 typename DER::SpColIter colit = spSeq->begcol();
1777 for(int i=0; i< colneighs; ++i)
1778 {
1779 VT * sendbuf = new VT[loclens[i]];
1780 std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
1781
1782 for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit) // iterate over a portion of columns
1783 {
1784 int k=0;
1785 typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1786
1787 for(; nzit != spSeq->endnz(colit) && k < indMask.size(); ) // all nonzeros in this column
1788 {
1789 if(nzit.rowid() < indMask[k])
1790 {
1791 if(exclude)
1792 {
1793 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1794 }
1795 ++nzit;
1796 }
1797 else if(nzit.rowid() > indMask[k]) ++k;
1798 else
1799 {
1800 if(!exclude)
1801 {
1802 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1803 }
1804 ++k;
1805 ++nzit;
1806 }
1807
1808 }
1809 if(exclude)
1810 {
1811 while(nzit != spSeq->endnz(colit))
1812 {
1813 sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1814 ++nzit;
1815 }
1816 }
1817 }
1818
1819 VT * recvbuf = NULL;
1820 if(colrank == i)
1821 {
1822 trarr.resize(loclens[i]);
1823 recvbuf = SpHelper::p2a(trarr);
1824 }
1825 MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), i, commGrid->GetColWorld()); // root = i
1826 delete [] sendbuf;
1827 }
1828 DeleteAll(loclens, lensums);
1829
1830 GIT reallen; // Now we have to transpose the vector
1831 GIT trlen = trarr.size();
1832 int diagneigh = commGrid->GetComplementRank();
1833 MPI_Status status;
1834     MPI_Sendrecv(&trlen, 1, MPIType<GIT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<GIT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
1835
1836 rvec.arr.resize(reallen);
1837 MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
1838 rvec.glen = getncol(); // ABAB: Put a sanity check here
1839
1840 }
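// Usage sketch for MaskedReduce (illustrative only; the matrix/vector types, the variable
// names, and the assumption that A, fullWorld, and the mask contents are set up elsewhere
// are all assumptions, not library-provided code). Sums each column of A over the rows
// listed in the sparse mask; passing exclude=true would instead sum over the rows *not*
// in the mask, per the parameter description above.
#if 0
typedef SpDCCols<int64_t,double> DCCols;
SpParMat<int64_t,double,DCCols> A(fullWorld);                          // assumed populated elsewhere
FullyDistSpVec<int64_t,double> mask(A.getcommgrid(), A.getnrow());     // nonzero indices = rows to include
FullyDistVec<int64_t,double> colsums(A.getcommgrid());
A.MaskedReduce(colsums, mask, Column, std::plus<double>(), 0.0, false);
#endif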
1841
1842
1843
1844
1845 template <class IT, class NT, class DER>
1846 template <typename NNT,typename NDER>
1847 SpParMat<IT,NT,DER>::operator SpParMat<IT,NNT,NDER> () const
1848 {
1849 NDER * convert = new NDER(*spSeq);
1850 return SpParMat<IT,NNT,NDER> (convert, commGrid);
1851 }
1852
1853 //! Change index type as well
1854 template <class IT, class NT, class DER>
1855 template <typename NIT, typename NNT,typename NDER>
1856 SpParMat<IT,NT,DER>::operator SpParMat<NIT,NNT,NDER> () const
1857 {
1858 NDER * convert = new NDER(*spSeq);
1859 return SpParMat<NIT,NNT,NDER> (convert, commGrid);
1860 }
1861
1862 /**
1863  * Create a submatrix of size m x (size(ci) * s) on an r x s processor grid
1864  * Essentially fetches the columns ci[0], ci[1],... ci[size(ci)-1] from every submatrix
1865 */
1866 template <class IT, class NT, class DER>
1867 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRefCol (const std::vector<IT> & ci) const
1868 {
1869 std::vector<IT> ri;
1870 DER * tempseq = new DER((*spSeq)(ri, ci));
1871 return SpParMat<IT,NT,DER> (tempseq, commGrid);
1872 }
1873
1874 /**
1875 * Generalized sparse matrix indexing (ri/ci are 0-based indexed)
1876 * Both the storage and the actual values in FullyDistVec should be IT
1877 * The index vectors are dense and FULLY distributed on all processors
1878 * We can use this function to apply a permutation like A(p,q)
1879 * Sequential indexing subroutine (via multiplication) is general enough.
1880 */
1881 template <class IT, class NT, class DER>
1882 template <typename PTNTBOOL, typename PTBOOLNT>
1883 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRef_SR (const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, bool inplace)
1884 {
1885     // infer the concrete boolean matrix type SpMat<IT,bool>
1886 typedef typename create_trait<DER, IT, bool>::T_inferred DER_IT;
1887
1888 if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
1889 {
1890 SpParHelper::Print("Grids are not comparable, SpRef fails !");
1891 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1892 }
1893
1894 // Safety check
1895 IT locmax_ri = 0;
1896 IT locmax_ci = 0;
1897 if(!ri.arr.empty())
1898 locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
1899 if(!ci.arr.empty())
1900 locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
1901
1902 IT total_m = getnrow();
1903 IT total_n = getncol();
1904 if(locmax_ri > total_m || locmax_ci > total_n)
1905 {
1906 throw outofrangeexception();
1907 }
1908
1909 // The indices for FullyDistVec are offset'd to 1/p pieces
1910 // The matrix indices are offset'd to 1/sqrt(p) pieces
1911 // Add the corresponding offset before sending the data
1912 IT roffset = ri.RowLenUntil();
1913 IT rrowlen = ri.MyRowLength();
1914 IT coffset = ci.RowLenUntil();
1915 IT crowlen = ci.MyRowLength();
1916
1917 // We create two boolean matrices P and Q
1918 // Dimensions: P is size(ri) x m
1919 // Q is n x size(ci)
1920 // Range(ri) = {0,...,m-1}
1921 // Range(ci) = {0,...,n-1}
1922
1923 IT rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
1924 IT totalm = getnrow(); // collective call
1925 IT totaln = getncol();
1926 IT m_perproccol = totalm / rowneighs;
1927 IT n_perproccol = totaln / rowneighs;
1928
1929 // Get the right local dimensions
1930 IT diagneigh = commGrid->GetComplementRank();
1931 IT mylocalrows = getlocalrows();
1932 IT mylocalcols = getlocalcols();
1933 IT trlocalrows;
1934 MPI_Status status;
1935 MPI_Sendrecv(&mylocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, &trlocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, commGrid->GetWorld(), &status);
1936 // we don't need trlocalcols because Q.Transpose() will take care of it
1937
1938 std::vector< std::vector<IT> > rowid(rowneighs); // reuse for P and Q
1939 std::vector< std::vector<IT> > colid(rowneighs);
1940
1941 // Step 1: Create P
1942 IT locvec = ri.arr.size(); // nnz in local vector
1943 for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
1944 {
1945 // numerical values (permutation indices) are 0-based
1946         // recipient along the processor row
1947 IT rowrec = (m_perproccol!=0) ? std::min(ri.arr[i] / m_perproccol, rowneighs-1) : (rowneighs-1);
1948
1949 // ri's numerical values give the colids and its local indices give rowids
1950 rowid[rowrec].push_back( i + roffset);
1951 colid[rowrec].push_back(ri.arr[i] - (rowrec * m_perproccol));
1952 }
1953
1954 int * sendcnt = new int[rowneighs]; // reuse in Q as well
1955 int * recvcnt = new int[rowneighs];
1956 for(IT i=0; i<rowneighs; ++i)
1957 sendcnt[i] = rowid[i].size();
1958
1959 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
1960 int * sdispls = new int[rowneighs]();
1961 int * rdispls = new int[rowneighs]();
1962 std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
1963 std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
1964 IT p_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
1965
1966 // create space for incoming data ...
1967 IT * p_rows = new IT[p_nnz];
1968 IT * p_cols = new IT[p_nnz];
1969 IT * senddata = new IT[locvec]; // re-used for both rows and columns
1970 for(int i=0; i<rowneighs; ++i)
1971 {
1972 std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
1973 std::vector<IT>().swap(rowid[i]); // clear memory of rowid
1974 }
1975 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
1976
1977 for(int i=0; i<rowneighs; ++i)
1978 {
1979 std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
1980 std::vector<IT>().swap(colid[i]); // clear memory of colid
1981 }
1982 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
1983 delete [] senddata;
1984
1985 std::tuple<IT,IT,bool> * p_tuples = new std::tuple<IT,IT,bool>[p_nnz];
1986 for(IT i=0; i< p_nnz; ++i)
1987 {
1988 p_tuples[i] = std::make_tuple(p_rows[i], p_cols[i], 1);
1989 }
1990 DeleteAll(p_rows, p_cols);
1991
1992 DER_IT * PSeq = new DER_IT();
1993 PSeq->Create( p_nnz, rrowlen, trlocalrows, p_tuples); // deletion of tuples[] is handled by SpMat::Create
1994
1995 SpParMat<IT,NT,DER> PA(commGrid);
1996 if(&ri == &ci) // Symmetric permutation
1997 {
1998 DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
1999 #ifdef SPREFDEBUG
2000 SpParHelper::Print("Symmetric permutation\n", commGrid->GetWorld());
2001 #endif
2002 SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2003 if(inplace)
2004 {
2005 #ifdef SPREFDEBUG
2006 SpParHelper::Print("In place multiplication\n", commGrid->GetWorld());
2007 #endif
2008 *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this, false, true); // clear the memory of *this
2009
2010 //ostringstream outb;
2011 //outb << "P_after_" << commGrid->myrank;
2012 //ofstream ofb(outb.str().c_str());
2013 //P.put(ofb);
2014
2015 P.Transpose();
2016 *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, P, true, true); // clear the memory of both *this and P
2017 return SpParMat<IT,NT,DER>(commGrid); // dummy return to match signature
2018 }
2019 else
2020 {
2021 PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P,*this);
2022 P.Transpose();
2023 return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, P);
2024 }
2025 }
2026 else
2027 {
2028 // Intermediate step (to save memory): Form PA and store it in P
2029 // Distributed matrix generation (collective call)
2030 SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2031
2032 // Do parallel matrix-matrix multiply
2033 PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this);
2034 } // P is destructed here
2035 #ifndef NDEBUG
2036 PA.PrintInfo();
2037 #endif
2038 // Step 2: Create Q (use the same row-wise communication and transpose at the end)
2039 // This temporary to-be-transposed Q is size(ci) x n
2040 locvec = ci.arr.size(); // nnz in local vector (reset variable)
2041 for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
2042 {
2043 // numerical values (permutation indices) are 0-based
2044 IT rowrec = (n_perproccol!=0) ? std::min(ci.arr[i] / n_perproccol, rowneighs-1) : (rowneighs-1);
2045
2046         // ci's numerical values give the colids and its local indices give rowids
2047 rowid[rowrec].push_back( i + coffset);
2048 colid[rowrec].push_back(ci.arr[i] - (rowrec * n_perproccol));
2049 }
2050
2051 for(IT i=0; i<rowneighs; ++i)
2052 sendcnt[i] = rowid[i].size(); // update with new sizes
2053
2054 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
2055 std::fill(sdispls, sdispls+rowneighs, 0); // reset
2056 std::fill(rdispls, rdispls+rowneighs, 0);
2057 std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
2058 std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
2059 IT q_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
2060
2061 // create space for incoming data ...
2062 IT * q_rows = new IT[q_nnz];
2063 IT * q_cols = new IT[q_nnz];
2064 senddata = new IT[locvec];
2065 for(int i=0; i<rowneighs; ++i)
2066 {
2067 std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
2068 std::vector<IT>().swap(rowid[i]); // clear memory of rowid
2069 }
2070 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2071
2072 for(int i=0; i<rowneighs; ++i)
2073 {
2074 std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
2075 std::vector<IT>().swap(colid[i]); // clear memory of colid
2076 }
2077 MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2078 DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2079
2080 std::tuple<IT,IT,bool> * q_tuples = new std::tuple<IT,IT,bool>[q_nnz];
2081 for(IT i=0; i< q_nnz; ++i)
2082 {
2083 q_tuples[i] = std::make_tuple(q_rows[i], q_cols[i], 1);
2084 }
2085 DeleteAll(q_rows, q_cols);
2086 DER_IT * QSeq = new DER_IT();
2087 QSeq->Create( q_nnz, crowlen, mylocalcols, q_tuples); // Creating Q' instead
2088
2089 // Step 3: Form PAQ
2090 // Distributed matrix generation (collective call)
2091 SpParMat<IT,bool,DER_IT> Q (QSeq, commGrid);
2092 Q.Transpose();
2093 if(inplace)
2094 {
2095         *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q, true, true);  // clear the memory of both PA and Q
2096 return SpParMat<IT,NT,DER>(commGrid); // dummy return to match signature
2097 }
2098 else
2099 {
2100 return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q);
2101 }
2102 }
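// Dense analogue of the SpRef formulation above (illustrative only): selecting rows ri and
// columns ci of A is the triple product P*A*Q, where P is a size(ri) x m boolean matrix with
// P(k, ri[k]) = 1 and Q is an n x size(ci) boolean matrix with Q(ci[k], k) = 1. The helper
// name DenseSpRef and the use of nested std::vector are assumptions for the sketch.
#if 0
std::vector<std::vector<double>> DenseSpRef(const std::vector<std::vector<double>> & A,
                                            const std::vector<int> & ri, const std::vector<int> & ci)
{
    std::vector<std::vector<double>> out(ri.size(), std::vector<double>(ci.size()));
    for(size_t r = 0; r < ri.size(); ++r)         // row r of P picks row ri[r] of A
        for(size_t c = 0; c < ci.size(); ++c)     // column c of Q picks column ci[c] of A
            out[r][c] = A[ri[r]][ci[c]];
    return out;   // equals P*A*Q with the selection matrices described above
}
#endif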
2103
2104
2105 template <class IT, class NT, class DER>
2106 void SpParMat<IT,NT,DER>::SpAsgn(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, SpParMat<IT,NT,DER> & B)
2107 {
2108 typedef PlusTimesSRing<NT, NT> PTRing;
2109
2110 if((*(ri.commGrid) != *(B.commGrid)) || (*(ci.commGrid) != *(B.commGrid)))
2111 {
2112 SpParHelper::Print("Grids are not comparable, SpAsgn fails !", commGrid->GetWorld());
2113 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2114 }
2115 IT total_m_A = getnrow();
2116 IT total_n_A = getncol();
2117 IT total_m_B = B.getnrow();
2118 IT total_n_B = B.getncol();
2119
2120 if(total_m_B != ri.TotalLength())
2121 {
2122 SpParHelper::Print("First dimension of B does NOT match the length of ri, SpAsgn fails !", commGrid->GetWorld());
2123 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2124 }
2125 if(total_n_B != ci.TotalLength())
2126 {
2127 SpParHelper::Print("Second dimension of B does NOT match the length of ci, SpAsgn fails !", commGrid->GetWorld());
2128 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2129 }
2130 Prune(ri, ci); // make a hole
2131
2132 // embed B to the size of A
2133 FullyDistVec<IT,IT> * rvec = new FullyDistVec<IT,IT>(ri.commGrid);
2134 rvec->iota(total_m_B, 0); // sparse() expects a zero based index
2135
2136 SpParMat<IT,NT,DER> R(total_m_A, total_m_B, ri, *rvec, 1);
2137 delete rvec; // free memory
2138 SpParMat<IT,NT,DER> RB = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(R, B, true, false); // clear memory of R but not B
2139
2140 FullyDistVec<IT,IT> * qvec = new FullyDistVec<IT,IT>(ri.commGrid);
2141 qvec->iota(total_n_B, 0);
2142 SpParMat<IT,NT,DER> Q(total_n_B, total_n_A, *qvec, ci, 1);
2143 delete qvec; // free memory
2144 SpParMat<IT,NT,DER> RBQ = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(RB, Q, true, true); // clear memory of RB and Q
2145 *this += RBQ; // extend-add
2146 }
2147
2148 template <class IT, class NT, class DER>
2149 void SpParMat<IT,NT,DER>::Prune(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
2150 {
2151 typedef PlusTimesSRing<NT, NT> PTRing;
2152
2153 if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
2154 {
2155 SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
2156 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2157 }
2158
2159 // Safety check
2160 IT locmax_ri = 0;
2161 IT locmax_ci = 0;
2162 if(!ri.arr.empty())
2163 locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
2164 if(!ci.arr.empty())
2165 locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
2166
2167 IT total_m = getnrow();
2168 IT total_n = getncol();
2169 if(locmax_ri > total_m || locmax_ci > total_n)
2170 {
2171 throw outofrangeexception();
2172 }
2173
2174 SpParMat<IT,NT,DER> S(total_m, total_m, ri, ri, 1);
2175 SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(S, *this, true, false); // clear memory of S but not *this
2176
2177 SpParMat<IT,NT,DER> T(total_n, total_n, ci, ci, 1);
2178 SpParMat<IT,NT,DER> SAT = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(SA, T, true, true); // clear memory of SA and T
2179 EWiseMult(SAT, true); // In-place EWiseMult with not(SAT)
2180 }
2181
2182 //! Prune every column of a sparse matrix based on pvals
2183 template <class IT, class NT, class DER>
2184 template <typename _BinaryOperation>
2185 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2186 {
2187 MPI_Barrier(MPI_COMM_WORLD);
2188 if(getncol() != pvals.TotalLength())
2189 {
2190 std::ostringstream outs;
2191 outs << "Can not prune column-by-column, dimensions does not match"<< std::endl;
2192 outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2193 SpParHelper::Print(outs.str());
2194 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2195 }
2196 if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2197 {
2198 std::cout << "Grids are not comparable for PurneColumn" << std::endl;
2199 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2200 }
2201
2202 MPI_Comm World = pvals.commGrid->GetWorld();
2203 MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2204
2205 int xsize = (int) pvals.LocArrSize();
2206 int trxsize = 0;
2207
2208
2209 int diagneigh = pvals.commGrid->GetComplementRank();
2210 MPI_Status status;
2211 MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
2212
2213
2214 NT * trxnums = new NT[trxsize];
2215 MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(pvals.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
2216
2217 int colneighs, colrank;
2218 MPI_Comm_size(ColWorld, &colneighs);
2219 MPI_Comm_rank(ColWorld, &colrank);
2220 int * colsize = new int[colneighs];
2221 colsize[colrank] = trxsize;
2222 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
2223     int * dpls = new int[colneighs]();	// displacements (zero-initialized)
2224 std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
2225 int accsize = std::accumulate(colsize, colsize+colneighs, 0);
2226 std::vector<NT> numacc(accsize);
2227
2228 #ifdef COMBBLAS_DEBUG
2229 std::ostringstream outs2;
2230 outs2 << "PruneColumn displacements: ";
2231 for(int i=0; i< colneighs; ++i)
2232 {
2233 outs2 << dpls[i] << " ";
2234 }
2235 outs2 << std::endl;
2236 SpParHelper::Print(outs2.str());
2237 MPI_Barrier(World);
2238 #endif
2239
2240
2241 MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), numacc.data(), colsize, dpls, MPIType<NT>(), ColWorld);
2242 delete [] trxnums;
2243 delete [] colsize;
2244 delete [] dpls;
2245
2246 //sanity check
2247 assert(accsize == getlocalcols());
2248 if (inPlace)
2249 {
2250 spSeq->PruneColumn(numacc.data(), __binary_op, inPlace);
2251 return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2252 }
2253 else
2254 {
2255 return SpParMat<IT,NT,DER>(spSeq->PruneColumn(numacc.data(), __binary_op, inPlace), commGrid);
2256 }
2257 }
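// Usage sketch for PruneColumn (illustrative only; the types, variable names, fullWorld,
// and the assumption that entries for which the predicate holds against the per-column
// value are the ones removed, as in the MCL-style usage, are all assumptions).
#if 0
typedef SpDCCols<int64_t,double> DCCols;
SpParMat<int64_t,double,DCCols> A(fullWorld);                                  // assumed populated elsewhere
FullyDistVec<int64_t,double> colthresh(A.getcommgrid(), A.getncol(), 1e-6);    // one threshold per column
A.PruneColumn(colthresh, std::less<double>(), true);   // in place: drop a_ij for which a_ij < colthresh[j] (assumed semantics)
#endif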
2258
2259
2260 //! Prune columns of a sparse matrix selected by nonzero indices of pvals
2261 //! Each selected column is pruned by corresponding values in pvals
2262 template <class IT, class NT, class DER>
2263 template <typename _BinaryOperation>
2264 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistSpVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2265 {
2266 MPI_Barrier(MPI_COMM_WORLD);
2267 if(getncol() != pvals.TotalLength())
2268 {
2269 std::ostringstream outs;
2270 outs << "Can not prune column-by-column, dimensions does not match"<< std::endl;
2271 outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2272 SpParHelper::Print(outs.str());
2273 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2274 }
2275 if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2276 {
2277 std::cout << "Grids are not comparable for PurneColumn" << std::endl;
2278 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2279 }
2280
2281 MPI_Comm World = pvals.commGrid->GetWorld();
2282 MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2283 int diagneigh = pvals.commGrid->GetComplementRank();
2284
2285 IT xlocnz = pvals.getlocnnz();
2286 IT roffst = pvals.RowLenUntil();
2287 IT roffset;
2288 IT trxlocnz = 0;
2289
2290 MPI_Status status;
2291 MPI_Sendrecv(&roffst, 1, MPIType<IT>(), diagneigh, TROST, &roffset, 1, MPIType<IT>(), diagneigh, TROST, World, &status);
2292 MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, &status);
2293
2294 std::vector<IT> trxinds (trxlocnz);
2295 std::vector<NT> trxnums (trxlocnz);
2296 MPI_Sendrecv(pvals.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, &status);
2297 MPI_Sendrecv(pvals.num.data(), xlocnz, MPIType<NT>(), diagneigh, TRX, trxnums.data(), trxlocnz, MPIType<NT>(), diagneigh, TRX, World, &status);
2298 std::transform(trxinds.data(), trxinds.data()+trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), roffset));
2299
2300
2301 int colneighs, colrank;
2302 MPI_Comm_size(ColWorld, &colneighs);
2303 MPI_Comm_rank(ColWorld, &colrank);
2304 int * colnz = new int[colneighs];
2305 colnz[colrank] = trxlocnz;
2306 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
2307     int * dpls = new int[colneighs]();	// displacements (zero-initialized)
2308 std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
2309 IT accnz = std::accumulate(colnz, colnz+colneighs, 0);
2310
2311 std::vector<IT> indacc(accnz);
2312 std::vector<NT> numacc(accnz);
2313 MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz, dpls, MPIType<IT>(), ColWorld);
2314 MPI_Allgatherv(trxnums.data(), trxlocnz, MPIType<NT>(), numacc.data(), colnz, dpls, MPIType<NT>(), ColWorld);
2315
2316 delete [] colnz;
2317 delete [] dpls;
2318
2319
2320 if (inPlace)
2321 {
2322 spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace);
2323 return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2324 }
2325 else
2326 {
2327 return SpParMat<IT,NT,DER>(spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace), commGrid);
2328 }
2329 }
2330
2331
2332
2333 // In-place version where rhs type is the same (no need for type promotion)
2334 template <class IT, class NT, class DER>
2335 void SpParMat<IT,NT,DER>::EWiseMult (const SpParMat< IT,NT,DER >  & rhs, bool exclude)
2336 {
2337 if(*commGrid == *rhs.commGrid)
2338 {
2339 spSeq->EWiseMult(*(rhs.spSeq), exclude); // Dimension compatibility check performed by sequential function
2340 }
2341 else
2342 {
2343 std::cout << "Grids are not comparable, EWiseMult() fails !" << std::endl;
2344 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2345 }
2346 }
2347
2348
2349 template <class IT, class NT, class DER>
2350 void SpParMat<IT,NT,DER>::EWiseScale(const DenseParMat<IT, NT> & rhs)
2351 {
2352 if(*commGrid == *rhs.commGrid)
2353 {
2354 spSeq->EWiseScale(rhs.array, rhs.m, rhs.n); // Dimension compatibility check performed by sequential function
2355 }
2356 else
2357 {
2358 std::cout << "Grids are not comparable, EWiseScale() fails !" << std::endl;
2359 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2360 }
2361 }
2362
2363 template <class IT, class NT, class DER>
2364 template <typename _BinaryOperation>
2365 void SpParMat<IT,NT,DER>::UpdateDense(DenseParMat<IT, NT> & rhs, _BinaryOperation __binary_op) const
2366 {
2367 if(*commGrid == *rhs.commGrid)
2368 {
2369 if(getlocalrows() == rhs.m && getlocalcols() == rhs.n)
2370 {
2371 spSeq->UpdateDense(rhs.array, __binary_op);
2372 }
2373 else
2374 {
2375 std::cout << "Matrices have different dimensions, UpdateDense() fails !" << std::endl;
2376 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2377 }
2378 }
2379 else
2380 {
2381 std::cout << "Grids are not comparable, UpdateDense() fails !" << std::endl;
2382 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2383 }
2384 }
2385
2386 template <class IT, class NT, class DER>
2387 void SpParMat<IT,NT,DER>::PrintInfo() const
2388 {
2389 IT mm = getnrow();
2390 IT nn = getncol();
2391 IT nznz = getnnz();
2392
2393 if (commGrid->myrank == 0)
2394 std::cout << "As a whole: " << mm << " rows and "<< nn <<" columns and "<< nznz << " nonzeros" << std::endl;
2395
2396 #ifdef DEBUG
2397 IT allprocs = commGrid->grrows * commGrid->grcols;
2398 for(IT i=0; i< allprocs; ++i)
2399 {
2400 if (commGrid->myrank == i)
2401 {
2402 std::cout << "Processor (" << commGrid->GetRankInProcRow() << "," << commGrid->GetRankInProcCol() << ")'s data: " << std::endl;
2403 spSeq->PrintInfo();
2404 }
2405 MPI_Barrier(commGrid->GetWorld());
2406 }
2407 #endif
2408 }
2409
2410 template <class IT, class NT, class DER>
2411 bool SpParMat<IT,NT,DER>::operator== (const SpParMat<IT,NT,DER> & rhs) const
2412 {
2413 int local = static_cast<int>((*spSeq) == (*(rhs.spSeq)));
2414 int whole = 1;
2415 MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, commGrid->GetWorld());
2416 return static_cast<bool>(whole);
2417 }
2418
2419
2420 /**
2421 ** Private function that carries code common to different sparse() constructors
2422 ** Before this call, commGrid is already set
2423 **/
2424 template <class IT, class NT, class DER>
2425 template <typename _BinaryOperation, typename LIT>
2426 void SpParMat< IT,NT,DER >::SparseCommon(std::vector< std::vector < std::tuple<LIT,LIT,NT> > > & data, LIT locsize, IT total_m, IT total_n, _BinaryOperation BinOp)
2427 {
2428 int nprocs = commGrid->GetSize();
2429 int * sendcnt = new int[nprocs];
2430 int * recvcnt = new int[nprocs];
2431 for(int i=0; i<nprocs; ++i)
2432         sendcnt[i] = data[i].size();	// number of tuples destined for processor i
2433
2434 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
2435 int * sdispls = new int[nprocs]();
2436 int * rdispls = new int[nprocs]();
2437 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
2438 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
2439 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2440 IT totsent = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
2441
2442 assert((totsent < std::numeric_limits<int>::max()));
2443 assert((totrecv < std::numeric_limits<int>::max()));
2444
2445
2446 #if 0
2447 ofstream oput;
2448 commGrid->OpenDebugFile("Displacements", oput);
2449 copy(sdispls, sdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
2450 copy(rdispls, rdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
2451 oput.close();
2452
2453 IT * gsizes;
2454 if(commGrid->GetRank() == 0) gsizes = new IT[nprocs];
2455 MPI_Gather(&totrecv, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2456 if(commGrid->GetRank() == 0) { std::copy(gsizes, gsizes+nprocs, std::ostream_iterator<IT>(std::cout, " ")); std::cout << std::endl; }
2457 MPI_Barrier(commGrid->GetWorld());
2458 MPI_Gather(&totsent, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2459 if(commGrid->GetRank() == 0) { copy(gsizes, gsizes+nprocs, ostream_iterator<IT>(cout, " ")); cout << endl; }
2460 MPI_Barrier(commGrid->GetWorld());
2461 if(commGrid->GetRank() == 0) delete [] gsizes;
2462 #endif
2463
2464 std::tuple<LIT,LIT,NT> * senddata = new std::tuple<LIT,LIT,NT>[locsize]; // re-used for both rows and columns
2465 for(int i=0; i<nprocs; ++i)
2466 {
2467 std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
2468 data[i].clear(); // clear memory
2469 data[i].shrink_to_fit();
2470 }
2471 MPI_Datatype MPI_triple;
2472 MPI_Type_contiguous(sizeof(std::tuple<LIT,LIT,NT>), MPI_CHAR, &MPI_triple);
2473 MPI_Type_commit(&MPI_triple);
2474
2475 std::tuple<LIT,LIT,NT> * recvdata = new std::tuple<LIT,LIT,NT>[totrecv];
2476 MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_triple, recvdata, recvcnt, rdispls, MPI_triple, commGrid->GetWorld());
2477
2478 DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2479 MPI_Type_free(&MPI_triple);
2480
2481 int r = commGrid->GetGridRows();
2482 int s = commGrid->GetGridCols();
2483 IT m_perproc = total_m / r;
2484 IT n_perproc = total_n / s;
2485 int myprocrow = commGrid->GetRankInProcCol();
2486 int myproccol = commGrid->GetRankInProcRow();
2487 IT locrows, loccols;
2488 if(myprocrow != r-1) locrows = m_perproc;
2489 else locrows = total_m - myprocrow * m_perproc;
2490 if(myproccol != s-1) loccols = n_perproc;
2491 else loccols = total_n - myproccol * n_perproc;
2492
2493 SpTuples<LIT,NT> A(totrecv, locrows, loccols, recvdata); // It is ~SpTuples's job to deallocate
2494
2495 // the previous constructor sorts based on columns-first (but that doesn't matter as long as they are sorted one way or another)
2496 A.RemoveDuplicates(BinOp);
2497
2498 spSeq = new DER(A,false); // Convert SpTuples to DER
2499 }
2500
2501
2502 //! All vectors are zero-based indexed (as usual)
2503 template <class IT, class NT, class DER>
2504 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
2505 const FullyDistVec<IT,IT> & distcols, const FullyDistVec<IT,NT> & distvals, bool SumDuplicates)
2506 {
2507 if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
2508 {
2509 SpParHelper::Print("Grids are not comparable, Sparse() fails!\n"); // commGrid is not initialized yet
2510 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2511 }
2512 if((distrows.TotalLength() != distcols.TotalLength()) || (distcols.TotalLength() != distvals.TotalLength()))
2513 {
2514 SpParHelper::Print("Vectors have different sizes, Sparse() fails!");
2515 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2516 }
2517
2518 commGrid = distrows.commGrid;
2519 int nprocs = commGrid->GetSize();
2520 std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
2521
2522 IT locsize = distrows.LocArrSize();
2523 for(IT i=0; i<locsize; ++i)
2524 {
2525 IT lrow, lcol;
2526 int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
2527 data[owner].push_back(std::make_tuple(lrow,lcol,distvals.arr[i]));
2528 }
2529 if(SumDuplicates)
2530 {
2531 SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
2532 }
2533 else
2534 {
2535 SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
2536 }
2537 }
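// Usage sketch for the triples constructor above (illustrative only; fullWorld, the
// dimensions, and all variable names are assumptions): build a matrix from zero-based
// (i, j, v) triples stored in three fully distributed vectors, summing duplicates.
#if 0
typedef SpDCCols<int64_t,double> DCCols;
const int64_t nrows = 1000, ncols = 1000;              // assumed global dimensions
FullyDistVec<int64_t,int64_t> rows(fullWorld), cols(fullWorld);
FullyDistVec<int64_t,double>  vals(fullWorld);
// ... fill rows/cols/vals with zero-based (i, j, v) triples ...
SpParMat<int64_t,double,DCCols> A(nrows, ncols, rows, cols, vals, true);   // true: sum duplicates
#endif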
2538
2539
2540
2541 template <class IT, class NT, class DER>
2542 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
2543 const FullyDistVec<IT,IT> & distcols, const NT & val, bool SumDuplicates)
2544 {
2545 if((*(distrows.commGrid) != *(distcols.commGrid)) )
2546 {
2547 SpParHelper::Print("Grids are not comparable, Sparse() fails!\n");
2548 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2549 }
2550 if((distrows.TotalLength() != distcols.TotalLength()) )
2551 {
2552 SpParHelper::Print("Vectors have different sizes, Sparse() fails!\n");
2553 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2554 }
2555 commGrid = distrows.commGrid;
2556 int nprocs = commGrid->GetSize();
2557 std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
2558
2559 IT locsize = distrows.LocArrSize();
2560 for(IT i=0; i<locsize; ++i)
2561 {
2562 IT lrow, lcol;
2563 int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
2564 data[owner].push_back(std::make_tuple(lrow,lcol,val));
2565 }
2566 if(SumDuplicates)
2567 {
2568 SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
2569 }
2570 else
2571 {
2572 SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
2573 }
2574 }
2575
2576 template <class IT, class NT, class DER>
2577 template <class DELIT>
2578 SpParMat< IT,NT,DER >::SpParMat (const DistEdgeList<DELIT> & DEL, bool removeloops)
2579 {
2580 commGrid = DEL.commGrid;
2581 typedef typename DER::LocalIT LIT;
2582
2583 int nprocs = commGrid->GetSize();
2584 int gridrows = commGrid->GetGridRows();
2585 int gridcols = commGrid->GetGridCols();
2586     std::vector< std::vector<LIT> > data(nprocs);	// entries are pre-converted to local indices before getting pushed into "data"
2587
2588 LIT m_perproc = DEL.getGlobalV() / gridrows;
2589 LIT n_perproc = DEL.getGlobalV() / gridcols;
2590
2591 if(sizeof(LIT) < sizeof(DELIT))
2592 {
2593 std::ostringstream outs;
2594 outs << "Warning: Using smaller indices for the matrix than DistEdgeList\n";
2595 outs << "Local matrices are " << m_perproc << "-by-" << n_perproc << std::endl;
2596 SpParHelper::Print(outs.str(), commGrid->GetWorld()); // commgrid initialized
2597 }
2598
2599 LIT stages = MEM_EFFICIENT_STAGES; // to lower memory consumption, form sparse matrix in stages
2600
2601 // even if local indices (LIT) are 32-bits, we should work with 64-bits for global info
2602 int64_t perstage = DEL.nedges / stages;
2603 LIT totrecv = 0;
2604 std::vector<LIT> alledges;
2605
2606 for(LIT s=0; s< stages; ++s)
2607 {
2608 int64_t n_befor = s*perstage;
2609 int64_t n_after= ((s==(stages-1))? DEL.nedges : ((s+1)*perstage));
2610
2611         // edges whose source/destination was cleared earlier (set to -1) are skipped below
2612 int realedges = 0; // these are "local" realedges
2613
2614 if(DEL.pedges)
2615 {
2616 for (int64_t i = n_befor; i < n_after; i++)
2617 {
2618 int64_t fr = get_v0_from_edge(&(DEL.pedges[i]));
2619 int64_t to = get_v1_from_edge(&(DEL.pedges[i]));
2620
2621 if(fr >= 0 && to >= 0) // otherwise skip
2622 {
2623 IT lrow, lcol;
2624 int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), fr, to, lrow, lcol);
2625 data[owner].push_back(lrow); // row_id
2626 data[owner].push_back(lcol); // col_id
2627 ++realedges;
2628 }
2629 }
2630 }
2631 else
2632 {
2633 for (int64_t i = n_befor; i < n_after; i++)
2634 {
2635 if(DEL.edges[2*i+0] >= 0 && DEL.edges[2*i+1] >= 0) // otherwise skip
2636 {
2637 IT lrow, lcol;
2638 int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), DEL.edges[2*i+0], DEL.edges[2*i+1], lrow, lcol);
2639 data[owner].push_back(lrow);
2640 data[owner].push_back(lcol);
2641 ++realedges;
2642 }
2643 }
2644 }
2645
2646 LIT * sendbuf = new LIT[2*realedges];
2647 int * sendcnt = new int[nprocs];
2648 int * sdispls = new int[nprocs];
2649 for(int i=0; i<nprocs; ++i)
2650 sendcnt[i] = data[i].size();
2651
2652 int * rdispls = new int[nprocs];
2653 int * recvcnt = new int[nprocs];
2654 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT,commGrid->GetWorld()); // share the counts
2655
2656 sdispls[0] = 0;
2657 rdispls[0] = 0;
2658 for(int i=0; i<nprocs-1; ++i)
2659 {
2660 sdispls[i+1] = sdispls[i] + sendcnt[i];
2661 rdispls[i+1] = rdispls[i] + recvcnt[i];
2662 }
2663 for(int i=0; i<nprocs; ++i)
2664 std::copy(data[i].begin(), data[i].end(), sendbuf+sdispls[i]);
2665
2666 // clear memory
2667 for(int i=0; i<nprocs; ++i)
2668 std::vector<LIT>().swap(data[i]);
2669
2670         // ABAB: Total number of edges received might not be LIT-addressable
2671         // However, each edge_id is LIT-addressable
2672 IT thisrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0)); // thisrecv = 2*locedges
2673 LIT * recvbuf = new LIT[thisrecv];
2674 totrecv += thisrecv;
2675
2676 MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<LIT>(), recvbuf, recvcnt, rdispls, MPIType<LIT>(), commGrid->GetWorld());
2677 DeleteAll(sendcnt, recvcnt, sdispls, rdispls,sendbuf);
2678 std::copy (recvbuf,recvbuf+thisrecv,std::back_inserter(alledges)); // copy to all edges
2679 delete [] recvbuf;
2680 }
2681
2682 int myprocrow = commGrid->GetRankInProcCol();
2683 int myproccol = commGrid->GetRankInProcRow();
2684 LIT locrows, loccols;
2685 if(myprocrow != gridrows-1) locrows = m_perproc;
2686 else locrows = DEL.getGlobalV() - myprocrow * m_perproc;
2687 if(myproccol != gridcols-1) loccols = n_perproc;
2688 else loccols = DEL.getGlobalV() - myproccol * n_perproc;
2689
2690 SpTuples<LIT,NT> A(totrecv/2, locrows, loccols, alledges, removeloops); // alledges is empty upon return
2691 spSeq = new DER(A,false); // Convert SpTuples to DER
2692 }
2693
2694 template <class IT, class NT, class DER>
2695 IT SpParMat<IT,NT,DER>::RemoveLoops()
2696 {
2697 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2698 IT totrem;
2699 IT removed = 0;
2700 if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2701 {
2702 SpTuples<IT,NT> tuples(*spSeq);
2703 delete spSeq;
2704 removed = tuples.RemoveLoops();
2705 spSeq = new DER(tuples, false); // Convert to DER
2706 }
2707 MPI_Allreduce( &removed, & totrem, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
2708 return totrem;
2709 }
2710
2711
2712
2713 template <class IT, class NT, class DER>
2714 void SpParMat<IT,NT,DER>::AddLoops(NT loopval, bool replaceExisting)
2715 {
2716 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2717 if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2718 {
2719 typedef typename DER::LocalIT LIT;
2720 SpTuples<LIT,NT> tuples(*spSeq);
2721 delete spSeq;
2722 tuples.AddLoops(loopval, replaceExisting);
2723 tuples.SortColBased();
2724 spSeq = new DER(tuples, false); // Convert to DER
2725 }
2726 }
2727
2728
2729 // Different values on the diagonal
2730 template <class IT, class NT, class DER>
2731 void SpParMat<IT,NT,DER>::AddLoops(FullyDistVec<IT,NT> loopvals, bool replaceExisting)
2732 {
2733
2734
2735 if(*loopvals.commGrid != *commGrid)
2736 {
2737 SpParHelper::Print("Grids are not comparable, SpParMat::AddLoops() fails!\n", commGrid->GetWorld());
2738 MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
2739 }
2740 if (getncol()!= loopvals.TotalLength())
2741 {
2742 SpParHelper::Print("The number of entries in loopvals is not equal to the number of diagonal entries.\n");
2743 MPI_Abort(MPI_COMM_WORLD,DIMMISMATCH);
2744 }
2745
2746 // Gather data on the diagonal processor
2747 IT locsize = loopvals.LocArrSize();
2748 int rowProcs = commGrid->GetGridCols();
2749 std::vector<int> recvcnt(rowProcs, 0);
2750 std::vector<int> rdpls(rowProcs, 0);
2751 MPI_Gather(&locsize, 1, MPI_INT, recvcnt.data(), 1, MPI_INT, commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
2752 std::partial_sum(recvcnt.data(), recvcnt.data()+rowProcs-1, rdpls.data()+1);
2753
2754 IT totrecv = rdpls[rowProcs-1] + recvcnt[rowProcs-1];
2755 assert((totrecv < std::numeric_limits<int>::max()));
2756
2757 std::vector<NT> rowvals(totrecv);
2758 MPI_Gatherv(loopvals.arr.data(), locsize, MPIType<NT>(), rowvals.data(), recvcnt.data(), rdpls.data(),
2759 MPIType<NT>(), commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
2760
2761
2762 MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2763 if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2764 {
2765 typedef typename DER::LocalIT LIT;
2766 SpTuples<LIT,NT> tuples(*spSeq);
2767 delete spSeq;
2768 tuples.AddLoops(rowvals, replaceExisting);
2769 tuples.SortColBased();
2770 spSeq = new DER(tuples, false); // Convert to DER
2771 }
2772 }
2773
2774
2775 //! Pre-allocates buffers for row communication
2776 //! additionally (if GATHERVOPT is defined, incomplete as of March 2016):
2777 //! - Splits the local column indices to sparse & dense pieces to avoid redundant AllGather (sparse pieces get p2p)
2778 template <class IT, class NT, class DER>
2779 template <typename LIT, typename OT>
2780 void SpParMat<IT,NT,DER>::OptimizeForGraph500(OptBuf<LIT,OT> & optbuf)
2781 {
2782 if(spSeq->getnsplit() > 0)
2783 {
2784 SpParHelper::Print("Can not declare preallocated buffers for multithreaded execution\n", commGrid->GetWorld());
2785 return;
2786 }
2787
2788 typedef typename DER::LocalIT LocIT; // ABAB: should match the type of LIT. Check?
2789
2790 // Set up communication buffers, one for all
2791 LocIT mA = spSeq->getnrow();
2792 LocIT nA = spSeq->getncol();
2793
2794 int p_c = commGrid->GetGridCols();
2795 int p_r = commGrid->GetGridRows();
2796
2797     LocIT rwperproc = mA / p_c;	// per-processor share in row-wise communication
2798     LocIT cwperproc = nA / p_r;	// per-processor share in column-wise communication
2799
2800 #ifdef GATHERVOPT
2801 LocIT * colinds = seq->GetDCSC()->jc; // local nonzero column id's
2802 LocIT locnzc = seq->getnzc();
2803 LocIT cci = 0; // index to column id's array (cci: current column index)
2804 int * gsizes = NULL;
2805 IT * ents = NULL;
2806 IT * dpls = NULL;
2807 std::vector<LocIT> pack2send;
2808
2809 FullyDistSpVec<IT,IT> dummyRHS ( commGrid, getncol()); // dummy RHS vector to estimate index start position
2810 IT recveclen;
2811
2812 for(int pid = 1; pid <= p_r; pid++)
2813 {
2814 IT diagoffset;
2815 MPI_Status status;
2816 IT offset = dummyRHS.RowLenUntil(pid-1);
2817 int diagneigh = commGrid->GetComplementRank();
2818 MPI_Sendrecv(&offset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, &diagoffset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
2819
2820 LocIT endind = (pid == p_r)? nA : static_cast<LocIT>(pid) * cwperproc; // the last one might have a larger share (is this fitting to the vector boundaries?)
2821 while(cci < locnzc && colinds[cci] < endind)
2822 {
2823 pack2send.push_back(colinds[cci++]-diagoffset);
2824 }
2825 if(pid-1 == myrank) gsizes = new int[p_r];
2826 MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, pid-1, commGrid->GetColWorld());
2827 if(pid-1 == myrank)
2828 {
2829 IT colcnt = std::accumulate(gsizes, gsizes+p_r, static_cast<IT>(0));
2830 recvbuf = new IT[colcnt];
2831 dpls = new IT[p_r](); // displacements (zero initialized pid)
2832 std::partial_sum(gsizes, gsizes+p_r-1, dpls+1);
2833 }
2834
2835 // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype, void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
2836 MPI_Gatherv(SpHelper::p2a(pack2send), mysize, MPIType<LocIT>(), recvbuf, gsizes, dpls, MPIType<LocIT>(), pid-1, commGrid->GetColWorld());
2837 std::vector<LocIT>().swap(pack2send);
2838
2839 if(pid-1 == myrank)
2840 {
2841 recveclen = dummyRHS.MyLocLength();
2842 std::vector< std::vector<LocIT> > service(recveclen);
2843 for(int i=0; i< p_r; ++i)
2844 {
2845 for(int j=0; j< gsizes[i]; ++j)
2846 {
2847 IT colid2update = recvbuf[dpls[i]+j];
2848 if(service[colid2update].size() < GATHERVNEIGHLIMIT)
2849 {
2850                     service[colid2update].push_back(i);
2851 }
2852 // else don't increase any further and mark it done after the iterations are complete
2853 }
2854 }
2855 }
2856 }
2857 #endif
2858
2859
2860 std::vector<bool> isthere(mA, false); // perhaps the only appropriate use of this crippled data structure
2861 std::vector<int> maxlens(p_c,0); // maximum data size to be sent to any neighbor along the processor row
2862
2863 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
2864 {
2865 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
2866 {
2867 LocIT rowid = nzit.rowid();
2868 if(!isthere[rowid])
2869 {
2870 LocIT owner = std::min(nzit.rowid() / rwperproc, (LocIT) p_c-1);
2871 maxlens[owner]++;
2872 isthere[rowid] = true;
2873 }
2874 }
2875 }
2876 SpParHelper::Print("Optimization buffers set\n", commGrid->GetWorld());
2877 optbuf.Set(maxlens,mA);
2878 }
2879
2880 template <class IT, class NT, class DER>
ActivateThreading(int numsplits)2881 void SpParMat<IT,NT,DER>::ActivateThreading(int numsplits)
2882 {
2883 spSeq->RowSplit(numsplits);
2884 }
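
/*
 * Usage sketch for ActivateThreading (illustrative only; the matrix type, input file and the
 * number of splits are assumptions made for this example, not taken from this file):
 *
 *   typedef combblas::SpParMat< int64_t, double, combblas::SpDCCols<int64_t,double> > PSpMat;
 *   PSpMat A(fullWorld);                               // fullWorld: a shared_ptr<CommGrid> built by the caller
 *   A.ParallelReadMM("A.mtx", true, combblas::maximum<double>());
 *   int nsplits = 4;                                   // e.g., one split per OpenMP thread on this process
 *   A.ActivateThreading(nsplits);                      // row-splits the local submatrix so kernels can work on the pieces concurrently
 */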
2885
2886
2887 /**
2888 * Parallel routine that returns A*A on the semiring SR
2889 * Uses only MPI-1 features (relies on simple blocking broadcast)
2890 **/
2891 template <class IT, class NT, class DER>
2892 template <typename SR>
Square()2893 void SpParMat<IT,NT,DER>::Square ()
2894 {
2895 int stages, dummy; // last two parameters of productgrid are ignored for synchronous multiplication
2896 std::shared_ptr<CommGrid> Grid = ProductGrid(commGrid.get(), commGrid.get(), stages, dummy, dummy);
2897
2898 typedef typename DER::LocalIT LIT;
2899
2900 LIT AA_m = spSeq->getnrow();
2901 LIT AA_n = spSeq->getncol();
2902
2903 DER seqTrn = spSeq->TransposeConst(); // will be automatically discarded after going out of scope
2904
2905 MPI_Barrier(commGrid->GetWorld());
2906
2907 LIT ** NRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
2908 LIT ** TRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
2909
2910 SpParHelper::GetSetSizes( *spSeq, NRecvSizes, commGrid->GetRowWorld());
2911 SpParHelper::GetSetSizes( seqTrn, TRecvSizes, commGrid->GetColWorld());
2912
2913 // Remotely fetched matrices are stored as pointers
2914 DER * NRecv;
2915 DER * TRecv;
2916 std::vector< SpTuples<LIT,NT> *> tomerge;
2917
2918 int Nself = commGrid->GetRankInProcRow();
2919 int Tself = commGrid->GetRankInProcCol();
2920
2921 for(int i = 0; i < stages; ++i)
2922 {
2923 std::vector<LIT> ess;
2924 if(i == Nself) NRecv = spSeq; // shallow-copy
2925 else
2926 {
2927 ess.resize(DER::esscount);
2928 for(int j=0; j< DER::esscount; ++j)
2929 ess[j] = NRecvSizes[j][i]; // essentials of the ith matrix in this row
2930 NRecv = new DER(); // first, create the object
2931 }
2932
2933 SpParHelper::BCastMatrix(Grid->GetRowWorld(), *NRecv, ess, i); // then, broadcast its elements
2934 ess.clear();
2935
2936 if(i == Tself) TRecv = &seqTrn; // shallow-copy
2937 else
2938 {
2939 ess.resize(DER::esscount);
2940 for(int j=0; j< DER::esscount; ++j)
2941 ess[j] = TRecvSizes[j][i];
2942 TRecv = new DER();
2943 }
2944 SpParHelper::BCastMatrix(Grid->GetColWorld(), *TRecv, ess, i);
2945
2946 SpTuples<LIT,NT> * AA_cont = MultiplyReturnTuples<SR, NT>(*NRecv, *TRecv, false, true);
2947 if(!AA_cont->isZero())
2948 tomerge.push_back(AA_cont);
2949
2950 if(i != Nself) delete NRecv;
2951 if(i != Tself) delete TRecv;
2952 }
2953
2954 SpHelper::deallocate2D(NRecvSizes, DER::esscount);
2955 SpHelper::deallocate2D(TRecvSizes, DER::esscount);
2956
2957 delete spSeq;
2958 spSeq = new DER(MergeAll<SR>(tomerge, AA_m, AA_n), false); // First get the result in SpTuples, then convert to UDER
2959 for(unsigned int i=0; i<tomerge.size(); ++i)
2960 delete tomerge[i];
2961 }
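
/*
 * Usage sketch for Square (illustrative only; the PlusTimesSRing semiring from Semirings.h and
 * the matrix type are assumptions made for this example):
 *
 *   typedef combblas::PlusTimesSRing<double, double> PTDD;    // ordinary (+,*) arithmetic
 *   typedef combblas::SpParMat< int64_t, double, combblas::SpDCCols<int64_t,double> > PSpMat;
 *   PSpMat A(fullWorld);
 *   A.ParallelReadMM("A.mtx", true, combblas::maximum<double>());
 *   A.Square<PTDD>();    // replaces A with A*A, one blocking row/column broadcast pair per stage
 */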
2962
2963
2964 template <class IT, class NT, class DER>
Transpose()2965 void SpParMat<IT,NT,DER>::Transpose()
2966 {
2967 if(commGrid->myproccol == commGrid->myprocrow) // Diagonal
2968 {
2969 spSeq->Transpose();
2970 }
2971 else
2972 {
2973 typedef typename DER::LocalIT LIT;
2974 SpTuples<LIT,NT> Atuples(*spSeq);
2975 LIT locnnz = Atuples.getnnz();
2976 LIT * rows = new LIT[locnnz];
2977 LIT * cols = new LIT[locnnz];
2978 NT * vals = new NT[locnnz];
2979 for(LIT i=0; i < locnnz; ++i)
2980 {
2981 rows[i] = Atuples.colindex(i); // swap (i,j) here
2982 cols[i] = Atuples.rowindex(i);
2983 vals[i] = Atuples.numvalue(i);
2984 }
2985 LIT locm = getlocalcols();
2986 LIT locn = getlocalrows();
2987 delete spSeq;
2988
2989 LIT remotem, remoten, remotennz;
2990 std::swap(locm,locn);
2991 int diagneigh = commGrid->GetComplementRank();
2992
2993 MPI_Status status;
2994 MPI_Sendrecv(&locnnz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, &remotennz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
2995 MPI_Sendrecv(&locn, 1, MPIType<LIT>(), diagneigh, TRTAGM, &remotem, 1, MPIType<LIT>(), diagneigh, TRTAGM, commGrid->GetWorld(), &status);
2996 MPI_Sendrecv(&locm, 1, MPIType<LIT>(), diagneigh, TRTAGN, &remoten, 1, MPIType<LIT>(), diagneigh, TRTAGN, commGrid->GetWorld(), &status);
2997
2998 LIT * rowsrecv = new LIT[remotennz];
2999 MPI_Sendrecv(rows, locnnz, MPIType<LIT>(), diagneigh, TRTAGROWS, rowsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGROWS, commGrid->GetWorld(), &status);
3000 delete [] rows;
3001
3002 LIT * colsrecv = new LIT[remotennz];
3003 MPI_Sendrecv(cols, locnnz, MPIType<LIT>(), diagneigh, TRTAGCOLS, colsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGCOLS, commGrid->GetWorld(), &status);
3004 delete [] cols;
3005
3006 NT * valsrecv = new NT[remotennz];
3007 MPI_Sendrecv(vals, locnnz, MPIType<NT>(), diagneigh, TRTAGVALS, valsrecv, remotennz, MPIType<NT>(), diagneigh, TRTAGVALS, commGrid->GetWorld(), &status);
3008 delete [] vals;
3009
3010 std::tuple<LIT,LIT,NT> * arrtuples = new std::tuple<LIT,LIT,NT>[remotennz];
3011 for(LIT i=0; i< remotennz; ++i)
3012 {
3013 arrtuples[i] = std::make_tuple(rowsrecv[i], colsrecv[i], valsrecv[i]);
3014 }
3015 DeleteAll(rowsrecv, colsrecv, valsrecv);
3016 ColLexiCompare<LIT,NT> collexicogcmp;
3017 sort(arrtuples , arrtuples+remotennz, collexicogcmp ); // sort w.r.t columns here
3018
3019 spSeq = new DER();
3020 spSeq->Create( remotennz, remotem, remoten, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
3021 }
3022 }
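
/*
 * Usage sketch for Transpose (illustrative only). The operation is in place and collective:
 * diagonal processors transpose their local block, while every off-diagonal processor swaps
 * its (row, column, value) triples with its complement partner across the diagonal.
 *
 *   A.Transpose();    // an m-by-n distributed matrix becomes n-by-m afterwards
 */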
3023
3024
3025 template <class IT, class NT, class DER>
3026 template <class HANDLER>
SaveGathered(std::string filename,HANDLER handler,bool transpose) const3027 void SpParMat< IT,NT,DER >::SaveGathered(std::string filename, HANDLER handler, bool transpose) const
3028 {
3029 int proccols = commGrid->GetGridCols();
3030 int procrows = commGrid->GetGridRows();
3031 IT totalm = getnrow();
3032 IT totaln = getncol();
3033 IT totnnz = getnnz();
3034 int flinelen = 0;
3035 std::ofstream out;
3036 if(commGrid->GetRank() == 0)
3037 {
3038 std::string s;
3039 std::stringstream strm;
3040 strm << "%%MatrixMarket matrix coordinate real general" << std::endl;
3041 strm << totalm << " " << totaln << " " << totnnz << std::endl;
3042 s = strm.str();
3043 out.open(filename.c_str(),std::ios_base::trunc);
3044 flinelen = s.length();
3045 out.write(s.c_str(), flinelen);
3046 out.close();
3047 }
3048 int colrank = commGrid->GetRankInProcCol();
3049 int colneighs = commGrid->GetGridRows();
3050 IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
3051 locnrows[colrank] = (IT) getlocalrows();
3052 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
3053 	IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
3054 delete [] locnrows;
3055
3056 MPI_Datatype datatype;
3057 MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &datatype);
3058 MPI_Type_commit(&datatype);
3059
3060 	for(int i = 0; i < procrows; i++)	// for all processor rows (in order)
3061 {
3062 if(commGrid->GetRankInProcCol() == i) // only the ith processor row
3063 {
3064 IT localrows = spSeq->getnrow(); // same along the processor row
3065 std::vector< std::vector< std::pair<IT,NT> > > csr(localrows);
3066 if(commGrid->GetRankInProcRow() == 0) // get the head of processor row
3067 {
3068 IT localcols = spSeq->getncol(); // might be different on the last processor on this processor row
3069 MPI_Bcast(&localcols, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3070 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
3071 {
3072 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3073 {
3074 csr[nzit.rowid()].push_back( std::make_pair(colit.colid(), nzit.value()) );
3075 }
3076 }
3077 }
3078 else // get the rest of the processors
3079 {
3080 IT n_perproc;
3081 MPI_Bcast(&n_perproc, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3082 IT noffset = commGrid->GetRankInProcRow() * n_perproc;
3083 for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
3084 {
3085 for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3086 {
3087 csr[nzit.rowid()].push_back( std::make_pair(colit.colid() + noffset, nzit.value()) );
3088 }
3089 }
3090 }
3091 std::pair<IT,NT> * ents = NULL;
3092 int * gsizes = NULL, * dpls = NULL;
3093 if(commGrid->GetRankInProcRow() == 0) // only the head of processor row
3094 {
3095 out.open(filename.c_str(),std::ios_base::app);
3096 gsizes = new int[proccols];
3097 				dpls = new int[proccols]();	// displacements (zero initialized)
3098 }
3099 for(int j = 0; j < localrows; ++j)
3100 {
3101 IT rowcnt = 0;
3102 sort(csr[j].begin(), csr[j].end());
3103 int mysize = csr[j].size();
3104 MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, 0, commGrid->GetRowWorld());
3105 if(commGrid->GetRankInProcRow() == 0)
3106 {
3107 rowcnt = std::accumulate(gsizes, gsizes+proccols, static_cast<IT>(0));
3108 std::partial_sum(gsizes, gsizes+proccols-1, dpls+1);
3109 ents = new std::pair<IT,NT>[rowcnt]; // nonzero entries in the j'th local row
3110 }
3111
3112 // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype,
3113 // void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
3114 MPI_Gatherv(SpHelper::p2a(csr[j]), mysize, datatype, ents, gsizes, dpls, datatype, 0, commGrid->GetRowWorld());
3115 if(commGrid->GetRankInProcRow() == 0)
3116 {
3117 for(int k=0; k< rowcnt; ++k)
3118 {
3119 //out << j + roffset + 1 << "\t" << ents[k].first + 1 <<"\t" << ents[k].second << endl;
3120 if (!transpose)
3121 // regular
3122 out << j + roffset + 1 << "\t" << ents[k].first + 1 << "\t";
3123 else
3124 // transpose row/column
3125 out << ents[k].first + 1 << "\t" << j + roffset + 1 << "\t";
3126 handler.save(out, ents[k].second, j + roffset, ents[k].first);
3127 out << std::endl;
3128 }
3129 delete [] ents;
3130 }
3131 }
3132 if(commGrid->GetRankInProcRow() == 0)
3133 {
3134 DeleteAll(gsizes, dpls);
3135 out.close();
3136 }
3137 } // end_if the ith processor row
3138 MPI_Barrier(commGrid->GetWorld()); // signal the end of ith processor row iteration (so that all processors block)
3139 }
3140 }
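
/*
 * Usage sketch for SaveGathered (illustrative only; it assumes a handler-less convenience
 * overload that supplies the default scalar handler is declared in SpParMat.h):
 *
 *   A.SaveGathered("A_out.mtx");   // writes a single Matrix Market file in global row order
 *
 * Processor rows take turns; within a row, the head gathers one local row of nonzeros at a time,
 * so no single processor ever has to hold the whole matrix.
 */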
3141
3142
3143 //! Private subroutine of ReadGeneralizedTuples
3144 //! totallength is the length of the dictionary, which we don't know a priori in this labeled-tuples format
3145 template <class IT, class NT, class DER>
TupleRead1stPassNExchange(const std::string & filename,TYPE2SEND * & senddata,IT & totsend,FullyDistVec<IT,STRASARRAY> & distmapper,uint64_t & totallength)3146 MPI_File SpParMat< IT,NT,DER >::TupleRead1stPassNExchange (const std::string & filename, TYPE2SEND * & senddata, IT & totsend,
3147 FullyDistVec<IT,STRASARRAY> & distmapper, uint64_t & totallength)
3148 {
3149 int myrank = commGrid->GetRank();
3150 int nprocs = commGrid->GetSize();
3151
3152 MPI_Offset fpos, end_fpos;
3153 struct stat st; // get file size
3154 if (stat(filename.c_str(), &st) == -1)
3155 {
3156 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3157 }
3158 int64_t file_size = st.st_size;
3159 if(myrank == 0) // the offset needs to be for this rank
3160 {
3161 std::cout << "File is " << file_size << " bytes" << std::endl;
3162 }
3163 fpos = myrank * file_size / nprocs;
3164
3165 if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3166 else end_fpos = file_size;
3167
3168 MPI_File mpi_fh;
3169 MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3170
3171     typedef std::map<std::string, uint64_t> KEYMAP;	// due to potential (but extremely unlikely) collisions in MurmurHash, make the key to the std::map the string itself
3172 std::vector< KEYMAP > allkeys(nprocs); // map keeps the outgoing data unique, we could have applied this to HipMer too
3173
3174 std::vector<std::string> lines;
3175 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3176 int64_t entriesread = lines.size();
3177 SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3178
3179 while(!finished)
3180 {
3181 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3182
3183 entriesread += lines.size();
3184 SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3185 }
3186 int64_t allentriesread;
3187 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3188 #ifdef COMBBLAS_DEBUG
3189 if(myrank == 0)
3190 std::cout << "Initial reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3191 #endif
3192
3193 int * sendcnt = new int[nprocs];
3194 int * recvcnt = new int[nprocs];
3195 for(int i=0; i<nprocs; ++i)
3196 sendcnt[i] = allkeys[i].size();
3197
3198 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
3199 int * sdispls = new int[nprocs]();
3200 int * rdispls = new int[nprocs]();
3201 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
3202 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
3203 totsend = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
3204 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
3205
3206 assert((totsend < std::numeric_limits<int>::max()));
3207 assert((totrecv < std::numeric_limits<int>::max()));
3208
3209 // The following are declared in SpParMat.h
3210 // typedef std::array<char, MAXVERTNAME> STRASARRAY;
3211 // typedef std::pair< STRASARRAY, uint64_t> TYPE2SEND;
3212 senddata = new TYPE2SEND[totsend];
3213
3214 #pragma omp parallel for
3215 for(int i=0; i<nprocs; ++i)
3216 {
3217 size_t j = 0;
3218 for(auto pobj:allkeys[i])
3219 {
3220             // A raw C-style array is neither copyable nor assignable, but std::pair requires both, hence std::array is used
3221 std::array<char, MAXVERTNAME> vname;
3222 std::copy( pobj.first.begin(), pobj.first.end(), vname.begin() );
3223 if(pobj.first.length() < MAXVERTNAME) vname[pobj.first.length()] = '\0'; // null termination
3224
3225 senddata[sdispls[i]+j] = TYPE2SEND(vname, pobj.second);
3226 j++;
3227 }
3228 }
3229 allkeys.clear(); // allkeys is no longer needed after this point
3230
3231 MPI_Datatype MPI_HASH;
3232 MPI_Type_contiguous(sizeof(TYPE2SEND), MPI_CHAR, &MPI_HASH);
3233 MPI_Type_commit(&MPI_HASH);
3234
3235 TYPE2SEND * recvdata = new TYPE2SEND[totrecv];
3236 MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_HASH, recvdata, recvcnt, rdispls, MPI_HASH, commGrid->GetWorld());
3237 // do not delete send buffers yet as we will use them to recv back the data
3238
3239 std::set< std::pair<uint64_t, std::string> > uniqsorted;
3240 for(IT i=0; i< totrecv; ++i)
3241 {
3242 auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0'); // find the null character (or string::end)
3243 std::string strtmp(recvdata[i].first.begin(), locnull); // range constructor
3244
3245 uniqsorted.insert(std::make_pair(recvdata[i].second, strtmp));
3246 }
3247 uint64_t uniqsize = uniqsorted.size();
3248
3249 #ifdef COMBBLAS_DEBUG
3250 if(myrank == 0)
3251 std::cout << "out of " << totrecv << " vertices received, " << uniqsize << " were unique" << std::endl;
3252 #endif
3253 uint64_t sizeuntil = 0;
3254 totallength = 0;
3255 MPI_Exscan( &uniqsize, &sizeuntil, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld() );
3256 MPI_Allreduce(&uniqsize, &totallength, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld());
3257 if(myrank == 0) sizeuntil = 0; // because MPI_Exscan says the recvbuf in process 0 is undefined
3258
3259 distmapper = FullyDistVec<IT,STRASARRAY>(commGrid, totallength,STRASARRAY{});
3260
3261 // invindex does not conform to FullyDistVec boundaries, otherwise its contents are essentially the same as distmapper
3262 KEYMAP invindex; // KEYMAP is map<string, uint64_t>.
3263 uint64_t locindex = 0;
3264 std::vector< std::vector< IT > > locs_send(nprocs);
3265 std::vector< std::vector< std::string > > data_send(nprocs);
3266     int * map_scnt = new int[nprocs]();	// send counts for this map only (not to be confused with the other sendcnt)
3267 for(auto itr = uniqsorted.begin(); itr != uniqsorted.end(); ++itr)
3268 {
3269 uint64_t globalindex = sizeuntil + locindex;
3270 invindex.insert(std::make_pair(itr->second, globalindex));
3271
3272 IT newlocid;
3273 int owner = distmapper.Owner(globalindex, newlocid);
3274
3275 //if(myrank == 0)
3276 // std::cout << "invindex received " << itr->second << " with global index " << globalindex << " to be owned by " << owner << " with index " << newlocid << std::endl;
3277
3278 locs_send[owner].push_back(newlocid);
3279 data_send[owner].push_back(itr->second);
3280 map_scnt[owner]++;
3281 locindex++;
3282 }
3283 uniqsorted.clear(); // clear memory
3284
3285
3286 /* BEGIN: Redistributing the permutation vector to fit the FullyDistVec semantics */
3287 SpParHelper::ReDistributeToVector(map_scnt, locs_send, data_send, distmapper.arr, commGrid->GetWorld()); // map_scnt is deleted here
3288 /* END: Redistributing the permutation vector to fit the FullyDistVec semantics */
3289
3290 for(IT i=0; i< totrecv; ++i)
3291 {
3292 auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0');
3293 std::string searchstr(recvdata[i].first.begin(), locnull); // range constructor
3294
3295 auto resp = invindex.find(searchstr); // recvdata[i] is of type pair< STRASARRAY, uint64_t>
3296 if (resp != invindex.end())
3297 {
3298 recvdata[i].second = resp->second; // now instead of random numbers, recvdata's second entry will be its new index
3299 }
3300 else
3301 std::cout << "Assertion failed at proc " << myrank << ": the absence of the entry in invindex is unexpected!!!" << std::endl;
3302 }
3303 MPI_Alltoallv(recvdata, recvcnt, rdispls, MPI_HASH, senddata, sendcnt, sdispls, MPI_HASH, commGrid->GetWorld());
3304 DeleteAll(recvdata, sendcnt, recvcnt, sdispls, rdispls);
3305 MPI_Type_free(&MPI_HASH);
3306
3307 // the following gets deleted here: allkeys
3308 return mpi_fh;
3309 }
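
/*
 * The first pass above builds a distributed dictionary that maps every distinct string vertex
 * label to a contiguous 0-based global index. A minimal serial sketch of the same renaming idea
 * (an assumption-laden illustration: single process, std::map in place of the hashed exchange):
 *
 *   std::map<std::string, uint64_t> dict;
 *   uint64_t nextid = 0;
 *   for (const std::string & label : labels)                       // labels: every vertex string seen in the file
 *       if (dict.insert(std::make_pair(label, nextid)).second)
 *           ++nextid;                                               // only freshly inserted labels consume a new index
 *   // dict now plays the role of ultimateperm: it renames string labels to matrix indices
 */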
3310
3311
3312
3313 //! Handles all sorts of orderings as long as there are no duplicates
3314 //! Does not take matrix market banner (only tuples)
3315 //! Data can be load imbalanced and the vertex labels can be arbitrary strings
3316 //! Replaces ReadDistribute for imbalanced arbitrary input in tuples format
3317 template <class IT, class NT, class DER>
3318 template <typename _BinaryOperation>
ReadGeneralizedTuples(const std::string & filename,_BinaryOperation BinOp)3319 FullyDistVec<IT,std::array<char, MAXVERTNAME> > SpParMat< IT,NT,DER >::ReadGeneralizedTuples (const std::string & filename, _BinaryOperation BinOp)
3320 {
3321 int myrank = commGrid->GetRank();
3322 int nprocs = commGrid->GetSize();
3323 TYPE2SEND * senddata;
3324 IT totsend;
3325 uint64_t totallength;
3326 FullyDistVec<IT,STRASARRAY> distmapper(commGrid); // choice of array<char, MAXVERTNAME> over string = array is required to be a contiguous container and an aggregate
3327
3328 MPI_File mpi_fh = TupleRead1stPassNExchange(filename, senddata, totsend, distmapper, totallength);
3329
3330 typedef std::map<std::string, uint64_t> KEYMAP;
3331 KEYMAP ultimateperm; // the ultimate permutation
3332 for(IT i=0; i< totsend; ++i)
3333 {
3334 auto locnull = std::find(senddata[i].first.begin(), senddata[i].first.end(), '\0');
3335
3336 std::string searchstr(senddata[i].first.begin(), locnull);
3337 auto ret = ultimateperm.emplace(std::make_pair(searchstr, senddata[i].second));
3338 if(!ret.second) // the second is the boolean that tells success
3339 {
3340 // remember, we only sent unique vertex ids in the first place so we are expecting unique values in return
3341 std::cout << "the duplication in ultimateperm is unexpected!!!" << std::endl;
3342 }
3343 }
3344 delete [] senddata;
3345
3346 // rename the data now, first reset file pointers
3347 MPI_Offset fpos, end_fpos;
3348 struct stat st; // get file size
3349 if (stat(filename.c_str(), &st) == -1)
3350 {
3351 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3352 }
3353 int64_t file_size = st.st_size;
3354
3355 fpos = myrank * file_size / nprocs;
3356
3357 if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3358 else end_fpos = file_size;
3359
3360 typedef typename DER::LocalIT LIT;
3361 std::vector<LIT> rows;
3362 std::vector<LIT> cols;
3363 std::vector<NT> vals;
3364
3365 std::vector<std::string> lines;
3366 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3367 int64_t entriesread = lines.size();
3368
3369 SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3370
3371 while(!finished)
3372 {
3373 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3374 entriesread += lines.size();
3375 SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3376 }
3377 int64_t allentriesread;
3378 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3379 #ifdef COMBBLAS_DEBUG
3380 if(myrank == 0)
3381 std::cout << "Second reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3382 #endif
3383
3384 MPI_File_close(&mpi_fh);
3385 std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3386
3387 LIT locsize = rows.size(); // remember: locsize != entriesread (unless the matrix is unsymmetric)
3388 for(LIT i=0; i<locsize; ++i)
3389 {
3390 LIT lrow, lcol;
3391 int owner = Owner(totallength, totallength, rows[i], cols[i], lrow, lcol);
3392 data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3393 }
3394 std::vector<LIT>().swap(rows);
3395 std::vector<LIT>().swap(cols);
3396 std::vector<NT>().swap(vals);
3397
3398 #ifdef COMBBLAS_DEBUG
3399 if(myrank == 0)
3400 std::cout << "Packing to recipients finished, about to send..." << std::endl;
3401 #endif
3402
3403 if(spSeq) delete spSeq;
3404 SparseCommon(data, locsize, totallength, totallength, BinOp);
3405 // PrintInfo();
3406 // distmapper.ParallelWrite("distmapper.mtx", 1, CharArraySaveHandler());
3407 return distmapper;
3408 }
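
/*
 * Usage sketch for ReadGeneralizedTuples (illustrative only; the file name, matrix type and the
 * maximum<> functor passed as BinOp are assumptions made for this example):
 *
 *   typedef combblas::SpParMat< int64_t, double, combblas::SpDCCols<int64_t,double> > PSpMat;
 *   PSpMat A(fullWorld);
 *   combblas::FullyDistVec< int64_t, std::array<char, MAXVERTNAME> > labels =
 *           A.ReadGeneralizedTuples("edges.txt", combblas::maximum<double>());
 *   // labels[i] holds the original string name of the vertex that was renamed to matrix index i
 */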
3409
3410
3411
3412 //! Handles all sorts of orderings, even duplicates (what happens to them is determined by BinOp)
3413 //! Requires proper matrix market banner at the moment
3414 //! Replaces ReadDistribute for properly load balanced input in matrix market format
3415 template <class IT, class NT, class DER>
3416 template <typename _BinaryOperation>
ParallelReadMM(const std::string & filename,bool onebased,_BinaryOperation BinOp)3417 void SpParMat< IT,NT,DER >::ParallelReadMM (const std::string & filename, bool onebased, _BinaryOperation BinOp)
3418 {
3419 int32_t type = -1;
3420 int32_t symmetric = 0;
3421 int64_t nrows, ncols, nonzeros;
3422 int64_t linesread = 0;
3423
3424 FILE *f;
3425 int myrank = commGrid->GetRank();
3426 int nprocs = commGrid->GetSize();
3427 if(myrank == 0)
3428 {
3429 MM_typecode matcode;
3430 if ((f = fopen(filename.c_str(), "r")) == NULL)
3431 {
3432 			printf("COMBBLAS: Matrix-market file %s cannot be found\n", filename.c_str());
3433 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3434 }
3435 if (mm_read_banner(f, &matcode) != 0)
3436 {
3437 printf("Could not process Matrix Market banner.\n");
3438 exit(1);
3439 }
3440 linesread++;
3441
3442 if (mm_is_complex(matcode))
3443 {
3444 			printf("Sorry, this application does not support complex types\n");
3445 			printf("Matrix Market type: [%s]\n", mm_typecode_to_str(matcode));
3446 }
3447 else if(mm_is_real(matcode))
3448 {
3449 std::cout << "Matrix is Float" << std::endl;
3450 type = 0;
3451 }
3452 else if(mm_is_integer(matcode))
3453 {
3454 std::cout << "Matrix is Integer" << std::endl;
3455 type = 1;
3456 }
3457 else if(mm_is_pattern(matcode))
3458 {
3459 std::cout << "Matrix is Boolean" << std::endl;
3460 type = 2;
3461 }
3462 if(mm_is_symmetric(matcode) || mm_is_hermitian(matcode))
3463 {
3464 std::cout << "Matrix is symmetric" << std::endl;
3465 symmetric = 1;
3466 }
3467 int ret_code;
3468 if ((ret_code = mm_read_mtx_crd_size(f, &nrows, &ncols, &nonzeros, &linesread)) !=0) // ABAB: mm_read_mtx_crd_size made 64-bit friendly
3469 exit(1);
3470
3471 std::cout << "Total number of nonzeros expected across all processors is " << nonzeros << std::endl;
3472
3473 }
3474 MPI_Bcast(&type, 1, MPI_INT, 0, commGrid->commWorld);
3475 MPI_Bcast(&symmetric, 1, MPI_INT, 0, commGrid->commWorld);
3476 MPI_Bcast(&nrows, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3477 MPI_Bcast(&ncols, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3478 MPI_Bcast(&nonzeros, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3479
3480 	// Determine the file size so that each process can compute the byte range it is responsible for
3481 struct stat st; // get file size
3482 if (stat(filename.c_str(), &st) == -1)
3483 {
3484 MPI_Abort(MPI_COMM_WORLD, NOFILE);
3485 }
3486 int64_t file_size = st.st_size;
3487 MPI_Offset fpos, end_fpos, endofheader;
3488 if(commGrid->GetRank() == 0) // the offset needs to be for this rank
3489 {
3490 std::cout << "File is " << file_size << " bytes" << std::endl;
3491 fpos = ftell(f);
3492 endofheader = fpos;
3493 MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);
3494 fclose(f);
3495 }
3496 else
3497 {
3498 MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld); // receive the file loc at the end of header
3499 fpos = endofheader + myrank * (file_size-endofheader) / nprocs;
3500 }
3501 if(myrank != (nprocs-1)) end_fpos = endofheader + (myrank + 1) * (file_size-endofheader) / nprocs;
3502 else end_fpos = file_size;
3503
3504 MPI_File mpi_fh;
3505 MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3506
3507
3508 typedef typename DER::LocalIT LIT;
3509 std::vector<LIT> rows;
3510 std::vector<LIT> cols;
3511 std::vector<NT> vals;
3512
3513 std::vector<std::string> lines;
3514 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3515 int64_t entriesread = lines.size();
3516 SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
3517 MPI_Barrier(commGrid->commWorld);
3518
3519 while(!finished)
3520 {
3521 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3522 entriesread += lines.size();
3523 SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
3524 }
3525 int64_t allentriesread;
3526 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3527 #ifdef COMBBLAS_DEBUG
3528 if(myrank == 0)
3529 std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3530 #endif
3531
3532 std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3533
3534 LIT locsize = rows.size(); // remember: locsize != entriesread (unless the matrix is unsymmetric)
3535 for(LIT i=0; i<locsize; ++i)
3536 {
3537 LIT lrow, lcol;
3538 int owner = Owner(nrows, ncols, rows[i], cols[i], lrow, lcol);
3539 data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3540 }
3541 std::vector<LIT>().swap(rows);
3542 std::vector<LIT>().swap(cols);
3543 std::vector<NT>().swap(vals);
3544
3545 #ifdef COMBBLAS_DEBUG
3546 if(myrank == 0)
3547 		std::cout << "Packing to recipients finished, about to send..." << std::endl;
3548 #endif
3549
3550 if(spSeq) delete spSeq;
3551 SparseCommon(data, locsize, nrows, ncols, BinOp);
3552 }
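
/*
 * Usage sketch for ParallelReadMM (illustrative only; the file name, matrix type and the
 * maximum<> functor used to combine duplicate entries are assumptions made for this example):
 *
 *   typedef combblas::SpParMat< int64_t, double, combblas::SpDCCols<int64_t,double> > PSpMat;
 *   std::shared_ptr<combblas::CommGrid> fullWorld(new combblas::CommGrid(MPI_COMM_WORLD, 0, 0));
 *   PSpMat A(fullWorld);
 *   A.ParallelReadMM("A.mtx", true, combblas::maximum<double>());   // onebased = true for standard 1-based Matrix Market indices
 */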
3553
3554
3555 //! Handles all sorts of orderings as long as there are no duplicates
3556 //! May perform better when the data is already reverse column-sorted (i.e. in decreasing order)
3557 //! if nonum is true, then numerics are not supplied and they are assumed to be all 1's
3558 template <class IT, class NT, class DER>
3559 template <class HANDLER>
ReadDistribute(const std::string & filename,int master,bool nonum,HANDLER handler,bool transpose,bool pario)3560 void SpParMat< IT,NT,DER >::ReadDistribute (const std::string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
3561 {
3562 #ifdef TAU_PROFILE
3563 TAU_PROFILE_TIMER(rdtimer, "ReadDistribute", "void SpParMat::ReadDistribute (const string & , int, bool, HANDLER, bool)", TAU_DEFAULT);
3564 TAU_PROFILE_START(rdtimer);
3565 #endif
3566
3567 std::ifstream infile;
3568 FILE * binfile = NULL; // points to "past header" if the file is binary
3569 int seeklength = 0;
3570 HeaderInfo hfile;
3571 if(commGrid->GetRank() == master) // 1 processor
3572 {
3573 hfile = ParseHeader(filename, binfile, seeklength);
3574 }
3575 MPI_Bcast(&seeklength, 1, MPI_INT, master, commGrid->commWorld);
3576
3577 IT total_m, total_n, total_nnz;
3578 IT m_perproc = 0, n_perproc = 0;
3579
3580 int colneighs = commGrid->GetGridRows(); // number of neighbors along this processor column (including oneself)
3581 int rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
3582
3583 IT buffpercolneigh = MEMORYINBYTES / (colneighs * (2 * sizeof(IT) + sizeof(NT)));
3584 IT buffperrowneigh = MEMORYINBYTES / (rowneighs * (2 * sizeof(IT) + sizeof(NT)));
3585 if(pario)
3586 {
3587 // since all colneighs will be reading the data at the same time
3588 		// chances are they might all read data destined for a single processor
3589 // in that case buffperrowneigh > colneighs * buffpercolneigh
3590 // in order not to overflow
3591 buffpercolneigh /= colneighs;
3592 if(seeklength == 0)
3593 SpParHelper::Print("COMBBLAS: Parallel I/O requested but binary header is corrupted\n", commGrid->GetWorld());
3594 }
3595
3596 	// make sure that buffperrowneigh >= buffpercolneigh to cover for this pathological case:
3597 // -- all data received by a given column head (by vertical communication) are headed to a single processor along the row
3598 // -- then making sure buffperrowneigh >= buffpercolneigh guarantees that the horizontal buffer will never overflow
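	// Worked example (numbers illustrative only): if buffpercolneigh = 100, a column head receives at most
	// 100 entries per vertical scatter; in the worst case every one of them belongs to the same processor in
	// its row, so that single horizontal buffer must hold all 100 entries. Taking the max below guarantees
	// buffperrowneigh >= buffpercolneigh and thus covers this case.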
3599 buffperrowneigh = std::max(buffperrowneigh, buffpercolneigh);
3600 if(std::max(buffpercolneigh * colneighs, buffperrowneigh * rowneighs) > std::numeric_limits<int>::max())
3601 {
3602 SpParHelper::Print("COMBBLAS: MPI doesn't support sending int64_t send/recv counts or displacements\n", commGrid->GetWorld());
3603 }
3604
3605 int * cdispls = new int[colneighs];
3606 for (IT i=0; i<colneighs; ++i) cdispls[i] = i*buffpercolneigh;
3607 int * rdispls = new int[rowneighs];
3608 for (IT i=0; i<rowneighs; ++i) rdispls[i] = i*buffperrowneigh;
3609
3610 int *ccurptrs = NULL, *rcurptrs = NULL;
3611 int recvcount = 0;
3612 IT * rows = NULL;
3613 IT * cols = NULL;
3614 NT * vals = NULL;
3615
3616 	// Note: all other column heads that initiate the horizontal communication have the same "rankinrow" as the master
3617 int rankincol = commGrid->GetRankInProcCol(master); // get master's rank in its processor column
3618 int rankinrow = commGrid->GetRankInProcRow(master);
3619 std::vector< std::tuple<IT, IT, NT> > localtuples;
3620
3621 if(commGrid->GetRank() == master) // 1 processor
3622 {
3623 if( !hfile.fileexists )
3624 {
3625 SpParHelper::Print( "COMBBLAS: Input file doesn't exist\n", commGrid->GetWorld());
3626 total_n = 0; total_m = 0;
3627 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3628 return;
3629 }
3630 if (hfile.headerexists && hfile.format == 1)
3631 {
3632 SpParHelper::Print("COMBBLAS: Ascii input with binary headers is not supported\n", commGrid->GetWorld());
3633 total_n = 0; total_m = 0;
3634 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3635 return;
3636 }
3637 if ( !hfile.headerexists ) // no header - ascii file (at this point, file exists)
3638 {
3639 infile.open(filename.c_str());
3640 char comment[256];
3641 infile.getline(comment,256);
3642 while(comment[0] == '%')
3643 {
3644 infile.getline(comment,256);
3645 }
3646 std::stringstream ss;
3647 ss << std::string(comment);
3648 ss >> total_m >> total_n >> total_nnz;
3649 if(pario)
3650 {
3651 SpParHelper::Print("COMBBLAS: Trying to read binary headerless file in parallel, aborting\n", commGrid->GetWorld());
3652 total_n = 0; total_m = 0;
3653 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3654 return;
3655 }
3656 }
3657 else // hfile.headerexists && hfile.format == 0
3658 {
3659 total_m = hfile.m;
3660 total_n = hfile.n;
3661 total_nnz = hfile.nnz;
3662 }
3663 m_perproc = total_m / colneighs;
3664 n_perproc = total_n / rowneighs;
3665 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3666 AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
3667
3668 if(seeklength > 0 && pario) // sqrt(p) processors also do parallel binary i/o
3669 {
3670 IT entriestoread = total_nnz / colneighs;
3671 #ifdef IODEBUG
3672 std::ofstream oput;
3673 commGrid->OpenDebugFile("Read", oput);
3674 oput << "Total nnz: " << total_nnz << " entries to read: " << entriestoread << std::endl;
3675 oput.close();
3676 #endif
3677 ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3678 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
3679 }
3680 else // only this (master) is doing I/O (text or binary)
3681 {
3682 IT temprow, tempcol;
3683 NT tempval;
3684 IT ntrow = 0, ntcol = 0; // not transposed row and column index
3685 char line[1024];
3686 bool nonumline = nonum;
3687 IT cnz = 0;
3688 for(; cnz < total_nnz; ++cnz)
3689 {
3690 int colrec;
3691 size_t commonindex;
3692 std::stringstream linestream;
3693 if( (!hfile.headerexists) && (!infile.eof()))
3694 {
3695 // read one line at a time so that missing numerical values can be detected
3696 infile.getline(line, 1024);
3697 linestream << line;
3698 linestream >> temprow >> tempcol;
3699 if (!nonum)
3700 {
3701 // see if this line has a value
3702 linestream >> std::skipws;
3703 nonumline = linestream.eof();
3704 }
3705 --temprow; // file is 1-based where C-arrays are 0-based
3706 --tempcol;
3707 ntrow = temprow;
3708 ntcol = tempcol;
3709 }
3710 else if(hfile.headerexists && (!feof(binfile)) )
3711 {
3712 handler.binaryfill(binfile, temprow , tempcol, tempval);
3713 }
3714 if (transpose)
3715 {
3716 IT swap = temprow;
3717 temprow = tempcol;
3718 tempcol = swap;
3719 }
3720 				colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1);	// recipient processor along the column
3721 commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
3722
3723 rows[ commonindex ] = temprow;
3724 cols[ commonindex ] = tempcol;
3725 if( (!hfile.headerexists) && (!infile.eof()))
3726 {
3727 vals[ commonindex ] = nonumline ? handler.getNoNum(ntrow, ntcol) : handler.read(linestream, ntrow, ntcol); //tempval;
3728 }
3729 else if(hfile.headerexists && (!feof(binfile)) )
3730 {
3731 vals[ commonindex ] = tempval;
3732 }
3733 ++ (ccurptrs[colrec]);
3734 if(ccurptrs[colrec] == buffpercolneigh || (cnz == (total_nnz-1)) ) // one buffer is full, or file is done !
3735 {
3736 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first, send the receive counts
3737
3738 					// generate space for own recv data ... (use arrays because vector<bool> is crippled when NT=bool)
3739 IT * temprows = new IT[recvcount];
3740 IT * tempcols = new IT[recvcount];
3741 NT * tempvals = new NT[recvcount];
3742
3743 					// then, send all buffers to their recipients ...
3744 MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3745 MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3746 MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
3747
3748 std::fill_n(ccurptrs, colneighs, 0); // finally, reset current pointers !
3749 DeleteAll(rows, cols, vals);
3750
3751 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
3752 buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
3753
3754 					if( cnz != (total_nnz-1) )	// otherwise the loop will exit with no one to claim the memory back
3755 {
3756 // reuse these buffers for the next vertical communication
3757 rows = new IT [ buffpercolneigh * colneighs ];
3758 cols = new IT [ buffpercolneigh * colneighs ];
3759 vals = new NT [ buffpercolneigh * colneighs ];
3760 }
3761 } // end_if for "send buffer is full" case
3762 } // end_for for "cnz < entriestoread" case
3763 assert (cnz == total_nnz);
3764
3765 // Signal the end of file to other processors along the column
3766 std::fill_n(ccurptrs, colneighs, std::numeric_limits<int>::max());
3767 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
3768
3769 // And along the row ...
3770 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
3771 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3772 } // end of "else" (only one processor reads) block
3773 } // end_if for "master processor" case
3774 else if( commGrid->OnSameProcCol(master) ) // (r-1) processors
3775 {
3776 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3777 m_perproc = total_m / colneighs;
3778 n_perproc = total_n / rowneighs;
3779
3780 if(seeklength > 0 && pario) // these processors also do parallel binary i/o
3781 {
3782 binfile = fopen(filename.c_str(), "rb");
3783 IT entrysize = handler.entrylength();
3784 int myrankincol = commGrid->GetRankInProcCol();
3785 IT perreader = total_nnz / colneighs;
3786 IT read_offset = entrysize * static_cast<IT>(myrankincol) * perreader + seeklength;
3787 IT entriestoread = perreader;
3788 if (myrankincol == colneighs-1)
3789 entriestoread = total_nnz - static_cast<IT>(myrankincol) * perreader;
3790 fseek(binfile, read_offset, SEEK_SET);
3791
3792 #ifdef IODEBUG
3793 std::ofstream oput;
3794 commGrid->OpenDebugFile("Read", oput);
3795 oput << "Total nnz: " << total_nnz << " OFFSET : " << read_offset << " entries to read: " << entriestoread << std::endl;
3796 oput.close();
3797 #endif
3798
3799 AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
3800 ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3801 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
3802 }
3803 else // only master does the I/O
3804 {
3805 while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
3806 {
3807 // void MPI::Comm::Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], const MPI::Datatype& sendtype,
3808 // void* recvbuf, int recvcount, const MPI::Datatype & recvtype, int root) const
3809 // The outcome is as if the root executed n send operations,
3810 // MPI_Send(sendbuf + displs[i] * extent(sendtype), sendcounts[i], sendtype, i, ...)
3811 // and each process executed a receive,
3812 // MPI_Recv(recvbuf, recvcount, recvtype, root, ...)
3813 // The send buffer is ignored for all nonroot processes.
3814
3815 MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first receive the receive counts ...
3816 if( recvcount == std::numeric_limits<int>::max()) break;
3817
3818 // create space for incoming data ...
3819 IT * temprows = new IT[recvcount];
3820 IT * tempcols = new IT[recvcount];
3821 NT * tempvals = new NT[recvcount];
3822
3823 // receive actual data ... (first 4 arguments are ignored in the receiver side)
3824 MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3825 MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3826 MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
3827
3828 // now, send the data along the horizontal
3829 rcurptrs = new int[rowneighs];
3830 std::fill_n(rcurptrs, rowneighs, 0);
3831
3832 // HorizontalSend frees the memory of temp_xxx arrays and then creates and frees memory of all the six arrays itself
3833 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
3834 buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
3835 }
3836 }
3837
3838 // Signal the end of file to other processors along the row
3839 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
3840 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3841 delete [] rcurptrs;
3842 }
3843 else // r * (s-1) processors that only participate in the horizontal communication step
3844 {
3845 BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3846 m_perproc = total_m / colneighs;
3847 n_perproc = total_n / rowneighs;
3848 while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
3849 {
3850 // receive the receive count
3851 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3852 if( recvcount == std::numeric_limits<int>::max())
3853 break;
3854
3855 // create space for incoming data ...
3856 IT * temprows = new IT[recvcount];
3857 IT * tempcols = new IT[recvcount];
3858 NT * tempvals = new NT[recvcount];
3859
3860 MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
3861 MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
3862 MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
3863
3864 // now push what is ours to tuples
3865 IT moffset = commGrid->myprocrow * m_perproc;
3866 IT noffset = commGrid->myproccol * n_perproc;
3867
3868 for(IT i=0; i< recvcount; ++i)
3869 {
3870 localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
3871 }
3872 DeleteAll(temprows,tempcols,tempvals);
3873 }
3874 }
3875 DeleteAll(cdispls, rdispls);
3876 std::tuple<IT,IT,NT> * arrtuples = new std::tuple<IT,IT,NT>[localtuples.size()]; // the vector will go out of scope, make it stick !
3877 std::copy(localtuples.begin(), localtuples.end(), arrtuples);
3878
3879 IT localm = (commGrid->myprocrow != (commGrid->grrows-1))? m_perproc: (total_m - (m_perproc * (commGrid->grrows-1)));
3880 IT localn = (commGrid->myproccol != (commGrid->grcols-1))? n_perproc: (total_n - (n_perproc * (commGrid->grcols-1)));
3881 spSeq->Create( localtuples.size(), localm, localn, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
3882
3883 #ifdef TAU_PROFILE
3884 TAU_PROFILE_STOP(rdtimer);
3885 #endif
3886 return;
3887 }
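
/*
 * Usage sketch for ReadDistribute (illustrative only; it assumes the convenience overload that
 * supplies a default handler, as the CombBLAS applications typically use):
 *
 *   typedef combblas::SpParMat< int64_t, double, combblas::SpDCCols<int64_t,double> > PSpMat;
 *   PSpMat A(fullWorld);
 *   A.ReadDistribute("A.txt", 0);   // rank 0 is the master reader; buffers flow down its processor column, then along processor rows
 */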
3888
3889 template <class IT, class NT, class DER>
AllocateSetBuffers(IT * & rows,IT * & cols,NT * & vals,int * & rcurptrs,int * & ccurptrs,int rowneighs,int colneighs,IT buffpercolneigh)3890 void SpParMat<IT,NT,DER>::AllocateSetBuffers(IT * & rows, IT * & cols, NT * & vals, int * & rcurptrs, int * & ccurptrs, int rowneighs, int colneighs, IT buffpercolneigh)
3891 {
3892 // allocate buffers on the heap as stack space is usually limited
3893 rows = new IT [ buffpercolneigh * colneighs ];
3894 cols = new IT [ buffpercolneigh * colneighs ];
3895 vals = new NT [ buffpercolneigh * colneighs ];
3896
3897 ccurptrs = new int[colneighs];
3898 rcurptrs = new int[rowneighs];
3899 std::fill_n(ccurptrs, colneighs, 0); // fill with zero
3900 std::fill_n(rcurptrs, rowneighs, 0);
3901 }
3902
3903 template <class IT, class NT, class DER>
BcastEssentials(MPI_Comm & world,IT & total_m,IT & total_n,IT & total_nnz,int master)3904 void SpParMat<IT,NT,DER>::BcastEssentials(MPI_Comm & world, IT & total_m, IT & total_n, IT & total_nnz, int master)
3905 {
3906 MPI_Bcast(&total_m, 1, MPIType<IT>(), master, world);
3907 MPI_Bcast(&total_n, 1, MPIType<IT>(), master, world);
3908 MPI_Bcast(&total_nnz, 1, MPIType<IT>(), master, world);
3909 }
3910
3911 /*
3912 * @post {rows, cols, vals are pre-allocated on the heap after this call}
3913  * @post {ccurptrs are set to zero, so that if another call is made to this function without modifying ccurptrs, no data will be sent from this processor}
3914 */
3915 template <class IT, class NT, class DER>
VerticalSend(IT * & rows,IT * & cols,NT * & vals,std::vector<std::tuple<IT,IT,NT>> & localtuples,int * rcurptrs,int * ccurptrs,int * rdispls,int * cdispls,IT m_perproc,IT n_perproc,int rowneighs,int colneighs,IT buffperrowneigh,IT buffpercolneigh,int rankinrow)3916 void SpParMat<IT,NT,DER>::VerticalSend(IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
3917 IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, int rankinrow)
3918 {
3919 // first, send/recv the counts ...
3920 int * colrecvdispls = new int[colneighs];
3921 int * colrecvcounts = new int[colneighs];
3922 MPI_Alltoall(ccurptrs, 1, MPI_INT, colrecvcounts, 1, MPI_INT, commGrid->colWorld); // share the request counts
3923 int totrecv = std::accumulate(colrecvcounts,colrecvcounts+colneighs,0);
3924 colrecvdispls[0] = 0; // receive displacements are exact whereas send displacements have slack
3925 for(int i=0; i<colneighs-1; ++i)
3926 colrecvdispls[i+1] = colrecvdispls[i] + colrecvcounts[i];
3927
3928 	// generate space for own recv data ... (use arrays because vector<bool> is crippled when NT=bool)
3929 IT * temprows = new IT[totrecv];
3930 IT * tempcols = new IT[totrecv];
3931 NT * tempvals = new NT[totrecv];
3932
3933 	// then, exchange all buffers with their recipients ...
3934 MPI_Alltoallv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
3935 MPI_Alltoallv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
3936 MPI_Alltoallv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, colrecvcounts, colrecvdispls, MPIType<NT>(), commGrid->colWorld);
3937
3938 // finally, reset current pointers !
3939 std::fill_n(ccurptrs, colneighs, 0);
3940 DeleteAll(colrecvdispls, colrecvcounts);
3941 DeleteAll(rows, cols, vals);
3942
3943 // rcurptrs/rdispls are zero initialized scratch space
3944 HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls, buffperrowneigh, rowneighs, totrecv, m_perproc, n_perproc, rankinrow);
3945
3946 // reuse these buffers for the next vertical communication
3947 rows = new IT [ buffpercolneigh * colneighs ];
3948 cols = new IT [ buffpercolneigh * colneighs ];
3949 vals = new NT [ buffpercolneigh * colneighs ];
3950 }
3951
3952
3953 /**
3954 * Private subroutine of ReadDistribute.
3955 * Executed by p_r processors on the first processor column.
3956 * @pre {rows, cols, vals are pre-allocated on the heap before this call}
3957 * @param[in] rankinrow {row head's rank in its processor row - determines the scatter person}
3958 */
3959 template <class IT, class NT, class DER>
3960 template <class HANDLER>
ReadAllMine(FILE * binfile,IT * & rows,IT * & cols,NT * & vals,std::vector<std::tuple<IT,IT,NT>> & localtuples,int * rcurptrs,int * ccurptrs,int * rdispls,int * cdispls,IT m_perproc,IT n_perproc,int rowneighs,int colneighs,IT buffperrowneigh,IT buffpercolneigh,IT entriestoread,HANDLER handler,int rankinrow,bool transpose)3961 void SpParMat<IT,NT,DER>::ReadAllMine(FILE * binfile, IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
3962 IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, IT entriestoread, HANDLER handler, int rankinrow, bool transpose)
3963 {
3964 assert(entriestoread != 0);
3965 IT cnz = 0;
3966 IT temprow, tempcol;
3967 NT tempval;
3968 int finishedglobal = 1;
3969 while(cnz < entriestoread && !feof(binfile)) // this loop will execute at least once
3970 {
3971 handler.binaryfill(binfile, temprow , tempcol, tempval);
3972 if (transpose)
3973 {
3974 IT swap = temprow;
3975 temprow = tempcol;
3976 tempcol = swap;
3977 }
3978 		int colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1);	// recipient processor along the column
3979 size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
3980 rows[ commonindex ] = temprow;
3981 cols[ commonindex ] = tempcol;
3982 vals[ commonindex ] = tempval;
3983 ++ (ccurptrs[colrec]);
3984 if(ccurptrs[colrec] == buffpercolneigh || (cnz == (entriestoread-1)) ) // one buffer is full, or this processor's share is done !
3985 {
3986 #ifdef IODEBUG
3987 std::ofstream oput;
3988 commGrid->OpenDebugFile("Read", oput);
3989 oput << "To column neighbors: ";
3990 std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
3991 oput.close();
3992 #endif
3993
3994 VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3995 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
3996
3997 if(cnz == (entriestoread-1)) // last execution of the outer loop
3998 {
3999 int finishedlocal = 1; // I am done, but let me check others
4000 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4001 while(!finishedglobal)
4002 {
4003 #ifdef DEBUG
4004 std::ofstream oput;
4005 commGrid->OpenDebugFile("Read", oput);
4006 oput << "To column neighbors: ";
4007 std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4008 oput.close();
4009 #endif
4010
4011 // postcondition of VerticalSend: ccurptrs are set to zero
4012 				// if another call is made to this function without modifying ccurptrs, no data will be sent from this processor
4013 VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4014 rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4015
4016 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4017 }
4018 }
4019 else // the other loop will continue executing
4020 {
4021 int finishedlocal = 0;
4022 MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4023 }
4024 } // end_if for "send buffer is full" case
4025 ++cnz;
4026 }
4027
4028 // signal the end to row neighbors
4029 std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4030 int recvcount;
4031 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4032 }
4033
4034
4035 /**
4036 * Private subroutine of ReadDistribute
4037 * @param[in] rankinrow {Row head's rank in its processor row}
4038  * Initially the temp_xxx arrays carry data received along the proc. column, which then needs to be sent along the proc. row
4039 * After usage, function frees the memory of temp_xxx arrays and then creates and frees memory of all the six arrays itself
4040 */
4041 template <class IT, class NT, class DER>
HorizontalSend(IT * & rows,IT * & cols,NT * & vals,IT * & temprows,IT * & tempcols,NT * & tempvals,std::vector<std::tuple<IT,IT,NT>> & localtuples,int * rcurptrs,int * rdispls,IT buffperrowneigh,int rowneighs,int recvcount,IT m_perproc,IT n_perproc,int rankinrow)4042 void SpParMat<IT,NT,DER>::HorizontalSend(IT * & rows, IT * & cols, NT * & vals, IT * & temprows, IT * & tempcols, NT * & tempvals, std::vector < std::tuple <IT,IT,NT> > & localtuples,
4043 int * rcurptrs, int * rdispls, IT buffperrowneigh, int rowneighs, int recvcount, IT m_perproc, IT n_perproc, int rankinrow)
4044 {
4045 rows = new IT [ buffperrowneigh * rowneighs ];
4046 cols = new IT [ buffperrowneigh * rowneighs ];
4047 vals = new NT [ buffperrowneigh * rowneighs ];
4048
4049 // prepare to send the data along the horizontal
4050 for(int i=0; i< recvcount; ++i)
4051 {
4052 int rowrec = std::min(static_cast<int>(tempcols[i] / n_perproc), rowneighs-1);
4053 rows[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = temprows[i];
4054 cols[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempcols[i];
4055 vals[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempvals[i];
4056 ++ (rcurptrs[rowrec]);
4057 }
4058
4059 #ifdef IODEBUG
4060 std::ofstream oput;
4061 commGrid->OpenDebugFile("Read", oput);
4062 oput << "To row neighbors: ";
4063 std::copy(rcurptrs, rcurptrs+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4064 oput << "Row displacements were: ";
4065 std::copy(rdispls, rdispls+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4066 oput.close();
4067 #endif
4068
4069 MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld); // Send the receive counts for horizontal communication
4070
4071 	// the data is now stored in rows/cols/vals, so the temporaries can be reset
4072 	// (reallocated below with size recvcount, the receive count of this horizontal scatter)
4073 DeleteAll(temprows, tempcols, tempvals);
4074 temprows = new IT[recvcount];
4075 tempcols = new IT[recvcount];
4076 tempvals = new NT[recvcount];
4077
4078 	// then, send all buffers to their recipients ...
4079 MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4080 MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4081 MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
4082
4083 // now push what is ours to tuples
4084 IT moffset = commGrid->myprocrow * m_perproc;
4085 IT noffset = commGrid->myproccol * n_perproc;
4086
4087 for(int i=0; i< recvcount; ++i)
4088 {
4089 localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4090 }
4091
4092 std::fill_n(rcurptrs, rowneighs, 0);
4093 DeleteAll(rows, cols, vals, temprows, tempcols, tempvals);
4094 }
4095
4096
4097 //! The input parameters' identity (zero) elements as well as
4098 //! their communication grid are preserved while outputting
4099 template <class IT, class NT, class DER>
Find(FullyDistVec<IT,IT> & distrows,FullyDistVec<IT,IT> & distcols,FullyDistVec<IT,NT> & distvals) const4100 void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols, FullyDistVec<IT,NT> & distvals) const
4101 {
4102 if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
4103 {
4104 SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4105 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4106 }
4107 IT globallen = getnnz();
4108 SpTuples<IT,NT> Atuples(*spSeq);
4109
4110 FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4111 FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4112 FullyDistVec<IT,NT> nvals ( distvals.commGrid, globallen, NT());
4113
4114 IT prelen = Atuples.getnnz();
4115 //IT postlen = nrows.MyLocLength();
4116
4117 int rank = commGrid->GetRank();
4118 int nprocs = commGrid->GetSize();
4119 IT * prelens = new IT[nprocs];
4120 prelens[rank] = prelen;
4121 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4122 IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4123
4124 int * sendcnt = new int[nprocs](); // zero initialize
4125 IT * rows = new IT[prelen];
4126 IT * cols = new IT[prelen];
4127 NT * vals = new NT[prelen];
4128
4129 int rowrank = commGrid->GetRankInProcRow();
4130 int colrank = commGrid->GetRankInProcCol();
4131 int rowneighs = commGrid->GetGridCols();
4132 int colneighs = commGrid->GetGridRows();
4133 IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
4134 IT * locncols = new IT[rowneighs];
4135 locnrows[colrank] = getlocalrows();
4136 locncols[rowrank] = getlocalcols();
4137
4138 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4139 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
4140
4141 IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4142 IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4143
4144 DeleteAll(locnrows, locncols);
4145 for(int i=0; i< prelen; ++i)
4146 {
4147 IT locid; // ignore local id, data will come in order
4148 int owner = nrows.Owner(prelenuntil+i, locid);
4149 sendcnt[owner]++;
4150
4151 rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
4152 cols[i] = Atuples.colindex(i) + coffset; // need the global col index
4153 vals[i] = Atuples.numvalue(i);
4154 }
4155
4156 int * recvcnt = new int[nprocs];
4157 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
4158
4159 int * sdpls = new int[nprocs](); // send displacements (zero initialized)
4160 int * rdpls = new int[nprocs]();
4161 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4162 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
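	// Worked example (illustrative values): sendcnt = {2, 0, 3, 1} yields the
	// exclusive prefix sums sdpls = {0, 2, 2, 5}, which are the element
	// displacements MPI_Alltoallv expects.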
4163
4164 MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4165 MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4166 MPI_Alltoallv(vals, sendcnt, sdpls, MPIType<NT>(), SpHelper::p2a(nvals.arr), recvcnt, rdpls, MPIType<NT>(), commGrid->GetWorld());
4167
4168 DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4169 DeleteAll(prelens, rows, cols, vals);
4170 distrows = nrows;
4171 distcols = ncols;
4172 distvals = nvals;
4173 }
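// Usage sketch (illustrative only: the matrix A and the vector names are
// assumptions, and getcommgrid() is assumed to be the usual grid accessor).
// Find() fills three aligned distributed vectors with the global
// (row, column, value) triples of A:
//
//   FullyDistVec<int64_t,int64_t> ri(A.getcommgrid(), 0, 0);
//   FullyDistVec<int64_t,int64_t> ci(A.getcommgrid(), 0, 0);
//   FullyDistVec<int64_t,double>  vi(A.getcommgrid(), 0, 0.0);
//   A.Find(ri, ci, vi);  // the k-th entries of ri, ci, vi describe one nonzero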
4174
4175 //! The input parameters' identity (zero) elements, as well as
4176 //! their communication grids, are preserved in the output
4177 template <class IT, class NT, class DER>
4178 void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols) const
4179 {
4180 if((*(distrows.commGrid) != *(distcols.commGrid)) )
4181 {
4182 SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4183 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4184 }
4185 IT globallen = getnnz();
4186 SpTuples<IT,NT> Atuples(*spSeq);
4187
4188 FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4189 FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4190
4191 IT prelen = Atuples.getnnz();
4192
4193 int rank = commGrid->GetRank();
4194 int nprocs = commGrid->GetSize();
4195 IT * prelens = new IT[nprocs];
4196 prelens[rank] = prelen;
4197 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4198 IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4199
4200 int * sendcnt = new int[nprocs](); // zero initialize
4201 IT * rows = new IT[prelen];
4202 IT * cols = new IT[prelen]; // no value buffer is needed in this overload
4204
4205 int rowrank = commGrid->GetRankInProcRow();
4206 int colrank = commGrid->GetRankInProcCol();
4207 int rowneighs = commGrid->GetGridCols();
4208 int colneighs = commGrid->GetGridRows();
4209 IT * locnrows = new IT[colneighs]; // local row counts, gathered along the processor column
4210 IT * locncols = new IT[rowneighs];
4211 locnrows[colrank] = getlocalrows();
4212 locncols[rowrank] = getlocalcols();
4213
4214 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4215 MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld()); // gather along the processor row, as in the three-vector overload
4216 IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4217 IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4218
4219 DeleteAll(locnrows, locncols);
4220 for(int i=0; i< prelen; ++i)
4221 {
4222 IT locid; // ignore local id, data will come in order
4223 int owner = nrows.Owner(prelenuntil+i, locid);
4224 sendcnt[owner]++;
4225
4226 rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
4227 cols[i] = Atuples.colindex(i) + coffset; // need the global col index
4228 }
4229
4230 int * recvcnt = new int[nprocs];
4231 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
4232
4233 int * sdpls = new int[nprocs](); // send displacements (zero initialized)
4234 int * rdpls = new int[nprocs]();
4235 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4236 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4237
4238 MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4239 MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4240
4241 DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4242 DeleteAll(prelens, rows, cols);
4243 distrows = nrows;
4244 distcols = ncols;
4245 }
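// Usage note (names illustrative): when the numerical values are not needed,
// the two-vector overload is called the same way, e.g. A.Find(ri, ci);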
4246
4247 template <class IT, class NT, class DER>
4248 std::ofstream& SpParMat<IT,NT,DER>::put(std::ofstream& outfile) const
4249 {
4250 outfile << (*spSeq) << std::endl;
4251 return outfile;
4252 }
4253
4254 template <class IU, class NU, class UDER>
4255 std::ofstream& operator<<(std::ofstream& outfile, const SpParMat<IU, NU, UDER> & s)
4256 {
4257 return s.put(outfile); // use the right put() function
4259 }
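// Usage sketch (illustrative; the stream name and the rank variable myrank are
// assumptions): put() streams only the locally held submatrix, so each process
// typically writes to its own file.
//
//   std::ofstream ofs("A_rank" + std::to_string(myrank) + ".txt");
//   ofs << A;  // resolves to the operator<< above, which calls SpParMat::put()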
4260
4261 /**
4262 * @param[in] grow {global row index}
4263 * @param[in] gcol {global column index}
4264 * @param[out] lrow {row index local to the owner}
4265 * @param[out] lcol {col index local to the owner}
4266 * @returns {owner processor id}
4267 **/
4268 template <class IT, class NT,class DER>
4269 template <typename LIT>
4270 int SpParMat<IT,NT,DER>::Owner(IT total_m, IT total_n, IT grow, IT gcol, LIT & lrow, LIT & lcol) const
4271 {
4272 int procrows = commGrid->GetGridRows();
4273 int proccols = commGrid->GetGridCols();
4274 IT m_perproc = total_m / procrows;
4275 IT n_perproc = total_n / proccols;
4276
4277 int own_procrow; // owner's processor row
4278 if(m_perproc != 0)
4279 {
4280 own_procrow = std::min(static_cast<int>(grow / m_perproc), procrows-1);
4281 }
4282 else // all owned by the last processor row
4283 {
4284 own_procrow = procrows -1;
4285 }
4286 int own_proccol;
4287 if(n_perproc != 0)
4288 {
4289 own_proccol = std::min(static_cast<int>(gcol / n_perproc), proccols-1);
4290 }
4291 else
4292 {
4293 own_proccol = proccols-1;
4294 }
4295 lrow = grow - (own_procrow * m_perproc);
4296 lcol = gcol - (own_proccol * n_perproc);
4297 return commGrid->GetRank(own_procrow, own_proccol);
4298 }
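// Worked example (illustrative values): with total_m = 10 and procrows = 4,
// m_perproc = 2; global row 9 maps to own_procrow = min(9/2, 3) = 3 and
// lrow = 9 - 3*2 = 3, i.e. the last process row absorbs the remainder rows.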
4299
4300 /**
4301 * @param[out] rowOffset {Row offset imposed by process grid. Global row index = rowOffset + local row index.}
4302 * @param[out] colOffset {Column offset imposed by process grid. Global column index = colOffset + local column index.}
4303 **/
4304 template <class IT, class NT,class DER>
4305 void SpParMat<IT,NT,DER>::GetPlaceInGlobalGrid(IT& rowOffset, IT& colOffset) const
4306 {
4307 IT total_rows = getnrow();
4308 IT total_cols = getncol();
4309
4310 int procrows = commGrid->GetGridRows();
4311 int proccols = commGrid->GetGridCols();
4312 IT rows_perproc = total_rows / procrows;
4313 IT cols_perproc = total_cols / proccols;
4314
4315 rowOffset = commGrid->GetRankInProcCol()*rows_perproc;
4316 colOffset = commGrid->GetRankInProcRow()*cols_perproc;
4317 }
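// Worked example (illustrative values): with getnrow() = 10 and procrows = 4,
// rows_perproc = 2, so the process row with GetRankInProcCol() == 3 gets
// rowOffset = 6; because of the integer division, that last process row covers
// global rows 6..9 rather than just two rows.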
4318
4319 }
4320