1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc  --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 
30 
31 #include "SpParMat.h"
32 #include "ParFriends.h"
33 #include "Operations.h"
34 #include "FileHeader.h"
35 extern "C" {
36 #include "mmio.h"
37 }
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 
41 #include <mpi.h>
42 #include <fstream>
43 #include <algorithm>
44 #include <set>
45 #include <stdexcept>
46 
47 namespace combblas {
48 
49 /**
50   * If every processor has a distinct triples file such as {A_0, A_1, ..., A_{p-1}} for p processors
51  **/
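//! A minimal usage sketch (the file naming scheme and the SpDCCols instantiation below are
//! illustrative assumptions, not prescribed by the library):
//!     MPI_Comm world = MPI_COMM_WORLD;   // the constructor takes a non-const MPI_Comm reference
//!     int myrank;  MPI_Comm_rank(world, &myrank);
//!     std::ifstream input("A_" + std::to_string(myrank) + ".txt");
//!     SpParMat< int64_t, double, SpDCCols<int64_t,double> > A(input, world);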
52 template <class IT, class NT, class DER>
53 SpParMat< IT,NT,DER >::SpParMat (std::ifstream & input, MPI_Comm & world)
54 {
55 	assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
56 	if(!input.is_open())
57 	{
58 		perror("Input file doesn't exist\n");
59 		exit(-1);
60 	}
61 	commGrid.reset(new CommGrid(world, 0, 0));
62 	input >> (*spSeq);
63 }
64 
65 template <class IT, class NT, class DER>
66 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, MPI_Comm & world): spSeq(myseq)
67 {
68 	assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
69 	commGrid.reset(new CommGrid(world, 0, 0));
70 }
71 
72 template <class IT, class NT, class DER>
73 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, std::shared_ptr<CommGrid> grid): spSeq(myseq)
74 {
75 	assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
76 	commGrid = grid;
77 }
78 
79 template <class IT, class NT, class DER>
80 SpParMat< IT,NT,DER >::SpParMat (std::shared_ptr<CommGrid> grid)
81 {
82 	assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
83 	spSeq = new DER();
84 	commGrid = grid;
85 }
86 
87 //! Deprecated. Don't call the default constructor
88 template <class IT, class NT, class DER>
89 SpParMat< IT,NT,DER >::SpParMat ()
90 {
91 	SpParHelper::Print("COMBBLAS Warning: It is dangerous to create (matrix) objects without specifying the communicator. Are you sure you want to create this object in MPI_COMM_WORLD?\n");
92 	assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
93 	spSeq = new DER();
94 	commGrid.reset(new CommGrid(MPI_COMM_WORLD, 0, 0));
95 }
96 
97 /**
98 * If there is a single file read by the master process only, use this and then call ReadDistribute()
99 **/
100 template <class IT, class NT, class DER>
101 SpParMat< IT,NT,DER >::SpParMat (MPI_Comm world)
102 {
103 
104     assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
105     spSeq = new DER();
106     commGrid.reset(new CommGrid(world, 0, 0));
107 }
108 
109 template <class IT, class NT, class DER>
110 SpParMat< IT,NT,DER >::~SpParMat ()
111 {
112 	if(spSeq != NULL) delete spSeq;
113 }
114 
115 template <class IT, class NT, class DER>
116 void SpParMat< IT,NT,DER >::FreeMemory ()
117 {
118 	if(spSeq != NULL) delete spSeq;
119 	spSeq = NULL;
120 }
121 
122 
123 /**
124  * Private function to guide Select2 communication and avoid code duplication due to loop ends
125  * @param[in,out] klimits {per-column k limit, updated for the next iteration}
126  * @param[out] toretain {items to retain in actcolsmap at the next iteration}
127  **/
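// Descriptive note on the flow below: each call to TopKGather processes one chunk of the active
// columns. The ranks of a processor column first gather their local medians and per-column nonzero
// counts to the column root, which computes a weighted median per column; every rank then counts
// entries larger than, smaller than, and equal to that pivot, and each column either discards its
// smaller half, discards its larger/equal part (shrinking its k limit), or declares the pivot to be
// its k-th value and queues a (local index, value) pair for the owner of that entry of rvec.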
128 template <class IT, class NT, class DER>
129 template <typename VT, typename GIT>	// GIT: global index type of vector
130 void SpParMat<IT,NT,DER>::TopKGather(std::vector<NT> & all_medians, std::vector<IT> & nnz_per_col, int & thischunk, int & chunksize,
131                                      const std::vector<NT> & activemedians, const std::vector<IT> & activennzperc, int itersuntil,
132                                      std::vector< std::vector<NT> > & localmat, const std::vector<IT> & actcolsmap, std::vector<IT> & klimits,
133                                      std::vector<IT> & toretain, std::vector<std::vector<std::pair<IT,NT>>> & tmppair, IT coffset, const FullyDistVec<GIT,VT> & rvec) const
134 {
135     int rankincol = commGrid->GetRankInProcCol();
136     int colneighs = commGrid->GetGridRows();
137     int nprocs = commGrid->GetSize();
138     std::vector<double> finalWeightedMedians(thischunk, 0.0);
139 
140     MPI_Gather(activemedians.data() + itersuntil*chunksize, thischunk, MPIType<NT>(), all_medians.data(), thischunk, MPIType<NT>(), 0, commGrid->GetColWorld());
141     MPI_Gather(activennzperc.data() + itersuntil*chunksize, thischunk, MPIType<IT>(), nnz_per_col.data(), thischunk, MPIType<IT>(), 0, commGrid->GetColWorld());
142 
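    // On the column root (rankincol == 0), the block below computes a weighted median of the gathered
    // local medians for every column in this chunk: each local median is weighted by the fraction of
    // that column's nonzeros held by its processor, the (median, weight) pairs are sorted by value,
    // and the first median at which the cumulative weight reaches 0.5 becomes the pivot stored in
    // finalWeightedMedians[j], following the Saukas-Song selection scheme.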
143     if(rankincol == 0)
144     {
145         std::vector<double> columnCounts(thischunk, 0.0);
146         std::vector< std::pair<NT, double> > mediansNweights(colneighs);  // (median,weight) pairs    [to be reused at each iteration]
147 
148         for(int j = 0; j < thischunk; ++j)  // for each column
149         {
150             for(int k = 0; k<colneighs; ++k)
151             {
152                 size_t fetchindex = k*thischunk+j;
153                 columnCounts[j] += static_cast<double>(nnz_per_col[fetchindex]);
154             }
155             for(int k = 0; k<colneighs; ++k)
156             {
157                 size_t fetchindex = k*thischunk+j;
158                 mediansNweights[k] = std::make_pair(all_medians[fetchindex], static_cast<double>(nnz_per_col[fetchindex]) / columnCounts[j]);
159             }
160             sort(mediansNweights.begin(), mediansNweights.end());   // sort by median
161 
162             double sumofweights = 0;
163             int k = 0;
164             while( k<colneighs && sumofweights < 0.5)
165             {
166                 sumofweights += mediansNweights[k++].second;
167             }
168             finalWeightedMedians[j] = mediansNweights[k-1].first;
169         }
170     }
171     MPI_Bcast(finalWeightedMedians.data(), thischunk, MPIType<double>(), 0, commGrid->GetColWorld());
172 
173     std::vector<IT> larger(thischunk, 0);
174     std::vector<IT> smaller(thischunk, 0);
175     std::vector<IT> equal(thischunk, 0);
176 
177 #ifdef THREADED
178 #pragma omp parallel for
179 #endif
180     for(int j = 0; j < thischunk; ++j)  // for each active column
181     {
182         size_t fetchindex = actcolsmap[j+itersuntil*chunksize];
183         for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
184         {
185             // count those above/below/equal to the median
186             if(localmat[fetchindex][k] > finalWeightedMedians[j])
187                 larger[j]++;
188             else if(localmat[fetchindex][k] < finalWeightedMedians[j])
189                 smaller[j]++;
190             else
191                 equal[j]++;
192         }
193     }
194     MPI_Allreduce(MPI_IN_PLACE, larger.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
195     MPI_Allreduce(MPI_IN_PLACE, smaller.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
196     MPI_Allreduce(MPI_IN_PLACE, equal.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
197 
198     int numThreads = 1;	// default case
199 #ifdef THREADED
200     omp_lock_t lock[nprocs];    // a lock per recipient
201     for (int i=0; i<nprocs; i++)
202         omp_init_lock(&(lock[i]));
203 #pragma omp parallel
204     {
205         numThreads = omp_get_num_threads();
206     }
207 #endif
208 
209     std::vector < std::vector<IT> > perthread2retain(numThreads);
210 
211 #ifdef THREADED
212 #pragma omp parallel for
213 #endif
214     for(int j = 0; j < thischunk; ++j)  // for each active column
215     {
216 #ifdef THREADED
217         int myThread = omp_get_thread_num();
218 #else
219         int myThread = 0;
220 #endif
221 
222         // both clmapindex and fetchindex are unique for a given j (hence not shared among threads)
223         size_t clmapindex = j+itersuntil*chunksize;     // klimits is of the same length as actcolsmap
224         size_t fetchindex = actcolsmap[clmapindex];     // localmat can only be dereferenced using the original indices.
225 
226         // these following if/else checks are the same (because klimits/large/equal vectors are mirrored) on every processor along ColWorld
227         if(klimits[clmapindex] <= larger[j]) // the entries larger than Weighted-Median are plentiful, we can discard all the smaller/equal guys
228         {
229             std::vector<NT> survivors;
230             for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
231             {
232                 if(localmat[fetchindex][k] > finalWeightedMedians[j])  // keep only the large guys (even equal guys go)
233                     survivors.push_back(localmat[fetchindex][k]);
234             }
235             localmat[fetchindex].swap(survivors);
236             perthread2retain[myThread].push_back(clmapindex);    // items to retain in actcolsmap
237         }
238         else if (klimits[clmapindex] > larger[j] + equal[j]) // the elements that are either larger or equal-to are surely keepers, no need to reprocess them
239         {
240             std::vector<NT> survivors;
241             for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
242             {
243                 if(localmat[fetchindex][k] < finalWeightedMedians[j])  // keep only the small guys (even equal guys go)
244                     survivors.push_back(localmat[fetchindex][k]);
245             }
246             localmat[fetchindex].swap(survivors);
247 
248             klimits[clmapindex] -= (larger[j] + equal[j]);   // update the k limit for this column only
249             perthread2retain[myThread].push_back(clmapindex);    // items to retain in actcolsmap
250         }
251         else  	// larger[j] < klimits[clmapindex] &&  klimits[clmapindex] <= larger[j] + equal[j]
252         {	// we will always have equal[j] > 0 because the weighted median is part of the dataset so it has to be equal to itself.
253             std::vector<NT> survivors;
254             for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
255             {
256                 if(localmat[fetchindex][k] >= finalWeightedMedians[j])  // keep the larger and equal to guys (might exceed k-limit but that's fine according to MCL)
257                     survivors.push_back(localmat[fetchindex][k]);
258             }
259             localmat[fetchindex].swap(survivors);
260 
261             // We found it: the kth largest element in column (coffset + fetchindex) is finalWeightedMedians[j]
262             // But everyone in the same processor column has the information, only one of them should send it
263             IT n_perproc = getlocalcols() / colneighs;  // find a typical processor's share
264             int assigned = std::min(static_cast<int>(fetchindex/n_perproc), colneighs-1);  // clamp to the last rank in the processor column
265             if( assigned == rankincol)
266             {
267                 IT locid;
268                 int owner = rvec.Owner(coffset + fetchindex, locid);
269 
270             #ifdef THREADED
271                 omp_set_lock(&(lock[owner]));
272             #endif
273                 tmppair[owner].emplace_back(std::make_pair(locid, finalWeightedMedians[j]));
274             #ifdef THREADED
275                 omp_unset_lock(&(lock[owner]));
276             #endif
277             }
278         } // end_else
279     } // end_for
280     // ------ concatenate toretain "indices" processed by threads ------
281     std::vector<IT> tdisp(numThreads+1);
282     tdisp[0] = 0;
283     for(int i=0; i<numThreads; ++i)
284     {
285         tdisp[i+1] = tdisp[i] + perthread2retain[i].size();
286     }
287     toretain.resize(tdisp[numThreads]);
288 
289 #pragma omp parallel for
290     for(int i=0; i< numThreads; i++)
291     {
292         std::copy(perthread2retain[i].data() , perthread2retain[i].data()+ perthread2retain[i].size(), toretain.data() + tdisp[i]);
293     }
294 
295 #ifdef THREADED
296     for (int i=0; i<nprocs; i++)    // destroy the locks
297         omp_destroy_lock(&(lock[i]));
298 #endif
299 }
300 
301 
302 //! identify the k-th maximum element in each column of a matrix
303 //! if the number of nonzeros in a column is less than k, return numeric_limits<NT>::min()
304 //! This is an efficient implementation of the Saukas/Song algorithm
305 //! http://www.ime.usp.br/~einar/select/INDEX.HTM
306 //! Preferred for large k values
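//! Rough outline of the implementation below:
//!   (1) a column-wise Reduce counts nonzeros per column; columns with fewer than k_limit entries are
//!       never activated and keep numeric_limits<NT>::min() in rvec,
//!   (2) each processor copies the nonzeros of its active columns into plain std::vectors (localmat),
//!   (3) while active columns remain, TopKGather prunes every active column around a weighted median,
//!       either shrinking its k limit or resolving its k-th value, and resolved values are routed to
//!       the owning processors of rvec with an all-to-all of (local index, value) pairs.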
307 template <class IT, class NT, class DER>
308 template <typename VT, typename GIT>	// GIT: global index type of vector
309 bool SpParMat<IT,NT,DER>::Kselect2(FullyDistVec<GIT,VT> & rvec, IT k_limit) const
310 {
311     if(*rvec.commGrid != *commGrid)
312     {
313         SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
314         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
315     }
316 
317 
318     int rankincol = commGrid->GetRankInProcCol();
319     int rankinrow = commGrid->GetRankInProcRow();
320     int rowneighs = commGrid->GetGridCols();	// get # of processors on the row
321     int colneighs = commGrid->GetGridRows();
322     int myrank = commGrid->GetRank();
323     int nprocs = commGrid->GetSize();
324 
325     FullyDistVec<GIT,IT> colcnt(commGrid);
326     Reduce(colcnt, Column, std::plus<IT>(), (IT) 0, [](NT i){ return (IT) 1;});
327 
328     // <begin> Gather vector along columns (Logic copied from DimApply)
329     int xsize = (int) colcnt.LocArrSize();
330     int trxsize = 0;
331     int diagneigh = colcnt.commGrid->GetComplementRank();
332     MPI_Status status;
333     MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, commGrid->GetWorld(), &status);
334 
335     IT * trxnums = new IT[trxsize];
336     MPI_Sendrecv(const_cast<IT*>(SpHelper::p2a(colcnt.arr)), xsize, MPIType<IT>(), diagneigh, TRX, trxnums, trxsize, MPIType<IT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
337 
338     int * colsize = new int[colneighs];
339     colsize[rankincol] = trxsize;
340     MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, commGrid->GetColWorld());
341     int * dpls = new int[colneighs]();	// displacements (zero initialized pid)
342     std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
343     int accsize = std::accumulate(colsize, colsize+colneighs, 0);
344     std::vector<IT> percsum(accsize);	// per column sum of the number of entries
345 
346     MPI_Allgatherv(trxnums, trxsize, MPIType<IT>(), percsum.data(), colsize, dpls, MPIType<IT>(), commGrid->GetColWorld());
347     DeleteAll(trxnums,colsize, dpls);
348     // <end> Gather vector along columns
349 
350     IT locm = getlocalcols();   // length (number of columns) assigned to this processor (and processor column)
351     std::vector< std::vector<NT> > localmat(locm);    // some sort of minimal local copy of matrix
352 
353 #ifdef COMBBLAS_DEBUG
354     if(accsize != locm) 	std::cout << "Gather vector along columns logic is wrong" << std::endl;
355 #endif
356 
357     for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
358     {
359 	if(percsum[colit.colid()] >= k_limit)	// don't make a copy of an inactive column
360 	{
361 		localmat[colit.colid()].reserve(colit.nnz());
362         	for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
363        	 	{
364             		localmat[colit.colid()].push_back(nzit.value());
365         	}
366 	}
367     }
368 
369     int64_t activecols = std::count_if(percsum.begin(), percsum.end(), [k_limit](IT i){ return i >= k_limit;});
370     int64_t activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);
371 
372     int64_t totactcols, totactnnzs;
373     MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
374     if(myrank == 0)   std::cout << "Number of initial active columns is " << totactcols << std::endl;
375 
376     MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
377     if(myrank == 0)   std::cout << "Number of initial nonzeros is " << totactnnzs << std::endl;
378 
379 #ifdef COMBBLAS_DEBUG
380     IT glactcols = colcnt.Count([k_limit](IT i){ return i >= k_limit;});
381     if(myrank == 0)   std::cout << "Number of initial active columns is " << glactcols << std::endl;
382     if(glactcols != totactcols)  if(myrank == 0) std::cout << "Wrong number of active columns computed" << std::endl;
383 #endif
384 
385 
386     rvec = FullyDistVec<GIT,VT> ( rvec.getcommgrid(), getncol(), std::numeric_limits<NT>::min());	// set length of rvec correctly
387 
388 #ifdef COMBBLAS_DEBUG
389     PrintInfo();
390     rvec.PrintInfo("rvector");
391 #endif
392 
393     if(totactcols == 0)
394     {
395         std::ostringstream ss;
396         ss << "TopK: k_limit (" << k_limit <<")" << " >= maxNnzInColumn. Returning the result of Reduce(Column, minimum<NT>()) instead..." << std::endl;
397         SpParHelper::Print(ss.str());
398         return false;   // no prune needed
399     }
400 
401 
402     std::vector<IT> actcolsmap(activecols);  // the map that gives the original index of that active column (this map will shrink over iterations)
403     for (IT i=0, j=0; i< locm; ++i) {
404         if(percsum[i] >= k_limit)
405             actcolsmap[j++] = i;
406     }
407 
408     std::vector<NT> all_medians;
409     std::vector<IT> nnz_per_col;
410     std::vector<IT> klimits(activecols, k_limit); // is distributed management of this vector needed?
411     int activecols_lowerbound = 10*colneighs;
412 
413 
414     IT * locncols = new IT[rowneighs];
415     locncols[rankinrow] = locm;
416     MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
417     IT coffset = std::accumulate(locncols, locncols+rankinrow, static_cast<IT>(0));
418     delete [] locncols;
419 
420     /* Create/allocate variables for vector assignment */
421     MPI_Datatype MPI_pair;
422     MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair);
423     MPI_Type_commit(&MPI_pair);
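    // Note: MPI_pair ships each std::pair<IT,NT> as an opaque block of sizeof(std::pair<IT,NT>) bytes.
    // This is sufficient here because the pairs are only copied between ranks running the same binary
    // (homogeneous layout) and are never interpreted by an MPI reduction operation.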
424 
425     int * sendcnt = new int[nprocs];
426     int * recvcnt = new int[nprocs];
427     int * sdispls = new int[nprocs]();
428     int * rdispls = new int[nprocs]();
429 
430     while(totactcols > 0)
431     {
432         int chunksize, iterations, lastchunk;
433         if(activecols > activecols_lowerbound)
434         {
435             // two reasons for chunking:
436             // (1) keep memory limited to activecols (<= n/sqrt(p))
437             // (2) avoid overflow in sendcount
438             chunksize = static_cast<int>(activecols/colneighs); // invariant chunksize >= 10 (by activecols_lowerbound)
439             iterations = std::max(static_cast<int>(activecols/chunksize), 1);
440             lastchunk = activecols - (iterations-1)*chunksize; // lastchunk >= chunksize by construction
441         }
442         else
443         {
444             chunksize = activecols;
445             iterations = 1;
446             lastchunk = activecols;
447         }
448         std::vector<NT> activemedians(activecols);   // one per "active" column
449         std::vector<IT> activennzperc(activecols);
450 
451 #ifdef THREADED
452 #pragma omp parallel for
453 #endif
454         for(IT i=0; i< activecols; ++i) // recompute the medians and nnzperc
455         {
456             size_t orgindex = actcolsmap[i];	// assert: no two threads will share the same "orgindex"
457             if(localmat[orgindex].empty())
458             {
459                 activemedians[i] = (NT) 0;
460                 activennzperc[i] = 0;
461             }
462             else
463             {
464                 // the ordering produced here is ascending, but that doesn't matter as long as we solely care about the median as opposed to a general nth element
465                 auto entriesincol(localmat[orgindex]);   // create a temporary vector as nth_element modifies the vector
466                 std::nth_element(entriesincol.begin(), entriesincol.begin() + entriesincol.size()/2, entriesincol.end());
467                 activemedians[i] = entriesincol[entriesincol.size()/2];
468                 activennzperc[i] = entriesincol.size();
469             }
470         }
471 
472         percsum.resize(activecols, 0);
473         MPI_Allreduce(activennzperc.data(), percsum.data(), activecols, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
474         activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);
475 
476 #ifdef COMBBLAS_DEBUG
477         MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
478         if(myrank == 0)   std::cout << "Number of active nonzeros is " << totactnnzs << std::endl;
479 #endif
480 
481         std::vector<IT> toretain;
482         if(rankincol == 0)
483         {
484             all_medians.resize(lastchunk*colneighs);
485             nnz_per_col.resize(lastchunk*colneighs);
486         }
487         std::vector< std::vector< std::pair<IT,NT> > > tmppair(nprocs);
488         for(int i=0; i< iterations-1; ++i)  // this loop should not be parallelized if we want to keep storage small
489         {
490             TopKGather(all_medians, nnz_per_col, chunksize, chunksize, activemedians, activennzperc, i, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
491         }
492         TopKGather(all_medians, nnz_per_col, lastchunk, chunksize, activemedians, activennzperc, iterations-1, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
493 
494         /* Set the newly found vector entries */
495         IT totsend = 0;
496         for(IT i=0; i<nprocs; ++i)
497         {
498             sendcnt[i] = tmppair[i].size();
499             totsend += tmppair[i].size();
500         }
501 
502         MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());
503 
504         std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
505         std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
506         IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
507 	assert((totsend < std::numeric_limits<int>::max()));
508 	assert((totrecv < std::numeric_limits<int>::max()));
509 
510         std::pair<IT,NT> * sendpair = new std::pair<IT,NT>[totsend];
511         for(int i=0; i<nprocs; ++i)
512         {
513             std::copy(tmppair[i].begin(), tmppair[i].end(), sendpair+sdispls[i]);
514             std::vector< std::pair<IT,NT> >().swap(tmppair[i]);	// clear memory
515         }
516         std::vector< std::pair<IT,NT> > recvpair(totrecv);
517         MPI_Alltoallv(sendpair, sendcnt, sdispls, MPI_pair, recvpair.data(), recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
518         delete [] sendpair;
519 
520         IT updated = 0;
521         for(auto & update : recvpair )    // Now, write these to rvec
522         {
523             updated++;
524             rvec.arr[update.first] =  update.second;
525         }
526 #ifdef COMBBLAS_DEBUG
527         MPI_Allreduce(MPI_IN_PLACE, &updated, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
528         if(myrank  == 0) std::cout << "Total vector entries updated " << updated << std::endl;
529 #endif
530 
531         /* End of setting up the newly found vector entries */
532 
533         std::vector<IT> newactivecols(toretain.size());
534         std::vector<IT> newklimits(toretain.size());
535         IT newindex = 0;
536         for(auto & retind : toretain )
537         {
538             newactivecols[newindex] = actcolsmap[retind];
539             newklimits[newindex++] = klimits[retind];
540         }
541         actcolsmap.swap(newactivecols);
542         klimits.swap(newklimits);
543         activecols = actcolsmap.size();
544 
545         MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
546 #ifdef COMBBLAS_DEBUG
547         if(myrank  == 0) std::cout << "Number of active columns is " << totactcols << std::endl;
548 #endif
549     }
550     DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
551     MPI_Type_free(&MPI_pair);
552 
553 #ifdef COMBBLAS_DEBUG
554     if(myrank == 0)   std::cout << "Exiting kselect2"<< std::endl;
555 #endif
556     return true;    // prune needed
557 }
558 
559 
560 
561 template <class IT, class NT, class DER>
562 void SpParMat< IT,NT,DER >::Dump(std::string filename) const
563 {
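	// Note on the output format: Dump writes one binary record per nonzero, namely the global row index
	// followed by the global column index, each cast to uint32_t in the platform's native byte order.
	// Numerical values are not written, so the file is effectively an edge list of the sparsity pattern.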
564 	MPI_Comm World = commGrid->GetWorld();
565 	int rank = commGrid->GetRank();
566 	int nprocs = commGrid->GetSize();
567 
568 	MPI_File thefile;
569     char * _fn = const_cast<char*>(filename.c_str());
570 	MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
571 
572 	int rankinrow = commGrid->GetRankInProcRow();
573 	int rankincol = commGrid->GetRankInProcCol();
574 	int rowneighs = commGrid->GetGridCols();	// get # of processors on the row
575 	int colneighs = commGrid->GetGridRows();
576 
577 	IT * colcnts = new IT[rowneighs];
578 	IT * rowcnts = new IT[colneighs];
579 	rowcnts[rankincol] = getlocalrows();
580 	colcnts[rankinrow] = getlocalcols();
581 
582 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), colcnts, 1, MPIType<IT>(), commGrid->GetRowWorld());
583 	IT coloffset = std::accumulate(colcnts, colcnts+rankinrow, static_cast<IT>(0));
584 
585 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), rowcnts, 1, MPIType<IT>(), commGrid->GetColWorld());
586 	IT rowoffset = std::accumulate(rowcnts, rowcnts+rankincol, static_cast<IT>(0));
587 	DeleteAll(colcnts, rowcnts);
588 
589 	IT * prelens = new IT[nprocs];
590 	prelens[rank] = 2*getlocalnnz();
591 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
592 	IT lengthuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
593 
594 	// The disp displacement argument specifies the position
595 	// (absolute offset in bytes from the beginning of the file)
596 	MPI_Offset disp = lengthuntil * sizeof(uint32_t);
597 	char native[] = "native";
598 	MPI_File_set_view(thefile, disp, MPI_UNSIGNED, MPI_UNSIGNED, native, MPI_INFO_NULL); // AL: the second-to-last argument is a non-const char* (looks like poor MPI standardization, the C++ bindings list it as const), C++ string literals MUST be const (especially in c++11).
599 	uint32_t * gen_edges = new uint32_t[prelens[rank]];
600 
601 	IT k = 0;
602 	for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
603 	{
604 		for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
605 		{
606 			gen_edges[k++] = (uint32_t) (nzit.rowid() + rowoffset);
607 			gen_edges[k++] = (uint32_t) (colit.colid() +  coloffset);
608 		}
609 	}
610 	assert(k == prelens[rank]);
611 	MPI_File_write(thefile, gen_edges, prelens[rank], MPI_UNSIGNED, MPI_STATUS_IGNORE);
612 	MPI_File_close(&thefile);
613 
614 	delete [] prelens;
615 	delete [] gen_edges;
616 }
617 
618 template <class IT, class NT, class DER>
619 SpParMat< IT,NT,DER >::SpParMat (const SpParMat< IT,NT,DER > & rhs)
620 {
621 	if(rhs.spSeq != NULL)
622 		spSeq = new DER(*(rhs.spSeq));  	// Deep copy of local block
623 
624 	commGrid =  rhs.commGrid;
625 }
626 
627 template <class IT, class NT, class DER>
628 SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator=(const SpParMat< IT,NT,DER > & rhs)
629 {
630 	if(this != &rhs)
631 	{
632 		//! Checking against NULL is probably unnecessary; delete won't fail on NULL
633 		//! But useful in the presence of a user defined "operator delete" which fails to check NULL
634 		if(spSeq != NULL) delete spSeq;
635 		if(rhs.spSeq != NULL)
636 			spSeq = new DER(*(rhs.spSeq));  // Deep copy of local block
637 
638 		commGrid = rhs.commGrid;
639 	}
640 	return *this;
641 }
642 
643 template <class IT, class NT, class DER>
644 SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator+=(const SpParMat< IT,NT,DER > & rhs)
645 {
646 	if(this != &rhs)
647 	{
648 		if(*commGrid == *rhs.commGrid)
649 		{
650 			(*spSeq) += (*(rhs.spSeq));
651 		}
652 		else
653 		{
654 			std::cout << "Grids are not comparable for parallel addition (A+B)" << std::endl;
655 		}
656 	}
657 	else
658 	{
659 		std::cout<< "Missing feature (A+A): Use multiply with 2 instead !"<<std::endl;
660 	}
661 	return *this;
662 }
663 
664 template <class IT, class NT, class DER>
665 float SpParMat< IT,NT,DER >::LoadImbalance() const
666 {
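	// Returns p * max_i(localnnz_i) / totalnnz: 1.0 means perfectly balanced,
	// p means all nonzeros reside on a single processor.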
667 	IT totnnz = getnnz();	// collective call
668 	IT maxnnz = 0;
669 	IT localnnz = (IT) spSeq->getnnz();
670 	MPI_Allreduce( &localnnz, &maxnnz, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
671 	if(totnnz == 0) return 1;
672  	return static_cast<float>((commGrid->GetSize() * maxnnz)) / static_cast<float>(totnnz);
673 }
674 
675 template <class IT, class NT, class DER>
676 IT SpParMat< IT,NT,DER >::getnnz() const
677 {
678 	IT totalnnz = 0;
679 	IT localnnz = spSeq->getnnz();
680 	MPI_Allreduce( &localnnz, &totalnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
681  	return totalnnz;
682 }
683 
684 template <class IT, class NT, class DER>
685 IT SpParMat< IT,NT,DER >::getnrow() const
686 {
687 	IT totalrows = 0;
688 	IT localrows = spSeq->getnrow();
689 	MPI_Allreduce( &localrows, &totalrows, 1, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
690  	return totalrows;
691 }
692 
693 template <class IT, class NT, class DER>
694 IT SpParMat< IT,NT,DER >::getncol() const
695 {
696 	IT totalcols = 0;
697 	IT localcols = spSeq->getncol();
698 	MPI_Allreduce( &localcols, &totalcols, 1, MPIType<IT>(), MPI_SUM, commGrid->GetRowWorld());
699  	return totalcols;
700 }
701 
702 template <class IT, class NT, class DER>
703 template <typename _BinaryOperation>
704 void SpParMat<IT,NT,DER>::DimApply(Dim dim, const FullyDistVec<IT, NT>& x, _BinaryOperation __binary_op)
705 {
706 
707 	if(!(*commGrid == *(x.commGrid)))
708 	{
709 		std::cout << "Grids are not comparable for SpParMat::DimApply" << std::endl;
710 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
711 	}
712 
713 	MPI_Comm World = x.commGrid->GetWorld();
714 	MPI_Comm ColWorld = x.commGrid->GetColWorld();
715 	MPI_Comm RowWorld = x.commGrid->GetRowWorld();
716 	switch(dim)
717 	{
718 		case Column:	// scale each column
719 		{
720 			int xsize = (int) x.LocArrSize();
721 			int trxsize = 0;
722 			int diagneigh = x.commGrid->GetComplementRank();
723 			MPI_Status status;
724 			MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
725 
726 			NT * trxnums = new NT[trxsize];
727 			MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
728 
729 			int colneighs, colrank;
730 			MPI_Comm_size(ColWorld, &colneighs);
731 			MPI_Comm_rank(ColWorld, &colrank);
732 			int * colsize = new int[colneighs];
733 			colsize[colrank] = trxsize;
734 
735 			MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
736 			int * dpls = new int[colneighs]();	// displacements (zero initialized pid)
737 			std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
738 			int accsize = std::accumulate(colsize, colsize+colneighs, 0);
739 			NT * scaler = new NT[accsize];
740 
741 			MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), scaler, colsize, dpls, MPIType<NT>(), ColWorld);
742 			DeleteAll(trxnums,colsize, dpls);
743 
744 			for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
745 			{
746 				for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
747 				{
748 					nzit.value() = __binary_op(nzit.value(), scaler[colit.colid()]);
749 				}
750 			}
751 			delete [] scaler;
752 			break;
753 		}
754 		case Row:
755 		{
756 			int xsize = (int) x.LocArrSize();
757 			int rowneighs, rowrank;
758 			MPI_Comm_size(RowWorld, &rowneighs);
759 			MPI_Comm_rank(RowWorld, &rowrank);
760 			int * rowsize = new int[rowneighs];
761 			rowsize[rowrank] = xsize;
762 			MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rowsize, 1, MPI_INT, RowWorld);
763 			int * dpls = new int[rowneighs]();	// displacements (zero initialized pid)
764 			std::partial_sum(rowsize, rowsize+rowneighs-1, dpls+1);
765 			int accsize = std::accumulate(rowsize, rowsize+rowneighs, 0);
766 			NT * scaler = new NT[accsize];
767 
768 			MPI_Allgatherv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), scaler, rowsize, dpls, MPIType<NT>(), RowWorld);
769 			DeleteAll(rowsize, dpls);
770 
771 			for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
772 			{
773 				for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
774 				{
775 					nzit.value() = __binary_op(nzit.value(), scaler[nzit.rowid()]);
776 				}
777 			}
778 			delete [] scaler;
779 			break;
780 		}
781 		default:
782 		{
783 			std::cout << "Unknown scaling dimension, returning..." << std::endl;
784 			break;
785 		}
786 	}
787 }
788 
789 template <class IT, class NT, class DER>
790 template <typename _BinaryOperation, typename _UnaryOperation >
791 FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
792 {
793     IT length;
794     switch(dim)
795     {
796         case Column:
797         {
798             length = getncol();
799             break;
800         }
801         case Row:
802         {
803             length = getnrow();
804             break;
805         }
806         default:
807         {
808             std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
809             break;
810         }
811     }
812 	FullyDistVec<IT,NT> parvec(commGrid, length, id);
813 	Reduce(parvec, dim, __binary_op, id, __unary_op);
814 	return parvec;
815 }
816 
817 template <class IT, class NT, class DER>
818 template <typename _BinaryOperation>
819 FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id) const
820 {
821     IT length;
822     switch(dim)
823     {
824         case Column:
825         {
826             length = getncol();
827             break;
828         }
829         case Row:
830         {
831             length = getnrow();
832             break;
833         }
834         default:
835         {
836             std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
837             break;
838         }
839     }
840 	FullyDistVec<IT,NT> parvec(commGrid, length, id);
841 	Reduce(parvec, dim, __binary_op, id, myidentity<NT>()); // myidentity<NT>() is a no-op function
842 	return parvec;
843 }
844 
845 
846 template <class IT, class NT, class DER>
847 template <typename VT, typename GIT, typename _BinaryOperation>
848 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
849 {
850 	Reduce(rvec, dim, __binary_op, id, myidentity<NT>() );
851 }
852 
853 
854 template <class IT, class NT, class DER>
855 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>	// GIT: global index type of vector
856 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
857 {
858     Reduce(rvec, dim, __binary_op, id, __unary_op, MPIOp<_BinaryOperation, VT>::op() );
859 }
860 
861 
862 template <class IT, class NT, class DER>
863 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>	// GIT: global index type of vector
864 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, MPI_Op mympiop) const
865 {
866 	if(*rvec.commGrid != *commGrid)
867 	{
868 		SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails!", commGrid->GetWorld());
869 		MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
870 	}
871 	switch(dim)
872 	{
873 		case Column:	// pack along the columns, result is a vector of size n
874 		{
875 			// We can't use rvec's distribution (rows first, columns later) here
876 			// because the ownership model of the vector has the order P(0,0), P(0,1),...
877 			// column reduction will first generate vector distribution in P(0,0), P(1,0),... order.
878 
879 			IT n_thiscol = getlocalcols();   // length assigned to this processor column
880 			int colneighs = commGrid->GetGridRows();	// including oneself
881             		int colrank = commGrid->GetRankInProcCol();
882 
883 			GIT * loclens = new GIT[colneighs];
884 			GIT * lensums = new GIT[colneighs+1]();	// begin/end points of local lengths
885 
886             		GIT n_perproc = n_thiscol / colneighs;    // length on a typical processor
887             		if(colrank == colneighs-1)
888                 		loclens[colrank] = n_thiscol - (n_perproc*colrank);
889             		else
890                 		loclens[colrank] = n_perproc;
891 
892 			MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
893 			std::partial_sum(loclens, loclens+colneighs, lensums+1);	// loclens and lensums are different, but both would fit in 32-bits
894 
895 			std::vector<VT> trarr;
896 			typename DER::SpColIter colit = spSeq->begcol();
897 			for(int i=0; i< colneighs; ++i)
898 			{
899 				VT * sendbuf = new VT[loclens[i]];
900 				std::fill(sendbuf, sendbuf+loclens[i], id);	// fill with identity
901 
902 				for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit)	// iterate over a portion of columns
903 				{
904 					for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)	// all nonzeros in this column
905 					{
906 						sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
907 					}
908 				}
909 
910 				VT * recvbuf = NULL;
911 				if(colrank == i)
912 				{
913 					trarr.resize(loclens[i]);
914 					recvbuf = SpHelper::p2a(trarr);
915 				}
916 				MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetColWorld()); // root  = i
917 				delete [] sendbuf;
918 			}
919 			DeleteAll(loclens, lensums);
920 
921 			GIT reallen;	// Now we have to transpose the vector
922 			GIT trlen = trarr.size();
923 			int diagneigh = commGrid->GetComplementRank();
924 			MPI_Status status;
925 			MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
926 
927 			rvec.arr.resize(reallen);
928 			MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
929 			rvec.glen = getncol();	// ABAB: Put a sanity check here
930 			break;
931 
932 		}
933 		case Row:	// pack along the rows, result is a vector of size m
934 		{
935 			rvec.glen = getnrow();
936 			int rowneighs = commGrid->GetGridCols();
937 			int rowrank = commGrid->GetRankInProcRow();
938 			GIT * loclens = new GIT[rowneighs];
939 			GIT * lensums = new GIT[rowneighs+1]();	// begin/end points of local lengths
940 			loclens[rowrank] = rvec.MyLocLength();
941 			MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetRowWorld());
942 			std::partial_sum(loclens, loclens+rowneighs, lensums+1);
943 			try
944 			{
945 				rvec.arr.resize(loclens[rowrank], id);
946 
947 				// keeping track of all nonzero iterators within columns at once is unscalable w.r.t. memory (due to sqrt(p) scaling)
948 				// thus we'll do batches of column as opposed to all columns at once. 5 million columns take 80MB (two pointers per column)
949 				#define MAXCOLUMNBATCH 5 * 1024 * 1024
950 				typename DER::SpColIter begfinger = spSeq->begcol();	// beginning finger to columns
951 
952 				// Each processor on the same processor row should execute the SAME number of reduce calls
953 				int numreducecalls = (int) ceil(static_cast<float>(spSeq->getnzc()) / static_cast<float>(MAXCOLUMNBATCH));
954 				int maxreducecalls;
955 				MPI_Allreduce( &numreducecalls, &maxreducecalls, 1, MPI_INT, MPI_MAX, commGrid->GetRowWorld());
956 
957 				for(int k=0; k< maxreducecalls; ++k)
958 				{
959 					std::vector<typename DER::SpColIter::NzIter> nziters;
960 					typename DER::SpColIter curfinger = begfinger;
961 					for(; curfinger != spSeq->endcol() && nziters.size() < MAXCOLUMNBATCH ; ++curfinger)
962 					{
963 						nziters.push_back(spSeq->begnz(curfinger));
964 					}
965 					for(int i=0; i< rowneighs; ++i)		// step by step to save memory
966 					{
967 						VT * sendbuf = new VT[loclens[i]];
968 						std::fill(sendbuf, sendbuf+loclens[i], id);	// fill with identity
969 
970 						typename DER::SpColIter colit = begfinger;
971 						IT colcnt = 0;	// "processed column" counter
972 						for(; colit != curfinger; ++colit, ++colcnt)	// iterate over this batch of columns until curfinger
973 						{
974 							typename DER::SpColIter::NzIter nzit = nziters[colcnt];
975 							for(; nzit != spSeq->endnz(colit) && nzit.rowid() < lensums[i+1]; ++nzit)	// a portion of nonzeros in this column
976 							{
977 								sendbuf[nzit.rowid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()-lensums[i]]);
978 							}
979 							nziters[colcnt] = nzit;	// set the new finger
980 						}
981 
982 						VT * recvbuf = NULL;
983 						if(rowrank == i)
984 						{
985 							for(int j=0; j< loclens[i]; ++j)
986 							{
987 								sendbuf[j] = __binary_op(rvec.arr[j], sendbuf[j]);	// rvec.arr will be overriden with MPI_Reduce, save its contents
988 							}
989 							recvbuf = SpHelper::p2a(rvec.arr);
990 						}
991 						MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetRowWorld()); // root = i
992 						delete [] sendbuf;
993 					}
994 					begfinger = curfinger;	// set the next begfinger
995 				}
996 				DeleteAll(loclens, lensums);
997 			}
998 			catch (std::length_error& le)
999 			{
1000 	 			 std::cerr << "Length error: " << le.what() << std::endl;
1001   			}
1002 			break;
1003 		}
1004 		default:
1005 		{
1006 			std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
1007 			break;
1008 		}
1009 	}
1010 }
1011 
1012 #ifndef KSELECTLIMIT
1013 #define KSELECTLIMIT 10000
1014 #endif
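// KSELECTLIMIT is the crossover point used by the Kselect wrappers below: unless the caller forces a
// version via kselectVersion, k values below the limit go to Kselect1 (whose per-processor memory grows
// with (n/sqrt(p))*k), and larger k values go to the Saukas-Song based Kselect2.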
1015 
1016 //! Kselect wrapper for selected columns of the matrix
1017 //! Indices of the input sparse vector kth denote the queried columns of the matrix
1018 //! Upon return, values of kth stores the kth entries of the queried columns
1019 //! Returns true if Kselect algorithm is invoked for at least one column
1020 //! Otherwise, returns false
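//! A minimal usage sketch (the type alias, queried column index, and k below are illustrative
//! assumptions; the matrix A is presumed to have been constructed and populated elsewhere):
//!     typedef SpParMat< int64_t, double, SpDCCols<int64_t,double> > PSpMat;
//!     FullyDistSpVec<int64_t,double> kth(A.getcommgrid(), A.getncol());
//!     kth.SetElement(5, 0.0);                // query column 5; the initial value is a placeholder
//!     bool invoked = A.Kselect(kth, 10, 2);  // on return, kth[5] holds the 10th largest entry of column 5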
1021 template <class IT, class NT, class DER>
1022 template <typename VT, typename GIT>
1023 bool SpParMat<IT,NT,DER>::Kselect(FullyDistSpVec<GIT,VT> & kth, IT k_limit, int kselectVersion) const
1024 {
1025 #ifdef COMBBLAS_DEBUG
1026     FullyDistVec<GIT,VT> test1(kth.getcommgrid());
1027     FullyDistVec<GIT,VT> test2(kth.getcommgrid());
1028     Kselect1(test1, k_limit, myidentity<NT>());
1029     Kselect2(test2, k_limit);
1030     if(test1 == test2)
1031         SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
1032     else
1033     {
1034         SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
1035         test1.PrintToFile("test1");
1036         test2.PrintToFile("test2");
1037     }
1038 #endif
1039 
1040     if(kselectVersion==1 || k_limit < KSELECTLIMIT)
1041     {
1042         return Kselect1(kth, k_limit, myidentity<NT>());
1043     }
1044     else
1045     {
1046         FullyDistVec<GIT,VT> kthAll ( getcommgrid());
1047         bool ret = Kselect2(kthAll, k_limit);
1048         FullyDistSpVec<GIT,VT> temp = EWiseApply<VT>(kth, kthAll,
1049                                                      [](VT spval, VT dval){return dval;},
1050                                                      [](VT spval, VT dval){return true;},
1051                                                      false, NT());
1052         kth = temp;
1053         return ret;
1054     }
1055 }
1056 
1057 
1058 //! Returns true if Kselect algorithm is invoked for at least one column
1059 //! Otherwise, returns false
1060 //! if false, rvec contains either the minimum entry in each column or zero
1061 template <class IT, class NT, class DER>
1062 template <typename VT, typename GIT>
1063 bool SpParMat<IT,NT,DER>::Kselect(FullyDistVec<GIT,VT> & rvec, IT k_limit, int kselectVersion) const
1064 {
1065 #ifdef COMBBLAS_DEBUG
1066     FullyDistVec<GIT,VT> test1(rvec.getcommgrid());
1067     FullyDistVec<GIT,VT> test2(rvec.getcommgrid());
1068     Kselect1(test1, k_limit, myidentity<NT>());
1069     Kselect2(test2, k_limit);
1070     if(test1 == test2)
1071         SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
1072     else
1073     {
1074         SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
1075         //test1.PrintToFile("test1");
1076         //test2.PrintToFile("test2");
1077     }
1078 #endif
1079 
1080     if(kselectVersion==1 || k_limit < KSELECTLIMIT)
1081         return Kselect1(rvec, k_limit, myidentity<NT>());
1082     else
1083         return Kselect2(rvec, k_limit);
1084 
1085 }
1086 
1087 /* identify the k-th maximum element in each column of a matrix
1088 ** if the number of nonzeros in a column is less than or equal to k, return minimum entry
1089 ** Caution: this is a preliminary implementation: needs 3*(n/sqrt(p))*k memory per processor
1090 ** this memory requirement is too high for larger k
1091  */
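// Rough outline of the implementation below: every processor partial_sorts the (up to) k largest
// entries of each of its local columns, the ranks of a processor column then merge these k-length
// lists pairwise in a binary-tree pattern, rank 0 of each processor column forwards the resulting
// k-th values to the diagonal processor of that column, and the diagonal processors scatter the
// answer along their processor row into rvec.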
1092 template <class IT, class NT, class DER>
1093 template <typename VT, typename GIT, typename _UnaryOperation>	// GIT: global index type of vector
1094 bool SpParMat<IT,NT,DER>::Kselect1(FullyDistVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1095 {
1096     if(*rvec.commGrid != *commGrid)
1097     {
1098         SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1099         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1100     }
1101 
1102     FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1103     Reduce(nnzPerColumn, Column, std::plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1104     IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1105     if(k>maxnnzPerColumn)
1106     {
1107         SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
1108         Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1109         return false;
1110     }
1111 
1112     IT n_thiscol = getlocalcols();   // length (number of columns) assigned to this processor (and processor column)
1113 
1114     // check, memory should be min(n_thiscol*k, local nnz)
1115     // hence we will not overflow for very large k
1116     std::vector<VT> sendbuf(n_thiscol*k);
1117     std::vector<IT> send_coldisp(n_thiscol+1,0);
1118     std::vector<IT> local_coldisp(n_thiscol+1,0);
1119 
1120 
1121     //displacement of local columns
1122     //local_coldisp is the displacement of all nonzeros per column
1123     //send_coldisp is the displacement of k nonzeros per column
1124     IT nzc = 0;
1125     if(spSeq->getnnz()>0)
1126     {
1127         typename DER::SpColIter colit = spSeq->begcol();
1128         for(IT i=0; i<n_thiscol; ++i)
1129         {
1130             local_coldisp[i+1] = local_coldisp[i];
1131             send_coldisp[i+1] = send_coldisp[i];
1132             if(i==colit.colid())
1133             {
1134                 local_coldisp[i+1] += colit.nnz();
1135                 if(colit.nnz()>=k)
1136                     send_coldisp[i+1] += k;
1137                 else
1138                     send_coldisp[i+1] += colit.nnz();
1139                 colit++;
1140                 nzc++;
1141             }
1142         }
1143     }
1144     assert(local_coldisp[n_thiscol] == spSeq->getnnz());
1145 
1146     // a copy of local part of the matrix
1147     // this can be avoided if we write our own local kselect function instead of using partial_sort
1148     std::vector<VT> localmat(spSeq->getnnz());
1149 
1150 
1151 #ifdef THREADED
1152 #pragma omp parallel for
1153 #endif
1154     for(IT i=0; i<nzc; i++)
1155     //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
1156     {
1157         typename DER::SpColIter colit = spSeq->begcol() + i;
1158         IT colid = colit.colid();
1159         IT idx = local_coldisp[colid];
1160         for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1161         {
1162             localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1163         }
1164 
1165         if(colit.nnz()<=k)
1166         {
1167             std::sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
1168             std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], sendbuf.begin()+send_coldisp[colid]);
1169         }
1170         else
1171         {
1172             std::partial_sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
1173             std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, sendbuf.begin()+send_coldisp[colid]);
1174         }
1175     }
1176 
1177     std::vector<VT>().swap(localmat);
1178     std::vector<IT>().swap(local_coldisp);
1179 
1180     std::vector<VT> recvbuf(n_thiscol*k);
1181     std::vector<VT> tempbuf(n_thiscol*k);
1182     std::vector<IT> recv_coldisp(n_thiscol+1);
1183     std::vector<IT> templen(n_thiscol);
1184 
1185     int colneighs = commGrid->GetGridRows();
1186     int colrank = commGrid->GetRankInProcCol();
1187 
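    // Binary-tree reduction along the processor column: in the round for a given p, every rank with
    // colrank % p == p/2 sends its per-column top-k lists (values plus column displacements) to rank
    // colrank - p/2, which merges each column pairwise while keeping at most k entries, so that rank 0
    // of the column ends up holding the global top-k of every local column.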
1188     for(int p=2; p <= colneighs; p*=2)
1189     {
1190 
1191         if(colrank%p == p/2) // this processor is a sender in this round
1192         {
1193             int receiver = colrank - ceil(p/2);
1194             MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1195             MPI_Send(sendbuf.data(), send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1196             //break;
1197         }
1198         else if(colrank%p == 0) // this processor is a receiver in this round
1199         {
1200             int sender = colrank + ceil(p/2);
1201             if(sender < colneighs)
1202             {
1203 
1204                 MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1205                 MPI_Recv(recvbuf.data(), recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1206 
1207 
1208 
1209 #ifdef THREADED
1210 #pragma omp parallel for
1211 #endif
1212                 for(IT i=0; i<n_thiscol; ++i)
1213                 {
1214                     // partial merge until first k elements
1215                     IT j=send_coldisp[i], l=recv_coldisp[i];
1216                     //IT templen[i] = k*i;
1217                     IT offset = k*i;
1218                     IT lid = 0;
1219                     for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1220                     {
1221                         if(sendbuf[j] > recvbuf[l])  // decision
1222                             tempbuf[offset+lid++] = sendbuf[j++];
1223                         else
1224                             tempbuf[offset+lid++] = recvbuf[l++];
1225                     }
1226                     while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1227                     while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1228                     templen[i] = lid;
1229                 }
1230 
1231                 send_coldisp[0] = 0;
1232                 for(IT i=0; i<n_thiscol; i++)
1233                 {
1234                     send_coldisp[i+1] = send_coldisp[i] + templen[i];
1235                 }
1236 
1237 
1238 #ifdef THREADED
1239 #pragma omp parallel for
1240 #endif
1241                 for(IT i=0; i<n_thiscol; i++) // direct copy
1242                 {
1243                     IT offset = k*i;
1244                     std::copy(tempbuf.begin()+offset, tempbuf.begin()+offset+templen[i], sendbuf.begin() + send_coldisp[i]);
1245                 }
1246             }
1247         }
1248     }
1249     MPI_Barrier(commGrid->GetWorld());
1250     std::vector<VT> kthItem(n_thiscol);
1251 
1252     int root = commGrid->GetDiagOfProcCol();
1253     if(root==0 && colrank==0) // rank 0
1254     {
1255 #ifdef THREADED
1256 #pragma omp parallel for
1257 #endif
1258         for(IT i=0; i<n_thiscol; i++)
1259         {
1260             IT nitems = send_coldisp[i+1]-send_coldisp[i];
1261             if(nitems >= k)
1262                 kthItem[i] = sendbuf[send_coldisp[i]+k-1];
1263             else
1264                 kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty or has less than k elements
1265         }
1266     }
1267     else if(root>0 && colrank==0) // send to the diagonal processor of this processor column
1268     {
1269 #ifdef THREADED
1270 #pragma omp parallel for
1271 #endif
1272         for(IT i=0; i<n_thiscol; i++)
1273         {
1274             IT nitems = send_coldisp[i+1]-send_coldisp[i];
1275             if(nitems >= k)
1276                 kthItem[i] = sendbuf[send_coldisp[i]+k-1];
1277             else
1278                 kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty or has less than k elements
1279         }
1280         MPI_Send(kthItem.data(), n_thiscol, MPIType<VT>(), root, 0, commGrid->GetColWorld());
1281     }
1282     else if(root>0 && colrank==root)
1283     {
1284         MPI_Recv(kthItem.data(), n_thiscol, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1285     }
1286 
1287     std::vector <int> sendcnts;
1288     std::vector <int> dpls;
1289     if(colrank==root)
1290     {
1291         int proccols = commGrid->GetGridCols();
1292         IT n_perproc = n_thiscol / proccols;
1293         sendcnts.resize(proccols);
1294         std::fill(sendcnts.data(), sendcnts.data()+proccols-1, n_perproc);
1295         sendcnts[proccols-1] = n_thiscol - (n_perproc * (proccols-1));
        dpls.resize(proccols,0);	// displacements (zero-initialized)
1297         std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1298     }
1299 
1300     int rowroot = commGrid->GetDiagOfProcRow();
1301     int recvcnts = 0;
1302     // scatter received data size
1303     MPI_Scatter(sendcnts.data(),1, MPI_INT, & recvcnts, 1, MPI_INT, rowroot, commGrid->GetRowWorld());
1304 
1305     rvec.arr.resize(recvcnts);
1306     MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.arr.data(), rvec.arr.size(), MPIType<VT>(),rowroot, commGrid->GetRowWorld());
1307     rvec.glen = getncol();
1308     return true;
1309 }
1310 
1311 
1312 
1313 template <class IT, class NT, class DER>
1314 template <typename VT, typename GIT, typename _UnaryOperation>	// GIT: global index type of vector
Kselect1(FullyDistSpVec<GIT,VT> & rvec,IT k,_UnaryOperation __unary_op) const1315 bool SpParMat<IT,NT,DER>::Kselect1(FullyDistSpVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1316 {
1317     if(*rvec.commGrid != *commGrid)
1318     {
1319         SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1320         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1321     }
1322 
1323     /*
1324      FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1325      Reduce(nnzPerColumn, Column, plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1326      IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1327      if(k>maxnnzPerColumn)
1328      {
1329      SpParHelper::Print("Kselect: k is greater then maxNnzInColumn. Calling Reduce instead...\n");
1330      Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1331      return false;
1332      }
1333      */
1334 
1335     IT n_thiscol = getlocalcols();   // length (number of columns) assigned to this processor (and processor column)
1336     MPI_Comm World = rvec.commGrid->GetWorld();
1337     MPI_Comm ColWorld = rvec.commGrid->GetColWorld();
1338     MPI_Comm RowWorld = rvec.commGrid->GetRowWorld();
1339     int colneighs = commGrid->GetGridRows();
1340     int colrank = commGrid->GetRankInProcCol();
1341     int coldiagrank = commGrid->GetDiagOfProcCol();
1342 
1343 
1344     //replicate sparse indices along processor column
1345     int accnz;
1346     int32_t trxlocnz;
1347     GIT lenuntil;
1348     int32_t *trxinds, *activeCols;
1349     VT *trxnums, *numacc;
1350     TransposeVector(World, rvec, trxlocnz, lenuntil, trxinds, trxnums, true);
1351 
1352     if(rvec.commGrid->GetGridRows() > 1)
1353     {
1354         //TODO: we only need to communicate indices
        AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, activeCols, numacc, accnz, true);  // trxinds/trxnums deallocated; activeCols/numacc allocated, accnz set
1356     }
1357     else
1358     {
1359         accnz = trxlocnz;
1360         activeCols = trxinds;     //aliasing ptr
1361         numacc = trxnums;     //aliasing ptr
1362     }
1363 
1364 
1365     std::vector<bool> isactive(n_thiscol,false);
1366     for(int i=0; i<accnz ; i++)
1367     {
1368         isactive[activeCols[i]] = true;
1369         //cout << indacc[i] <<  " ";
1370     }
1371     IT nActiveCols = accnz;//count_if(isactive.begin(), isactive.end(), [](bool ac){return ac;});
1372     // check, memory should be min(n_thiscol*k, local nnz)
1373     // hence we will not overflow for very large k
1374 
1375     std::vector<IT> send_coldisp(n_thiscol+1,0);
1376     std::vector<IT> local_coldisp(n_thiscol+1,0);
1377     //vector<VT> sendbuf(nActiveCols*k);
1378     VT * sendbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1379 
1380 
1381     //displacement of local columns
1382     //local_coldisp is the displacement of all nonzeros per column
1383     //send_coldisp is the displacement of k nonzeros per column
1384     IT nzc = 0;
1385     if(spSeq->getnnz()>0)
1386     {
1387         typename DER::SpColIter colit = spSeq->begcol();
1388         for(IT i=0; i<n_thiscol; ++i)
1389         {
1390             local_coldisp[i+1] = local_coldisp[i];
1391             send_coldisp[i+1] = send_coldisp[i];
1392             if(i==colit.colid())
1393             {
1394                 if(isactive[i])
1395                 {
1396                     local_coldisp[i+1] += colit.nnz();
1397                     if(colit.nnz()>=k)
1398                         send_coldisp[i+1] += k;
1399                     else
1400                         send_coldisp[i+1] += colit.nnz();
1401                 }
1402                 colit++;
1403                 nzc++;
1404             }
1405         }
1406     }
1407 
1408     // a copy of local part of the matrix
1409     // this can be avoided if we write our own local kselect function instead of using partial_sort
1410     //vector<VT> localmat(local_coldisp[n_thiscol]);
1411     VT * localmat = static_cast<VT *> (::operator new (local_coldisp[n_thiscol]*sizeof(VT)));
1412 
1413 
1414 #ifdef THREADED
1415 #pragma omp parallel for
1416 #endif
1417     for(IT i=0; i<nzc; i++)
1418         //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
1419     {
1420         typename DER::SpColIter colit = spSeq->begcol() + i;
1421         IT colid = colit.colid();
1422         if(isactive[colid])
1423         {
1424             IT idx = local_coldisp[colid];
1425             for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1426             {
1427                 localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1428             }
1429 
1430             if(colit.nnz()<=k)
1431             {
1432                 sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], std::greater<VT>());
1433                 std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], sendbuf+send_coldisp[colid]);
1434             }
1435             else
1436             {
1437                 partial_sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, localmat+local_coldisp[colid+1], std::greater<VT>());
1438                 std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, sendbuf+send_coldisp[colid]);
1439             }
1440         }
1441     }
1442 
1443 
1444     //vector<VT>().swap(localmat);
1445     ::operator delete(localmat);
1446     std::vector<IT>().swap(local_coldisp);
1447 
1448     VT * recvbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1449     VT * tempbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1450     //vector<VT> recvbuf(n_thiscol*k);
1451     //vector<VT> tempbuf(n_thiscol*k);
1452     std::vector<IT> recv_coldisp(n_thiscol+1);
1453     std::vector<IT> templen(n_thiscol);
1454 
1455 
1456 
    for(int p=2; p/2 < colneighs; p*=2) // binary merge tree along the processor column
1458     {
1459 
1460         if(colrank%p == p/2) // this processor is a sender in this round
1461         {
1462             int receiver = colrank - ceil(p/2);
1463             MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1464             MPI_Send(sendbuf, send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1465             //break;
1466         }
1467         else if(colrank%p == 0) // this processor is a receiver in this round
1468         {
1469             int sender = colrank + ceil(p/2);
1470             if(sender < colneighs)
1471             {
1472 
1473                 MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1474                 MPI_Recv(recvbuf, recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1475 
1476 
1477 
1478 #ifdef THREADED
1479 #pragma omp parallel for
1480 #endif
1481                 for(IT i=0; i<n_thiscol; ++i)
1482                 {
1483                     // partial merge until first k elements
1484                     IT j=send_coldisp[i], l=recv_coldisp[i];
1485                     //IT templen[i] = k*i;
1486                     IT offset = k*i;
1487                     IT lid = 0;
1488                     for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1489                     {
                        if(sendbuf[j] > recvbuf[l])  // take the larger of the two column heads
1491                             tempbuf[offset+lid++] = sendbuf[j++];
1492                         else
1493                             tempbuf[offset+lid++] = recvbuf[l++];
1494                     }
1495                     while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1496                     while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1497                     templen[i] = lid;
1498                 }
1499 
1500                 send_coldisp[0] = 0;
1501                 for(IT i=0; i<n_thiscol; i++)
1502                 {
1503                     send_coldisp[i+1] = send_coldisp[i] + templen[i];
1504                 }
1505 
1506 
1507 #ifdef THREADED
1508 #pragma omp parallel for
1509 #endif
1510                 for(IT i=0; i<n_thiscol; i++) // direct copy
1511                 {
1512                     IT offset = k*i;
1513                     std::copy(tempbuf+offset, tempbuf+offset+templen[i], sendbuf + send_coldisp[i]);
1514                 }
1515             }
1516         }
1517     }
1518     MPI_Barrier(commGrid->GetWorld());
1519 
1520 
1521 
1522 
1523     /*--------------------------------------------------------
1524      At this point, top k elements in every active column
1525      are gathered on the first processor row, P(0,:).
1526 
1527      Next step: At P(0,i) find the kth largest element in
1528      active columns belonging to P(0,i).
     If nnz in a column is less than k, keep its smallest nonzero (the last stored entry).
1530      If a column is empty, keep the lowest numeric value.
1531      --------------------------------------------------------*/
1532 
1533     std::vector<VT> kthItem(nActiveCols); // kth elements of local active columns
1534     if(colrank==0)
1535     {
1536 #ifdef THREADED
1537 #pragma omp parallel for
1538 #endif
1539         for(IT i=0; i<nActiveCols; i++)
1540         {
1541             IT ai = activeCols[i]; // active column index
1542             IT nitems = send_coldisp[ai+1]-send_coldisp[ai];
1543             if(nitems >= k)
1544                 kthItem[i] = sendbuf[send_coldisp[ai]+k-1];
1545             else if (nitems==0)
1546                 kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty
1547             else
1548                 kthItem[i] = sendbuf[send_coldisp[ai+1]-1]; // returning the last entry if nnz in this column is less than k
1549 
1550         }
1551     }
1552 
1553     /*--------------------------------------------------------
1554      At this point, kth largest elements in every active column
1555      are gathered on the first processor row, P(0,:).
1556 
1557      Next step: Send the kth largest elements from P(0,i) to P(i,i)
1558      Nothing to do for P(0,0)
1559      --------------------------------------------------------*/
1560     if(coldiagrank>0 && colrank==0)
1561     {
1562         MPI_Send(kthItem.data(), nActiveCols, MPIType<VT>(), coldiagrank, 0, commGrid->GetColWorld());
1563     }
1564     else if(coldiagrank>0 && colrank==coldiagrank) // receive in the diagonal processor
1565     {
1566         MPI_Recv(kthItem.data(), nActiveCols, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1567     }
1568 
1569     /*--------------------------------------------------------
1570      At this point, kth largest elements in every active column
1571      are gathered on the diagonal processors P(i,i).
1572 
1573      Next step: Scatter the kth largest elements from P(i,i)
1574      to all processors in the ith row, P(i,:).
     Each processor receives exactly as many entries as its local nnz of rvec,
1576      so that the received data can be directly put in rvec.
1577      --------------------------------------------------------*/
1578     int rowroot = commGrid->GetDiagOfProcRow();
1579     int proccols = commGrid->GetGridCols();
1580     std::vector <int> sendcnts(proccols,0);
1581     std::vector <int> dpls(proccols,0);
1582     int lsize = rvec.ind.size();
    // local sizes of the input vector are gathered on the diagonal processor (they become the scatter counts)
1584     MPI_Gather(&lsize,1, MPI_INT, sendcnts.data(), 1, MPI_INT, rowroot, RowWorld);
1585     std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1586     MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.num.data(), rvec.num.size(), MPIType<VT>(),rowroot, RowWorld);
1587 
1588     ::operator delete(sendbuf);
1589     ::operator delete(recvbuf);
1590     ::operator delete(tempbuf);
1591 
1592 
1593     return true;
1594 }
1595 
// only meaningful for symmetric matrices
1597 template <class IT, class NT, class DER>
1598 IT SpParMat<IT,NT,DER>::Bandwidth() const
1599 {
1600     IT upperlBW = -1;
1601     IT lowerlBW = -1;
1602     IT m_perproc = getnrow() / commGrid->GetGridRows();
1603     IT n_perproc = getncol() / commGrid->GetGridCols();
1604     IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1605     IT noffset = commGrid->GetRankInProcRow() * n_perproc;
1606 
1607     for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
1608     {
1609         IT diagrow = colit.colid() + noffset;
1610         typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1611         if(nzit != spSeq->endnz(colit)) // nonempty column
1612         {
1613             IT firstrow = nzit.rowid() + moffset;
1614             IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1615 
1616             if(firstrow <= diagrow) // upper diagonal
1617             {
1618                 IT dev = diagrow - firstrow;
1619                 if(upperlBW < dev) upperlBW = dev;
1620             }
1621             if(lastrow >= diagrow) // lower diagonal
1622             {
1623                 IT dev = lastrow - diagrow;
1624                 if(lowerlBW < dev) lowerlBW = dev;
1625             }
1626         }
1627     }
1628     IT upperBW;
1629     //IT lowerBW;
1630     MPI_Allreduce( &upperlBW, &upperBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1631     //MPI_Allreduce( &lowerlBW, &lowerBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1632 
1633     //return (upperBW + lowerBW + 1);
1634     return (upperBW);
1635 }
1636 
1637 
1638 
// only meaningful for symmetric matrices
1640 template <class IT, class NT, class DER>
1641 IT SpParMat<IT,NT,DER>::Profile() const
1642 {
1643     int colrank = commGrid->GetRankInProcRow();
1644     IT cols = getncol();
1645     IT rows = getnrow();
    IT m_perproc = rows / commGrid->GetGridRows();
    IT n_perproc = cols / commGrid->GetGridCols();
1648     IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1649     IT noffset = colrank * n_perproc;
1650 
1651 
1652     int pc = commGrid->GetGridCols();
1653     IT n_thisproc;
1654     if(colrank!=pc-1 ) n_thisproc = n_perproc;
1655     else n_thisproc =  cols - (pc-1)*n_perproc;
1656 
1657 
1658     std::vector<IT> firstRowInCol(n_thisproc,getnrow());
1659     std::vector<IT> lastRowInCol(n_thisproc,-1);
1660 
1661     for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over columns
1662     {
1663         IT diagrow = colit.colid() + noffset;
1664         typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1665         if(nzit != spSeq->endnz(colit)) // nonempty column
1666         {
1667             IT firstrow = nzit.rowid() + moffset;
1668             IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1669             if(firstrow <= diagrow) // upper diagonal
1670             {
1671                 firstRowInCol[colit.colid()] = firstrow;
1672             }
1673             if(lastrow >= diagrow) // lower diagonal
1674             {
1675                 lastRowInCol[colit.colid()] = lastrow;
1676             }
1677         }
1678     }
1679 
1680     std::vector<IT> firstRowInCol_global(n_thisproc,getnrow());
1681     //vector<IT> lastRowInCol_global(n_thisproc,-1);
1682     MPI_Allreduce( firstRowInCol.data(), firstRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MIN, commGrid->colWorld);
1683     //MPI_Allreduce( lastRowInCol.data(), lastRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MAX, commGrid->GetColWorld());
1684 
1685     IT profile = 0;
1686     for(IT i=0; i<n_thisproc; i++)
1687     {
1688         if(firstRowInCol_global[i]==rows) // empty column
1689             profile++;
1690         else
1691             profile += (i + noffset - firstRowInCol_global[i]);
1692     }
1693 
1694     IT profile_global = 0;
1695     MPI_Allreduce( &profile, &profile_global, 1, MPIType<IT>(), MPI_SUM, commGrid->rowWorld);
1696 
1697     return (profile_global);
1698 }
1699 
1700 
1701 
1702 template <class IT, class NT, class DER>
1703 template <typename VT, typename GIT, typename _BinaryOperation>
MaskedReduce(FullyDistVec<GIT,VT> & rvec,FullyDistSpVec<GIT,VT> & mask,Dim dim,_BinaryOperation __binary_op,VT id,bool exclude) const1704 void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, bool exclude) const
1705 {
1706     if (dim!=Column)
1707     {
1708         SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Colum\n");
1709         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1710     }
1711     MaskedReduce(rvec, mask, dim, __binary_op, id, myidentity<NT>(), exclude);
1712 }
1713 
1714 /**
1715  * Reduce along the column into a vector
1716  * @param[in] mask {A sparse vector indicating row indices included/excluded (based on exclude argument) in the reduction }
1717  * @param[in] __binary_op {the operation used for reduction; examples: max, min, plus, multiply, and, or. Its parameters and return type are all VT}
1718  * @param[in] id {scalar that is used as the identity for __binary_op; examples: zero, infinity}
1719  * @param[in] __unary_op {optional unary operation applied to nonzeros *before* the __binary_op; examples: 1/x, x^2}
 * @param[in] exclude {if true, the row indices in the mask are excluded from the reduction; if false, only those rows are included}
1721  * @param[out] rvec {the return vector, specified as an output parameter to allow arbitrary return types via VT}
1722  **/
1723 template <class IT, class NT, class DER>
1724 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation>	// GIT: global index type of vector
MaskedReduce(FullyDistVec<GIT,VT> & rvec,FullyDistSpVec<GIT,VT> & mask,Dim dim,_BinaryOperation __binary_op,VT id,_UnaryOperation __unary_op,bool exclude) const1725 void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, bool exclude) const
1726 {
1727     MPI_Comm World = commGrid->GetWorld();
1728     MPI_Comm ColWorld = commGrid->GetColWorld();
1729     MPI_Comm RowWorld = commGrid->GetRowWorld();
1730 
1731     if (dim!=Column)
1732     {
1733         SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Colum\n");
1734         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1735     }
1736     if(*rvec.commGrid != *commGrid)
1737     {
1738         SpParHelper::Print("Grids are not comparable, SpParMat::MaskedReduce() fails!", commGrid->GetWorld());
1739         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1740     }
1741 
1742     int rowneighs = commGrid->GetGridCols();
1743     int rowrank = commGrid->GetRankInProcRow();
1744     std::vector<int> rownz(rowneighs);
1745     int locnnzMask = static_cast<int> (mask.getlocnnz());
1746     rownz[rowrank] = locnnzMask;
1747     MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rownz.data(), 1, MPI_INT, RowWorld);
1748     std::vector<int> dpls(rowneighs+1,0);
1749     std::partial_sum(rownz.begin(), rownz.end(), dpls.data()+1);
1750     int accnz = std::accumulate(rownz.begin(), rownz.end(), 0);
1751     std::vector<GIT> sendInd(locnnzMask);
    std::transform(mask.ind.begin(), mask.ind.end(), sendInd.begin(), std::bind2nd(std::plus<GIT>(), mask.RowLenUntil()));
1753 
1754     std::vector<GIT> indMask(accnz);
1755     MPI_Allgatherv(sendInd.data(), rownz[rowrank], MPIType<GIT>(), indMask.data(), rownz.data(), dpls.data(), MPIType<GIT>(), RowWorld);
1756 
1757 
1758     // We can't use rvec's distribution (rows first, columns later) here
1759     IT n_thiscol = getlocalcols();   // length assigned to this processor column
1760     int colneighs = commGrid->GetGridRows();	// including oneself
1761     int colrank = commGrid->GetRankInProcCol();
1762 
1763     GIT * loclens = new GIT[colneighs];
1764     GIT * lensums = new GIT[colneighs+1]();	// begin/end points of local lengths
1765 
1766     GIT n_perproc = n_thiscol / colneighs;    // length on a typical processor
1767     if(colrank == colneighs-1)
1768         loclens[colrank] = n_thiscol - (n_perproc*colrank);
1769     else
1770         loclens[colrank] = n_perproc;
1771 
1772     MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
1773     std::partial_sum(loclens, loclens+colneighs, lensums+1);	// loclens and lensums are different, but both would fit in 32-bits
1774 
1775     std::vector<VT> trarr;
1776     typename DER::SpColIter colit = spSeq->begcol();
1777     for(int i=0; i< colneighs; ++i)
1778     {
1779         VT * sendbuf = new VT[loclens[i]];
1780         std::fill(sendbuf, sendbuf+loclens[i], id);	// fill with identity
1781 
1782         for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit)	// iterate over a portion of columns
1783         {
1784             int k=0;
1785             typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1786 
1787             for(; nzit != spSeq->endnz(colit) && k < indMask.size(); )	// all nonzeros in this column
1788             {
1789                 if(nzit.rowid() < indMask[k])
1790                 {
1791                     if(exclude)
1792                     {
1793                         sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1794                     }
1795                     ++nzit;
1796                 }
1797                 else if(nzit.rowid() > indMask[k]) ++k;
1798                 else
1799                 {
1800                     if(!exclude)
1801                     {
1802                         sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1803                     }
1804                     ++k;
1805                     ++nzit;
1806                 }
1807 
1808             }
1809             if(exclude)
1810             {
1811                 while(nzit != spSeq->endnz(colit))
1812                 {
1813                     sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1814                     ++nzit;
1815                 }
1816             }
1817         }
1818 
1819         VT * recvbuf = NULL;
1820         if(colrank == i)
1821         {
1822             trarr.resize(loclens[i]);
1823             recvbuf = SpHelper::p2a(trarr);
1824         }
1825         MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), i, commGrid->GetColWorld()); // root  = i
1826         delete [] sendbuf;
1827     }
1828     DeleteAll(loclens, lensums);
1829 
1830     GIT reallen;	// Now we have to transpose the vector
1831     GIT trlen = trarr.size();
1832     int diagneigh = commGrid->GetComplementRank();
1833     MPI_Status status;
    MPI_Sendrecv(&trlen, 1, MPIType<GIT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<GIT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
1835 
1836     rvec.arr.resize(reallen);
1837     MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
1838     rvec.glen = getncol();	// ABAB: Put a sanity check here
1839 
1840 }
1841 
1842 
1843 
1844 
1845 template <class IT, class NT, class DER>
1846 template <typename NNT,typename NDER>
operator SpParMat<IT,NNT,NDER>() const1847 SpParMat<IT,NT,DER>::operator SpParMat<IT,NNT,NDER> () const
1848 {
1849 	NDER * convert = new NDER(*spSeq);
1850 	return SpParMat<IT,NNT,NDER> (convert, commGrid);
1851 }
1852 
1853 //! Change index type as well
1854 template <class IT, class NT, class DER>
1855 template <typename NIT, typename NNT,typename NDER>
operator SpParMat<NIT,NNT,NDER>() const1856 SpParMat<IT,NT,DER>::operator SpParMat<NIT,NNT,NDER> () const
1857 {
1858 	NDER * convert = new NDER(*spSeq);
1859 	return SpParMat<NIT,NNT,NDER> (convert, commGrid);
1860 }
1861 
1862 /**
 * Create a submatrix of size m x (size(ci) * s) on an r x s processor grid
 * Essentially fetches the columns ci[0], ci[1],..., ci[size(ci)-1] from every submatrix
1865  */
1866 template <class IT, class NT, class DER>
SubsRefCol(const std::vector<IT> & ci) const1867 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRefCol (const std::vector<IT> & ci) const
1868 {
1869 	std::vector<IT> ri;
1870 	DER * tempseq = new DER((*spSeq)(ri, ci));
1871 	return SpParMat<IT,NT,DER> (tempseq, commGrid);
1872 }
1873 
1874 /**
1875  * Generalized sparse matrix indexing (ri/ci are 0-based indexed)
1876  * Both the storage and the actual values in FullyDistVec should be IT
1877  * The index vectors are dense and FULLY distributed on all processors
1878  * We can use this function to apply a permutation like A(p,q)
1879  * Sequential indexing subroutine (via multiplication) is general enough.
1880  */
1881 template <class IT, class NT, class DER>
1882 template <typename PTNTBOOL, typename PTBOOLNT>
SubsRef_SR(const FullyDistVec<IT,IT> & ri,const FullyDistVec<IT,IT> & ci,bool inplace)1883 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRef_SR (const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, bool inplace)
1884 {
1885 	// infer the concrete type SpMat<IT,IT>
1886 	typedef typename create_trait<DER, IT, bool>::T_inferred DER_IT;
1887 
1888 	if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
1889 	{
1890 		SpParHelper::Print("Grids are not comparable, SpRef fails !");
1891 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1892 	}
1893 
1894 	// Safety check
1895 	IT locmax_ri = 0;
1896 	IT locmax_ci = 0;
1897 	if(!ri.arr.empty())
1898 		locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
1899 	if(!ci.arr.empty())
1900 		locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
1901 
1902 	IT total_m = getnrow();
1903 	IT total_n = getncol();
1904 	if(locmax_ri > total_m || locmax_ci > total_n)
1905 	{
1906 		throw outofrangeexception();
1907 	}
1908 
1909 	// The indices for FullyDistVec are offset'd to 1/p pieces
1910 	// The matrix indices are offset'd to 1/sqrt(p) pieces
1911 	// Add the corresponding offset before sending the data
1912 	IT roffset = ri.RowLenUntil();
1913 	IT rrowlen = ri.MyRowLength();
1914 	IT coffset = ci.RowLenUntil();
1915 	IT crowlen = ci.MyRowLength();
1916 
1917 	// We create two boolean matrices P and Q
1918 	// Dimensions:  P is size(ri) x m
1919 	//		Q is n x size(ci)
1920 	// Range(ri) = {0,...,m-1}
1921 	// Range(ci) = {0,...,n-1}
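	// Illustrative example (hypothetical numbers): with m = n = 4, ri = {2, 0}
	// and ci = {1, 3}, P is the 2x4 matrix with P(0,2) = P(1,0) = 1 and Q is the
	// 4x2 matrix with Q(1,0) = Q(3,1) = 1, so that P*A*Q = A([2 0],[1 3]) picks
	// rows 2,0 and columns 1,3 of A.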
1922 
1923 	IT rowneighs = commGrid->GetGridCols();	// number of neighbors along this processor row (including oneself)
1924 	IT totalm = getnrow();	// collective call
1925 	IT totaln = getncol();
1926 	IT m_perproccol = totalm / rowneighs;
1927 	IT n_perproccol = totaln / rowneighs;
1928 
1929 	// Get the right local dimensions
1930 	IT diagneigh = commGrid->GetComplementRank();
1931 	IT mylocalrows = getlocalrows();
1932 	IT mylocalcols = getlocalcols();
1933 	IT trlocalrows;
1934 	MPI_Status status;
1935 	MPI_Sendrecv(&mylocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, &trlocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, commGrid->GetWorld(), &status);
1936 	// we don't need trlocalcols because Q.Transpose() will take care of it
1937 
1938 	std::vector< std::vector<IT> > rowid(rowneighs);	// reuse for P and Q
1939 	std::vector< std::vector<IT> > colid(rowneighs);
1940 
1941 	// Step 1: Create P
1942 	IT locvec = ri.arr.size();	// nnz in local vector
1943 	for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
1944 	{
1945 		// numerical values (permutation indices) are 0-based
		// recipient along this processor row
1947 		IT rowrec = (m_perproccol!=0) ? std::min(ri.arr[i] / m_perproccol, rowneighs-1) : (rowneighs-1);
1948 
1949 		// ri's numerical values give the colids and its local indices give rowids
1950 		rowid[rowrec].push_back( i + roffset);
1951 		colid[rowrec].push_back(ri.arr[i] - (rowrec * m_perproccol));
1952 	}
1953 
1954 	int * sendcnt = new int[rowneighs];	// reuse in Q as well
1955 	int * recvcnt = new int[rowneighs];
1956 	for(IT i=0; i<rowneighs; ++i)
1957 		sendcnt[i] = rowid[i].size();
1958 
1959 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
1960 	int * sdispls = new int[rowneighs]();
1961 	int * rdispls = new int[rowneighs]();
1962 	std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
1963 	std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
1964 	IT p_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
1965 
1966 	// create space for incoming data ...
1967 	IT * p_rows = new IT[p_nnz];
1968 	IT * p_cols = new IT[p_nnz];
1969   	IT * senddata = new IT[locvec];	// re-used for both rows and columns
1970 	for(int i=0; i<rowneighs; ++i)
1971 	{
1972     std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
1973 		std::vector<IT>().swap(rowid[i]);	// clear memory of rowid
1974 	}
1975 	MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
1976 
1977 	for(int i=0; i<rowneighs; ++i)
1978 	{
1979     std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
1980 		std::vector<IT>().swap(colid[i]);	// clear memory of colid
1981 	}
1982 	MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
1983 	delete [] senddata;
1984 
1985 	std::tuple<IT,IT,bool> * p_tuples = new std::tuple<IT,IT,bool>[p_nnz];
1986 	for(IT i=0; i< p_nnz; ++i)
1987 	{
1988 		p_tuples[i] = std::make_tuple(p_rows[i], p_cols[i], 1);
1989 	}
1990 	DeleteAll(p_rows, p_cols);
1991 
1992 	DER_IT * PSeq = new DER_IT();
1993 	PSeq->Create( p_nnz, rrowlen, trlocalrows, p_tuples);		// deletion of tuples[] is handled by SpMat::Create
1994 
1995 	SpParMat<IT,NT,DER> PA(commGrid);
1996 	if(&ri == &ci)	// Symmetric permutation
1997 	{
1998 		DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
1999 		#ifdef SPREFDEBUG
2000 		SpParHelper::Print("Symmetric permutation\n", commGrid->GetWorld());
2001 		#endif
2002 		SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2003 		if(inplace)
2004 		{
2005 			#ifdef SPREFDEBUG
2006 			SpParHelper::Print("In place multiplication\n", commGrid->GetWorld());
2007 			#endif
2008         		*this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this, false, true);	// clear the memory of *this
2009 
2010 			//ostringstream outb;
2011 			//outb << "P_after_" << commGrid->myrank;
2012 			//ofstream ofb(outb.str().c_str());
2013 			//P.put(ofb);
2014 
2015 			P.Transpose();
2016        	 		*this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, P, true, true);	// clear the memory of both *this and P
2017 			return SpParMat<IT,NT,DER>(commGrid);	// dummy return to match signature
2018 		}
2019 		else
2020 		{
2021 			PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P,*this);
2022 			P.Transpose();
2023 			return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, P);
2024 		}
2025 	}
2026 	else
2027 	{
2028 		// Intermediate step (to save memory): Form PA and store it in P
2029 		// Distributed matrix generation (collective call)
2030 		SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2031 
2032 		// Do parallel matrix-matrix multiply
2033         	PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this);
2034 	}	// P is destructed here
2035 #ifndef NDEBUG
2036 	PA.PrintInfo();
2037 #endif
2038 	// Step 2: Create Q  (use the same row-wise communication and transpose at the end)
2039 	// This temporary to-be-transposed Q is size(ci) x n
2040 	locvec = ci.arr.size();	// nnz in local vector (reset variable)
2041 	for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
2042 	{
2043 		// numerical values (permutation indices) are 0-based
2044 		IT rowrec = (n_perproccol!=0) ? std::min(ci.arr[i] / n_perproccol, rowneighs-1) : (rowneighs-1);
2045 
		// ci's numerical values give the colids and its local indices give rowids
2047 		rowid[rowrec].push_back( i + coffset);
2048 		colid[rowrec].push_back(ci.arr[i] - (rowrec * n_perproccol));
2049 	}
2050 
2051 	for(IT i=0; i<rowneighs; ++i)
2052 		sendcnt[i] = rowid[i].size();	// update with new sizes
2053 
2054 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
2055 	std::fill(sdispls, sdispls+rowneighs, 0);	// reset
2056 	std::fill(rdispls, rdispls+rowneighs, 0);
2057 	std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
2058 	std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
2059 	IT q_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
2060 
2061 	// create space for incoming data ...
2062 	IT * q_rows = new IT[q_nnz];
2063 	IT * q_cols = new IT[q_nnz];
2064   	senddata = new IT[locvec];
2065 	for(int i=0; i<rowneighs; ++i)
2066 	{
2067     std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
2068 		std::vector<IT>().swap(rowid[i]);	// clear memory of rowid
2069 	}
2070 	MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2071 
2072 	for(int i=0; i<rowneighs; ++i)
2073 	{
2074     std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
2075 		std::vector<IT>().swap(colid[i]);	// clear memory of colid
2076 	}
2077 	MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2078 	DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2079 
2080 	std::tuple<IT,IT,bool> * q_tuples = new std::tuple<IT,IT,bool>[q_nnz];
2081 	for(IT i=0; i< q_nnz; ++i)
2082 	{
2083 		q_tuples[i] = std::make_tuple(q_rows[i], q_cols[i], 1);
2084 	}
2085 	DeleteAll(q_rows, q_cols);
2086 	DER_IT * QSeq = new DER_IT();
2087 	QSeq->Create( q_nnz, crowlen, mylocalcols, q_tuples);		// Creating Q' instead
2088 
2089 	// Step 3: Form PAQ
2090 	// Distributed matrix generation (collective call)
2091 	SpParMat<IT,bool,DER_IT> Q (QSeq, commGrid);
2092 	Q.Transpose();
2093 	if(inplace)
2094 	{
		*this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q, true, true);	// clear the memory of both PA and Q
2096 		return SpParMat<IT,NT,DER>(commGrid);	// dummy return to match signature
2097 	}
2098 	else
2099 	{
2100         	return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q);
2101 	}
2102 }
2103 
2104 
2105 template <class IT, class NT, class DER>
SpAsgn(const FullyDistVec<IT,IT> & ri,const FullyDistVec<IT,IT> & ci,SpParMat<IT,NT,DER> & B)2106 void SpParMat<IT,NT,DER>::SpAsgn(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, SpParMat<IT,NT,DER> & B)
2107 {
2108 	typedef PlusTimesSRing<NT, NT> PTRing;
2109 
2110 	if((*(ri.commGrid) != *(B.commGrid)) || (*(ci.commGrid) != *(B.commGrid)))
2111 	{
2112 		SpParHelper::Print("Grids are not comparable, SpAsgn fails !", commGrid->GetWorld());
2113 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2114 	}
2115 	IT total_m_A = getnrow();
2116 	IT total_n_A = getncol();
2117 	IT total_m_B = B.getnrow();
2118 	IT total_n_B = B.getncol();
2119 
2120 	if(total_m_B != ri.TotalLength())
2121 	{
2122 		SpParHelper::Print("First dimension of B does NOT match the length of ri, SpAsgn fails !", commGrid->GetWorld());
2123 		MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2124 	}
2125 	if(total_n_B != ci.TotalLength())
2126 	{
2127 		SpParHelper::Print("Second dimension of B does NOT match the length of ci, SpAsgn fails !", commGrid->GetWorld());
2128 		MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2129 	}
2130 	Prune(ri, ci);	// make a hole
2131 
2132 	// embed B to the size of A
2133 	FullyDistVec<IT,IT> * rvec = new FullyDistVec<IT,IT>(ri.commGrid);
2134 	rvec->iota(total_m_B, 0);	// sparse() expects a zero based index
2135 
2136 	SpParMat<IT,NT,DER> R(total_m_A, total_m_B, ri, *rvec, 1);
2137 	delete rvec;	// free memory
2138 	SpParMat<IT,NT,DER> RB = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(R, B, true, false); // clear memory of R but not B
2139 
2140 	FullyDistVec<IT,IT> * qvec = new FullyDistVec<IT,IT>(ri.commGrid);
2141 	qvec->iota(total_n_B, 0);
2142 	SpParMat<IT,NT,DER> Q(total_n_B, total_n_A, *qvec, ci, 1);
2143 	delete qvec;	// free memory
2144 	SpParMat<IT,NT,DER> RBQ = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(RB, Q, true, true); // clear memory of RB and Q
2145 	*this += RBQ;	// extend-add
2146 }
2147 
2148 template <class IT, class NT, class DER>
Prune(const FullyDistVec<IT,IT> & ri,const FullyDistVec<IT,IT> & ci)2149 void SpParMat<IT,NT,DER>::Prune(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
2150 {
2151 	typedef PlusTimesSRing<NT, NT> PTRing;
2152 
2153 	if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
2154 	{
2155 		SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
2156 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2157 	}
2158 
2159 	// Safety check
2160 	IT locmax_ri = 0;
2161 	IT locmax_ci = 0;
2162 	if(!ri.arr.empty())
2163 		locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
2164 	if(!ci.arr.empty())
2165 		locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
2166 
2167 	IT total_m = getnrow();
2168 	IT total_n = getncol();
2169 	if(locmax_ri > total_m || locmax_ci > total_n)
2170 	{
2171 		throw outofrangeexception();
2172 	}
2173 
2174 	SpParMat<IT,NT,DER> S(total_m, total_m, ri, ri, 1);
2175 	SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(S, *this, true, false); // clear memory of S but not *this
2176 
2177 	SpParMat<IT,NT,DER> T(total_n, total_n, ci, ci, 1);
2178 	SpParMat<IT,NT,DER> SAT = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(SA, T, true, true); // clear memory of SA and T
2179 	EWiseMult(SAT, true);	// In-place EWiseMult with not(SAT)
2180 }
2181 
2182 //! Prune every column of a sparse matrix based on pvals
2183 template <class IT, class NT, class DER>
2184 template <typename _BinaryOperation>
PruneColumn(const FullyDistVec<IT,NT> & pvals,_BinaryOperation __binary_op,bool inPlace)2185 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2186 {
2187     MPI_Barrier(MPI_COMM_WORLD);
2188     if(getncol() != pvals.TotalLength())
2189     {
2190         std::ostringstream outs;
2191         outs << "Can not prune column-by-column, dimensions does not match"<< std::endl;
2192         outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2193         SpParHelper::Print(outs.str());
2194         MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2195     }
2196     if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2197     {
2198         std::cout << "Grids are not comparable for PurneColumn" << std::endl;
2199         MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2200     }
2201 
2202     MPI_Comm World = pvals.commGrid->GetWorld();
2203     MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2204 
2205     int xsize = (int) pvals.LocArrSize();
2206     int trxsize = 0;
2207 
2208 
2209     int diagneigh = pvals.commGrid->GetComplementRank();
2210     MPI_Status status;
2211     MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
2212 
2213 
2214     NT * trxnums = new NT[trxsize];
2215     MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(pvals.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
2216 
2217     int colneighs, colrank;
2218     MPI_Comm_size(ColWorld, &colneighs);
2219     MPI_Comm_rank(ColWorld, &colrank);
2220     int * colsize = new int[colneighs];
2221     colsize[colrank] = trxsize;
2222     MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
    int * dpls = new int[colneighs]();	// displacements (zero-initialized)
2224     std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
2225     int accsize = std::accumulate(colsize, colsize+colneighs, 0);
2226     std::vector<NT> numacc(accsize);
2227 
2228 #ifdef COMBBLAS_DEBUG
2229     std::ostringstream outs2;
2230     outs2 << "PruneColumn displacements: ";
2231     for(int i=0; i< colneighs; ++i)
2232     {
2233 	outs2 << dpls[i] << " ";
2234     }
2235     outs2 << std::endl;
2236     SpParHelper::Print(outs2.str());
2237     MPI_Barrier(World);
2238 #endif
2239 
2240 
2241     MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), numacc.data(), colsize, dpls, MPIType<NT>(), ColWorld);
2242     delete [] trxnums;
2243     delete [] colsize;
2244     delete [] dpls;
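
    // At this point numacc[j] holds the pruning value for the j-th local column:
    // pvals' local chunk was first exchanged with the complement (transposed)
    // processor and then allgathered along the processor column, so every
    // process in the column sees the values of all of its local columns.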
2245 
2246     //sanity check
2247     assert(accsize == getlocalcols());
2248     if (inPlace)
2249     {
2250         spSeq->PruneColumn(numacc.data(), __binary_op, inPlace);
2251         return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2252     }
2253     else
2254     {
2255         return SpParMat<IT,NT,DER>(spSeq->PruneColumn(numacc.data(), __binary_op, inPlace), commGrid);
2256     }
2257 }
2258 
2259 
2260 //! Prune columns of a sparse matrix selected by nonzero indices of pvals
2261 //! Each selected column is pruned by corresponding values in pvals
2262 template <class IT, class NT, class DER>
2263 template <typename _BinaryOperation>
PruneColumn(const FullyDistSpVec<IT,NT> & pvals,_BinaryOperation __binary_op,bool inPlace)2264 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistSpVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2265 {
2266     MPI_Barrier(MPI_COMM_WORLD);
2267     if(getncol() != pvals.TotalLength())
2268     {
2269         std::ostringstream outs;
2270         outs << "Can not prune column-by-column, dimensions does not match"<< std::endl;
2271         outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2272         SpParHelper::Print(outs.str());
2273         MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2274     }
2275     if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2276     {
2277         std::cout << "Grids are not comparable for PurneColumn" << std::endl;
2278         MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2279     }
2280 
2281     MPI_Comm World = pvals.commGrid->GetWorld();
2282     MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2283     int diagneigh = pvals.commGrid->GetComplementRank();
2284 
2285     IT xlocnz = pvals.getlocnnz();
2286     IT roffst = pvals.RowLenUntil();
2287     IT roffset;
2288     IT trxlocnz = 0;
2289 
2290     MPI_Status status;
2291     MPI_Sendrecv(&roffst, 1, MPIType<IT>(), diagneigh, TROST, &roffset, 1, MPIType<IT>(), diagneigh, TROST, World, &status);
2292     MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, &status);
2293 
2294     std::vector<IT> trxinds (trxlocnz);
2295     std::vector<NT> trxnums (trxlocnz);
2296     MPI_Sendrecv(pvals.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, &status);
2297     MPI_Sendrecv(pvals.num.data(), xlocnz, MPIType<NT>(), diagneigh, TRX, trxnums.data(), trxlocnz, MPIType<NT>(), diagneigh, TRX, World, &status);
2298     std::transform(trxinds.data(), trxinds.data()+trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), roffset));
2299 
2300 
2301     int colneighs, colrank;
2302     MPI_Comm_size(ColWorld, &colneighs);
2303     MPI_Comm_rank(ColWorld, &colrank);
2304     int * colnz = new int[colneighs];
2305     colnz[colrank] = trxlocnz;
2306     MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
    int * dpls = new int[colneighs]();	// displacements (zero-initialized)
2308     std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
2309     IT accnz = std::accumulate(colnz, colnz+colneighs, 0);
2310 
2311     std::vector<IT> indacc(accnz);
2312     std::vector<NT> numacc(accnz);
2313     MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz, dpls, MPIType<IT>(), ColWorld);
2314     MPI_Allgatherv(trxnums.data(), trxlocnz, MPIType<NT>(), numacc.data(), colnz, dpls, MPIType<NT>(), ColWorld);
2315 
2316     delete [] colnz;
2317     delete [] dpls;
2318 
2319 
2320     if (inPlace)
2321     {
2322         spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace);
2323         return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2324     }
2325     else
2326     {
2327         return SpParMat<IT,NT,DER>(spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace), commGrid);
2328     }
2329 }
2330 
2331 
2332 
2333 // In-place version where rhs type is the same (no need for type promotion)
2334 template <class IT, class NT, class DER>
EWiseMult(const SpParMat<IT,NT,DER> & rhs,bool exclude)2335 void SpParMat<IT,NT,DER>::EWiseMult (const SpParMat< IT,NT,DER >  & rhs, bool exclude)
2336 {
2337 	if(*commGrid == *rhs.commGrid)
2338 	{
2339 		spSeq->EWiseMult(*(rhs.spSeq), exclude);		// Dimension compatibility check performed by sequential function
2340 	}
2341 	else
2342 	{
2343 		std::cout << "Grids are not comparable, EWiseMult() fails !" << std::endl;
2344 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2345 	}
2346 }
2347 
2348 
2349 template <class IT, class NT, class DER>
EWiseScale(const DenseParMat<IT,NT> & rhs)2350 void SpParMat<IT,NT,DER>::EWiseScale(const DenseParMat<IT, NT> & rhs)
2351 {
2352 	if(*commGrid == *rhs.commGrid)
2353 	{
2354 		spSeq->EWiseScale(rhs.array, rhs.m, rhs.n);	// Dimension compatibility check performed by sequential function
2355 	}
2356 	else
2357 	{
2358 		std::cout << "Grids are not comparable, EWiseScale() fails !" << std::endl;
2359 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2360 	}
2361 }
2362 
2363 template <class IT, class NT, class DER>
2364 template <typename _BinaryOperation>
UpdateDense(DenseParMat<IT,NT> & rhs,_BinaryOperation __binary_op) const2365 void SpParMat<IT,NT,DER>::UpdateDense(DenseParMat<IT, NT> & rhs, _BinaryOperation __binary_op) const
2366 {
2367 	if(*commGrid == *rhs.commGrid)
2368 	{
2369 		if(getlocalrows() == rhs.m  && getlocalcols() == rhs.n)
2370 		{
2371 			spSeq->UpdateDense(rhs.array, __binary_op);
2372 		}
2373 		else
2374 		{
2375 			std::cout << "Matrices have different dimensions, UpdateDense() fails !" << std::endl;
2376 			MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2377 		}
2378 	}
2379 	else
2380 	{
2381 		std::cout << "Grids are not comparable, UpdateDense() fails !" << std::endl;
2382 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2383 	}
2384 }
2385 
2386 template <class IT, class NT, class DER>
PrintInfo() const2387 void SpParMat<IT,NT,DER>::PrintInfo() const
2388 {
2389 	IT mm = getnrow();
2390 	IT nn = getncol();
2391 	IT nznz = getnnz();
2392 
2393 	if (commGrid->myrank == 0)
2394 		std::cout << "As a whole: " << mm << " rows and "<< nn <<" columns and "<<  nznz << " nonzeros" << std::endl;
2395 
2396 #ifdef DEBUG
2397 	IT allprocs = commGrid->grrows * commGrid->grcols;
2398 	for(IT i=0; i< allprocs; ++i)
2399 	{
2400 		if (commGrid->myrank == i)
2401 		{
2402       std::cout << "Processor (" << commGrid->GetRankInProcRow() << "," << commGrid->GetRankInProcCol() << ")'s data: " << std::endl;
2403 			spSeq->PrintInfo();
2404 		}
2405 		MPI_Barrier(commGrid->GetWorld());
2406 	}
2407 #endif
2408 }
2409 
2410 template <class IT, class NT, class DER>
operator ==(const SpParMat<IT,NT,DER> & rhs) const2411 bool SpParMat<IT,NT,DER>::operator== (const SpParMat<IT,NT,DER> & rhs) const
2412 {
2413 	int local = static_cast<int>((*spSeq) == (*(rhs.spSeq)));
2414 	int whole = 1;
2415 	MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, commGrid->GetWorld());
2416 	return static_cast<bool>(whole);
2417 }
2418 
2419 
2420 /**
2421  ** Private function that carries code common to different sparse() constructors
2422  ** Before this call, commGrid is already set
2423  **/
2424 template <class IT, class NT, class DER>
2425 template <typename _BinaryOperation, typename LIT>
SparseCommon(std::vector<std::vector<std::tuple<LIT,LIT,NT>>> & data,LIT locsize,IT total_m,IT total_n,_BinaryOperation BinOp)2426 void SpParMat< IT,NT,DER >::SparseCommon(std::vector< std::vector < std::tuple<LIT,LIT,NT> > > & data, LIT locsize, IT total_m, IT total_n, _BinaryOperation BinOp)
2427 {
2428 	int nprocs = commGrid->GetSize();
2429 	int * sendcnt = new int[nprocs];
2430 	int * recvcnt = new int[nprocs];
2431 	for(int i=0; i<nprocs; ++i)
		sendcnt[i] = data[i].size();	// per-destination tuple counts
2433 
2434 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
2435 	int * sdispls = new int[nprocs]();
2436 	int * rdispls = new int[nprocs]();
2437 	std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
2438 	std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
2439 	IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2440 	IT totsent = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
2441 
2442 	assert((totsent < std::numeric_limits<int>::max()));
2443 	assert((totrecv < std::numeric_limits<int>::max()));
2444 
2445 
2446 #if 0
2447 	ofstream oput;
2448         commGrid->OpenDebugFile("Displacements", oput);
2449 	copy(sdispls, sdispls+nprocs, ostream_iterator<int>(oput, " "));   oput << endl;
2450 	copy(rdispls, rdispls+nprocs, ostream_iterator<int>(oput, " "));   oput << endl;
2451 	oput.close();
2452 
2453 	IT * gsizes;
2454 	if(commGrid->GetRank() == 0) gsizes = new IT[nprocs];
2455     	MPI_Gather(&totrecv, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2456 	if(commGrid->GetRank() == 0) { std::copy(gsizes, gsizes+nprocs, std::ostream_iterator<IT>(std::cout, " "));   std::cout << std::endl; }
2457 	MPI_Barrier(commGrid->GetWorld());
2458     	MPI_Gather(&totsent, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2459 	if(commGrid->GetRank() == 0) { copy(gsizes, gsizes+nprocs, ostream_iterator<IT>(cout, " "));   cout << endl; }
2460 	MPI_Barrier(commGrid->GetWorld());
2461 	if(commGrid->GetRank() == 0) delete [] gsizes;
2462 #endif
2463 
2464   	std::tuple<LIT,LIT,NT> * senddata = new std::tuple<LIT,LIT,NT>[locsize];	// re-used for both rows and columns
2465 	for(int i=0; i<nprocs; ++i)
2466 	{
2467 		std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
2468 		data[i].clear();	// clear memory
2469 		data[i].shrink_to_fit();
2470 	}
2471 	MPI_Datatype MPI_triple;
2472 	MPI_Type_contiguous(sizeof(std::tuple<LIT,LIT,NT>), MPI_CHAR, &MPI_triple);
2473 	MPI_Type_commit(&MPI_triple);
2474 
2475 	std::tuple<LIT,LIT,NT> * recvdata = new std::tuple<LIT,LIT,NT>[totrecv];
2476 	MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_triple, recvdata, recvcnt, rdispls, MPI_triple, commGrid->GetWorld());
2477 
2478 	DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2479 	MPI_Type_free(&MPI_triple);
2480 
2481 	int r = commGrid->GetGridRows();
2482 	int s = commGrid->GetGridCols();
2483 	IT m_perproc = total_m / r;
2484 	IT n_perproc = total_n / s;
2485 	int myprocrow = commGrid->GetRankInProcCol();
2486 	int myproccol = commGrid->GetRankInProcRow();
2487 	IT locrows, loccols;
2488 	if(myprocrow != r-1)	locrows = m_perproc;
2489 	else 	locrows = total_m - myprocrow * m_perproc;
2490 	if(myproccol != s-1)	loccols = n_perproc;
2491 	else	loccols = total_n - myproccol * n_perproc;
2492 
2493 	SpTuples<LIT,NT> A(totrecv, locrows, loccols, recvdata);	// It is ~SpTuples's job to deallocate
2494 
2495     	// the previous constructor sorts based on columns-first (but that doesn't matter as long as they are sorted one way or another)
2496     	A.RemoveDuplicates(BinOp);
2497 
2498   	spSeq = new DER(A,false);        // Convert SpTuples to DER
2499 }
2500 
2501 
2502 //! All vectors are zero-based indexed (as usual)
2503 template <class IT, class NT, class DER>
SpParMat(IT total_m,IT total_n,const FullyDistVec<IT,IT> & distrows,const FullyDistVec<IT,IT> & distcols,const FullyDistVec<IT,NT> & distvals,bool SumDuplicates)2504 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
2505 				const FullyDistVec<IT,IT> & distcols, const FullyDistVec<IT,NT> & distvals, bool SumDuplicates)
2506 {
2507 	if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
2508 	{
2509 		SpParHelper::Print("Grids are not comparable, Sparse() fails!\n");  // commGrid is not initialized yet
2510 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2511 	}
2512 	if((distrows.TotalLength() != distcols.TotalLength()) || (distcols.TotalLength() != distvals.TotalLength()))
2513 	{
2514 		SpParHelper::Print("Vectors have different sizes, Sparse() fails!");
2515 		MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2516 	}
2517 
2518 	commGrid = distrows.commGrid;
2519 	int nprocs = commGrid->GetSize();
2520 	std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
2521 
2522 	IT locsize = distrows.LocArrSize();
2523 	for(IT i=0; i<locsize; ++i)
2524 	{
2525 		IT lrow, lcol;
2526 		int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
2527 		data[owner].push_back(std::make_tuple(lrow,lcol,distvals.arr[i]));
2528 	}
2529     if(SumDuplicates)
2530     {
2531         SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
2532     }
2533     else
2534     {
2535         SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
2536     }
2537 }
2538 
2539 
2540 
2541 template <class IT, class NT, class DER>
SpParMat(IT total_m,IT total_n,const FullyDistVec<IT,IT> & distrows,const FullyDistVec<IT,IT> & distcols,const NT & val,bool SumDuplicates)2542 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
2543 				const FullyDistVec<IT,IT> & distcols, const NT & val, bool SumDuplicates)
2544 {
2545 	if((*(distrows.commGrid) != *(distcols.commGrid)) )
2546 	{
2547 		SpParHelper::Print("Grids are not comparable, Sparse() fails!\n");
2548 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2549 	}
2550 	if((distrows.TotalLength() != distcols.TotalLength()) )
2551 	{
2552 		SpParHelper::Print("Vectors have different sizes, Sparse() fails!\n");
2553 		MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2554 	}
2555 	commGrid = distrows.commGrid;
2556 	int nprocs = commGrid->GetSize();
2557 	std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
2558 
2559 	IT locsize = distrows.LocArrSize();
2560 	for(IT i=0; i<locsize; ++i)
2561 	{
2562 		IT lrow, lcol;
2563 		int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
2564 		data[owner].push_back(std::make_tuple(lrow,lcol,val));
2565 	}
2566     if(SumDuplicates)
2567     {
2568         SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
2569     }
2570     else
2571     {
2572         SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
2573     }
2574 }
2575 
2576 template <class IT, class NT, class DER>
2577 template <class DELIT>
SpParMat(const DistEdgeList<DELIT> & DEL,bool removeloops)2578 SpParMat< IT,NT,DER >::SpParMat (const DistEdgeList<DELIT> & DEL, bool removeloops)
2579 {
2580 	commGrid = DEL.commGrid;
2581 	typedef typename DER::LocalIT LIT;
2582 
2583 	int nprocs = commGrid->GetSize();
2584 	int gridrows = commGrid->GetGridRows();
2585 	int gridcols = commGrid->GetGridCols();
2586 	std::vector< std::vector<LIT> > data(nprocs);	// entries are pre-converted to local indices before getting pushed into "data"
2587 
2588 	LIT m_perproc = DEL.getGlobalV() / gridrows;
2589 	LIT n_perproc = DEL.getGlobalV() / gridcols;
2590 
2591 	if(sizeof(LIT) < sizeof(DELIT))
2592 	{
2593 		std::ostringstream outs;
2594 		outs << "Warning: Using smaller indices for the matrix than DistEdgeList\n";
2595 		outs << "Local matrices are " << m_perproc << "-by-" << n_perproc << std::endl;
2596 		SpParHelper::Print(outs.str(), commGrid->GetWorld());   // commgrid initialized
2597 	}
2598 
2599     LIT stages = MEM_EFFICIENT_STAGES;		// to lower memory consumption, form sparse matrix in stages
2600 
2601 	// even if local indices (LIT) are 32-bits, we should work with 64-bits for global info
2602 	int64_t perstage = DEL.nedges / stages;
2603 	LIT totrecv = 0;
2604 	std::vector<LIT> alledges;
2605 
2606 	for(LIT s=0; s< stages; ++s)
2607 	{
2608 		int64_t n_befor = s*perstage;
2609 		int64_t n_after= ((s==(stages-1))? DEL.nedges : ((s+1)*perstage));
2610 
2611 		// edges whose endpoints were invalidated earlier (set to -1) are skipped below
2612 		int realedges = 0;	// these are "local" realedges
2613 
2614 		if(DEL.pedges)
2615 		{
2616 			for (int64_t i = n_befor; i < n_after; i++)
2617 			{
2618 				int64_t fr = get_v0_from_edge(&(DEL.pedges[i]));
2619 				int64_t to = get_v1_from_edge(&(DEL.pedges[i]));
2620 
2621 				if(fr >= 0 && to >= 0)	// otherwise skip
2622 				{
2623                     IT lrow, lcol;
2624                     int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), fr, to, lrow, lcol);
2625 					data[owner].push_back(lrow);	// row_id
2626 					data[owner].push_back(lcol);	// col_id
2627 					++realedges;
2628 				}
2629 			}
2630 		}
2631 		else
2632 		{
2633 			for (int64_t i = n_befor; i < n_after; i++)
2634 			{
2635 				if(DEL.edges[2*i+0] >= 0 && DEL.edges[2*i+1] >= 0)	// otherwise skip
2636 				{
2637                     IT lrow, lcol;
2638                     int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), DEL.edges[2*i+0], DEL.edges[2*i+1], lrow, lcol);
2639 					data[owner].push_back(lrow);
2640 					data[owner].push_back(lcol);
2641 					++realedges;
2642 				}
2643 			}
2644 		}
2645 
2646   		LIT * sendbuf = new LIT[2*realedges];
2647 		int * sendcnt = new int[nprocs];
2648 		int * sdispls = new int[nprocs];
2649 		for(int i=0; i<nprocs; ++i)
2650 			sendcnt[i] = data[i].size();
2651 
2652 		int * rdispls = new int[nprocs];
2653 		int * recvcnt = new int[nprocs];
2654 		MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT,commGrid->GetWorld()); // share the counts
2655 
2656 		sdispls[0] = 0;
2657 		rdispls[0] = 0;
2658 		for(int i=0; i<nprocs-1; ++i)
2659 		{
2660 			sdispls[i+1] = sdispls[i] + sendcnt[i];
2661 			rdispls[i+1] = rdispls[i] + recvcnt[i];
2662 		}
2663 		for(int i=0; i<nprocs; ++i)
2664 			std::copy(data[i].begin(), data[i].end(), sendbuf+sdispls[i]);
2665 
2666 		// clear memory
2667 		for(int i=0; i<nprocs; ++i)
2668 			std::vector<LIT>().swap(data[i]);
2669 
2670 		// ABAB: Total number of edges received might not be LIT-addressible
2671 		// However, each edge_id is LIT-addressible
2672 		IT thisrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));	// thisrecv = 2*locedges
2673 		LIT * recvbuf = new LIT[thisrecv];
2674 		totrecv += thisrecv;
2675 
2676 		MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<LIT>(), recvbuf, recvcnt, rdispls, MPIType<LIT>(), commGrid->GetWorld());
2677 		DeleteAll(sendcnt, recvcnt, sdispls, rdispls,sendbuf);
2678     std::copy (recvbuf,recvbuf+thisrecv,std::back_inserter(alledges));	// copy to all edges
2679 		delete [] recvbuf;
2680 	}
2681 
2682 	int myprocrow = commGrid->GetRankInProcCol();
2683 	int myproccol = commGrid->GetRankInProcRow();
2684 	LIT locrows, loccols;
2685 	if(myprocrow != gridrows-1)	locrows = m_perproc;
2686 	else 	locrows = DEL.getGlobalV() - myprocrow * m_perproc;
2687 	if(myproccol != gridcols-1)	loccols = n_perproc;
2688 	else	loccols = DEL.getGlobalV() - myproccol * n_perproc;
2689 
2690   	SpTuples<LIT,NT> A(totrecv/2, locrows, loccols, alledges, removeloops);  	// alledges is empty upon return
2691   	spSeq = new DER(A,false);        // Convert SpTuples to DER
2692 }
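// Usage sketch (illustrative): forming the matrix from a distributed edge list, as in the
// Graph500 drivers. The GenGraph500Data call and its parameters ("initiator", "scale",
// "edgefactor") are assumptions of this sketch.
//
//   double initiator[4] = {.57, .19, .19, .05};
//   DistEdgeList<int64_t> * DEL = new DistEdgeList<int64_t>();
//   DEL->GenGraph500Data(initiator, scale, edgefactor);
//   SpParMat< int64_t,double,SpDCCols<int64_t,double> > A(*DEL, false);  // false: keep self loops
//   delete DEL;  // the edge list is no longer needed once the matrix is built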
2693 
2694 template <class IT, class NT, class DER>
2695 IT SpParMat<IT,NT,DER>::RemoveLoops()
2696 {
2697 	MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2698 	IT totrem;
2699 	IT removed = 0;
2700 	if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2701 	{
2702 		SpTuples<IT,NT> tuples(*spSeq);
2703 		delete spSeq;
2704 		removed  = tuples.RemoveLoops();
2705 		spSeq = new DER(tuples, false);	// Convert to DER
2706 	}
2707 	MPI_Allreduce( &removed, & totrem, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
2708 	return totrem;
2709 }
2710 
2711 
2712 
2713 template <class IT, class NT, class DER>
2714 void SpParMat<IT,NT,DER>::AddLoops(NT loopval, bool replaceExisting)
2715 {
2716 	MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2717 	if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2718 	{
2719     		typedef typename DER::LocalIT LIT;
2720 		SpTuples<LIT,NT> tuples(*spSeq);
2721 		delete spSeq;
2722 		tuples.AddLoops(loopval, replaceExisting);
2723         	tuples.SortColBased();
2724 		spSeq = new DER(tuples, false);	// Convert to DER
2725 	}
2726 }
2727 
2728 
2729 // Different values on the diagonal
2730 template <class IT, class NT, class DER>
2731 void SpParMat<IT,NT,DER>::AddLoops(FullyDistVec<IT,NT> loopvals, bool replaceExisting)
2732 {
2733 
2734 
2735     if(*loopvals.commGrid != *commGrid)
2736     {
2737         SpParHelper::Print("Grids are not comparable, SpParMat::AddLoops() fails!\n", commGrid->GetWorld());
2738         MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
2739     }
2740     if (getncol()!= loopvals.TotalLength())
2741     {
2742         SpParHelper::Print("The number of entries in loopvals is not equal to the number of diagonal entries.\n");
2743         MPI_Abort(MPI_COMM_WORLD,DIMMISMATCH);
2744     }
2745 
2746     // Gather data on the diagonal processor
2747     IT locsize = loopvals.LocArrSize();
2748     int rowProcs = commGrid->GetGridCols();
2749     std::vector<int> recvcnt(rowProcs, 0);
2750     std::vector<int> rdpls(rowProcs, 0);
2751     MPI_Gather(&locsize, 1, MPI_INT, recvcnt.data(), 1, MPI_INT, commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
2752     std::partial_sum(recvcnt.data(), recvcnt.data()+rowProcs-1, rdpls.data()+1);
2753 
2754     IT totrecv = rdpls[rowProcs-1] + recvcnt[rowProcs-1];
2755     assert((totrecv < std::numeric_limits<int>::max()));
2756 
2757     std::vector<NT> rowvals(totrecv);
2758 	MPI_Gatherv(loopvals.arr.data(), locsize, MPIType<NT>(), rowvals.data(), recvcnt.data(), rdpls.data(),
2759                  MPIType<NT>(), commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
2760 
2761 
2762     MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2763     if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2764     {
2765         typedef typename DER::LocalIT LIT;
2766         SpTuples<LIT,NT> tuples(*spSeq);
2767         delete spSeq;
2768         tuples.AddLoops(rowvals, replaceExisting);
2769         tuples.SortColBased();
2770         spSeq = new DER(tuples, false);	// Convert to DER
2771     }
2772 }
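// Usage sketch (illustrative): stripping self loops and then putting a uniform value on the
// diagonal of an already constructed SpParMat "A".
//
//   int64_t nremoved = A.RemoveLoops();   // returns the global number of deleted diagonal entries
//   A.AddLoops(1.0, true);                // true: overwrite existing diagonal entries, if any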
2773 
2774 
2775 //! Pre-allocates buffers for row communication
2776 //! additionally (if GATHERVOPT is defined, incomplete as of March 2016):
2777 //! - Splits the local column indices to sparse & dense pieces to avoid redundant AllGather (sparse pieces get p2p)
2778 template <class IT, class NT, class DER>
2779 template <typename LIT, typename OT>
2780 void SpParMat<IT,NT,DER>::OptimizeForGraph500(OptBuf<LIT,OT> & optbuf)
2781 {
2782 	if(spSeq->getnsplit() > 0)
2783 	{
2784 		SpParHelper::Print("Can not declare preallocated buffers for multithreaded execution\n", commGrid->GetWorld());
2785 		return;
2786     }
2787 
2788     typedef typename DER::LocalIT LocIT;    // ABAB: should match the type of LIT. Check?
2789 
2790     // Set up communication buffers, one for all
2791 	LocIT mA = spSeq->getnrow();
2792     LocIT nA = spSeq->getncol();
2793 
2794 	int p_c = commGrid->GetGridCols();
2795     int p_r = commGrid->GetGridRows();
2796 
2797     LocIT rwperproc = mA / p_c; // per-processor share in row-wise communication
2798     LocIT cwperproc = nA / p_r; // per-processor share in column-wise communication
2799 
2800 #ifdef GATHERVOPT
2801     LocIT * colinds = spSeq->GetDCSC()->jc;   // local nonzero column id's
2802     LocIT locnzc = spSeq->getnzc();
2803     LocIT cci = 0;  // index to column id's array (cci: current column index)
2804     int * gsizes = NULL;
2805     IT * ents = NULL;
2806     IT * dpls = NULL;
2807     std::vector<LocIT> pack2send;
2808 
2809     FullyDistSpVec<IT,IT> dummyRHS ( commGrid, getncol()); // dummy RHS vector to estimate index start position
2810     IT recveclen;
2811 
2812     for(int pid = 1; pid <= p_r; pid++)
2813     {
2814         IT diagoffset;
2815         MPI_Status status;
2816         IT offset = dummyRHS.RowLenUntil(pid-1);
2817         int diagneigh = commGrid->GetComplementRank();
2818         MPI_Sendrecv(&offset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, &diagoffset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
2819 
2820         LocIT endind = (pid == p_r)? nA : static_cast<LocIT>(pid) * cwperproc;     // the last one might have a larger share (is this fitting to the vector boundaries?)
2821         while(cci < locnzc && colinds[cci] < endind)
2822         {
2823             pack2send.push_back(colinds[cci++]-diagoffset);
2824         }
2825         if(pid-1 == myrank) gsizes = new int[p_r];
2826         MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, pid-1, commGrid->GetColWorld());
2827         if(pid-1 == myrank)
2828         {
2829             IT colcnt = std::accumulate(gsizes, gsizes+p_r, static_cast<IT>(0));
2830             recvbuf = new IT[colcnt];
2831             dpls = new IT[p_r]();     // displacements (zero initialized pid)
2832             std::partial_sum(gsizes, gsizes+p_r-1, dpls+1);
2833         }
2834 
2835         // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype, void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
2836         MPI_Gatherv(SpHelper::p2a(pack2send), mysize, MPIType<LocIT>(), recvbuf, gsizes, dpls, MPIType<LocIT>(), pid-1, commGrid->GetColWorld());
2837         std::vector<LocIT>().swap(pack2send);
2838 
2839        if(pid-1 == myrank)
2840        {
2841            recveclen = dummyRHS.MyLocLength();
2842            std::vector< std::vector<LocIT> > service(recveclen);
2843            for(int i=0; i< p_r; ++i)
2844            {
2845                for(int j=0; j< gsizes[i]; ++j)
2846                {
2847                    IT colid2update = recvbuf[dpls[i]+j];
2848                    if(service[colid2update].size() < GATHERVNEIGHLIMIT)
2849                    {
2850                        service[colid2update].push_back(i);
2851                    }
2852                    // else don't increase any further and mark it done after the iterations are complete
2853                }
2854            }
2855        }
2856     }
2857 #endif
2858 
2859 
2860 	std::vector<bool> isthere(mA, false); // perhaps the only appropriate use of this crippled data structure
2861 	std::vector<int> maxlens(p_c,0);	// maximum data size to be sent to any neighbor along the processor row
2862 
2863 	for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
2864 	{
2865 		for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
2866 		{
2867 			LocIT rowid = nzit.rowid();
2868 			if(!isthere[rowid])
2869 			{
2870 				LocIT owner = std::min(nzit.rowid() / rwperproc, (LocIT) p_c-1);
2871 				maxlens[owner]++;
2872 				isthere[rowid] = true;
2873 			}
2874 		}
2875 	}
2876 	SpParHelper::Print("Optimization buffers set\n", commGrid->GetWorld());
2877 	optbuf.Set(maxlens,mA);
2878 }
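// Usage sketch (illustrative, in the spirit of the BFS drivers): the OptBuf is filled once here
// and then handed to the SpMV overloads that accept it. The OptBuf element types below are
// assumptions of this sketch.
//
//   OptBuf<int32_t, int64_t> optbuf;
//   A.OptimizeForGraph500(optbuf);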
2879 
2880 template <class IT, class NT, class DER>
2881 void SpParMat<IT,NT,DER>::ActivateThreading(int numsplits)
2882 {
2883 	spSeq->RowSplit(numsplits);
2884 }
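// Usage sketch (illustrative): splitting the local submatrix so that each thread owns one piece;
// "cblas_splits" is assumed to be set to the OpenMP thread count by the application.
//
//   #ifdef THREADED
//   A.ActivateThreading(cblas_splits);
//   #endif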
2885 
2886 
2887 /**
2888  * Parallel routine that returns A*A on the semiring SR
2889  * Uses only MPI-1 features (relies on simple blocking broadcast)
2890  **/
2891 template <class IT, class NT, class DER>
2892 template <typename SR>
2893 void SpParMat<IT,NT,DER>::Square ()
2894 {
2895 	int stages, dummy; 	// last two parameters of productgrid are ignored for synchronous multiplication
2896 	std::shared_ptr<CommGrid> Grid = ProductGrid(commGrid.get(), commGrid.get(), stages, dummy, dummy);
2897 
2898 	typedef typename DER::LocalIT LIT;
2899 
2900 	LIT AA_m = spSeq->getnrow();
2901 	LIT AA_n = spSeq->getncol();
2902 
2903 	DER seqTrn = spSeq->TransposeConst();	// will be automatically discarded after going out of scope
2904 
2905 	MPI_Barrier(commGrid->GetWorld());
2906 
2907 	LIT ** NRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
2908 	LIT ** TRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
2909 
2910 	SpParHelper::GetSetSizes( *spSeq, NRecvSizes, commGrid->GetRowWorld());
2911 	SpParHelper::GetSetSizes( seqTrn, TRecvSizes, commGrid->GetColWorld());
2912 
2913 	// Remotely fetched matrices are stored as pointers
2914 	DER * NRecv;
2915 	DER * TRecv;
2916 	std::vector< SpTuples<LIT,NT>  *> tomerge;
2917 
2918 	int Nself = commGrid->GetRankInProcRow();
2919 	int Tself = commGrid->GetRankInProcCol();
2920 
2921 	for(int i = 0; i < stages; ++i)
2922     {
2923 		std::vector<LIT> ess;
2924 		if(i == Nself)  NRecv = spSeq;	// shallow-copy
2925 		else
2926 		{
2927 			ess.resize(DER::esscount);
2928 			for(int j=0; j< DER::esscount; ++j)
2929 				ess[j] = NRecvSizes[j][i];		// essentials of the ith matrix in this row
2930 			NRecv = new DER();				// first, create the object
2931 		}
2932 
2933 		SpParHelper::BCastMatrix(Grid->GetRowWorld(), *NRecv, ess, i);	// then, broadcast its elements
2934 		ess.clear();
2935 
2936 		if(i == Tself)  TRecv = &seqTrn;	// shallow-copy
2937 		else
2938 		{
2939 			ess.resize(DER::esscount);
2940 			for(int j=0; j< DER::esscount; ++j)
2941 				ess[j] = TRecvSizes[j][i];
2942 			TRecv = new DER();
2943 		}
2944 		SpParHelper::BCastMatrix(Grid->GetColWorld(), *TRecv, ess, i);
2945 
2946 		SpTuples<LIT,NT> * AA_cont = MultiplyReturnTuples<SR, NT>(*NRecv, *TRecv, false, true);
2947 		if(!AA_cont->isZero())
2948 			tomerge.push_back(AA_cont);
2949 
2950 		if(i != Nself)	delete NRecv;
2951 		if(i != Tself)  delete TRecv;
2952 	}
2953 
2954 	SpHelper::deallocate2D(NRecvSizes, DER::esscount);
2955 	SpHelper::deallocate2D(TRecvSizes, DER::esscount);
2956 
2957 	delete spSeq;
2958 	spSeq = new DER(MergeAll<SR>(tomerge, AA_m, AA_n), false);	// First get the result in SpTuples, then convert to UDER
2959 	for(unsigned int i=0; i<tomerge.size(); ++i)
2960 		delete tomerge[i];
2961 }
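// Usage sketch (illustrative): squaring A in place on the conventional (+,*) semiring over double.
//
//   typedef PlusTimesSRing<double, double> PTDD;
//   A.Square<PTDD>();   // A now holds A*A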
2962 
2963 
2964 template <class IT, class NT, class DER>
2965 void SpParMat<IT,NT,DER>::Transpose()
2966 {
2967 	if(commGrid->myproccol == commGrid->myprocrow)	// Diagonal
2968 	{
2969 		spSeq->Transpose();
2970 	}
2971 	else
2972 	{
2973 		typedef typename DER::LocalIT LIT;
2974 		SpTuples<LIT,NT> Atuples(*spSeq);
2975 		LIT locnnz = Atuples.getnnz();
2976 		LIT * rows = new LIT[locnnz];
2977 		LIT * cols = new LIT[locnnz];
2978 		NT * vals = new NT[locnnz];
2979 		for(LIT i=0; i < locnnz; ++i)
2980 		{
2981 			rows[i] = Atuples.colindex(i);	// swap (i,j) here
2982 			cols[i] = Atuples.rowindex(i);
2983 			vals[i] = Atuples.numvalue(i);
2984 		}
2985 		LIT locm = getlocalcols();
2986 		LIT locn = getlocalrows();
2987 		delete spSeq;
2988 
2989 		LIT remotem, remoten, remotennz;
2990 		std::swap(locm,locn);
2991 		int diagneigh = commGrid->GetComplementRank();
2992 
2993 		MPI_Status status;
2994 		MPI_Sendrecv(&locnnz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, &remotennz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
2995 		MPI_Sendrecv(&locn, 1, MPIType<LIT>(), diagneigh, TRTAGM, &remotem, 1, MPIType<LIT>(), diagneigh, TRTAGM, commGrid->GetWorld(), &status);
2996 		MPI_Sendrecv(&locm, 1, MPIType<LIT>(), diagneigh, TRTAGN, &remoten, 1, MPIType<LIT>(), diagneigh, TRTAGN, commGrid->GetWorld(), &status);
2997 
2998 		LIT * rowsrecv = new LIT[remotennz];
2999 		MPI_Sendrecv(rows, locnnz, MPIType<LIT>(), diagneigh, TRTAGROWS, rowsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGROWS, commGrid->GetWorld(), &status);
3000 		delete [] rows;
3001 
3002 		LIT * colsrecv = new LIT[remotennz];
3003 		MPI_Sendrecv(cols, locnnz, MPIType<LIT>(), diagneigh, TRTAGCOLS, colsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGCOLS, commGrid->GetWorld(), &status);
3004 		delete [] cols;
3005 
3006 		NT * valsrecv = new NT[remotennz];
3007 		MPI_Sendrecv(vals, locnnz, MPIType<NT>(), diagneigh, TRTAGVALS, valsrecv, remotennz, MPIType<NT>(), diagneigh, TRTAGVALS, commGrid->GetWorld(), &status);
3008 		delete [] vals;
3009 
3010 		std::tuple<LIT,LIT,NT> * arrtuples = new std::tuple<LIT,LIT,NT>[remotennz];
3011 		for(LIT i=0; i< remotennz; ++i)
3012 		{
3013 			arrtuples[i] = std::make_tuple(rowsrecv[i], colsrecv[i], valsrecv[i]);
3014 		}
3015 		DeleteAll(rowsrecv, colsrecv, valsrecv);
3016 		ColLexiCompare<LIT,NT> collexicogcmp;
3017 		sort(arrtuples , arrtuples+remotennz, collexicogcmp );	// sort w.r.t columns here
3018 
3019 		spSeq = new DER();
3020 		spSeq->Create( remotennz, remotem, remoten, arrtuples);		// the deletion of arrtuples[] is handled by SpMat::Create
3021 	}
3022 }
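// Usage sketch (illustrative): transposing in place; processor pairs that mirror each other
// across the grid diagonal swap their local pieces, while diagonal processors transpose locally.
//
//   A.Transpose();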
3023 
3024 
3025 template <class IT, class NT, class DER>
3026 template <class HANDLER>
3027 void SpParMat< IT,NT,DER >::SaveGathered(std::string filename, HANDLER handler, bool transpose) const
3028 {
3029 	int proccols = commGrid->GetGridCols();
3030 	int procrows = commGrid->GetGridRows();
3031 	IT totalm = getnrow();
3032 	IT totaln = getncol();
3033 	IT totnnz = getnnz();
3034 	int flinelen = 0;
3035 	std::ofstream out;
3036 	if(commGrid->GetRank() == 0)
3037 	{
3038 		std::string s;
3039 		std::stringstream strm;
3040 		strm << "%%MatrixMarket matrix coordinate real general" << std::endl;
3041 		strm << totalm << " " << totaln << " " << totnnz << std::endl;
3042 		s = strm.str();
3043 		out.open(filename.c_str(),std::ios_base::trunc);
3044 		flinelen = s.length();
3045 		out.write(s.c_str(), flinelen);
3046 		out.close();
3047 	}
3048 	int colrank = commGrid->GetRankInProcCol();
3049 	int colneighs = commGrid->GetGridRows();
3050 	IT * locnrows = new IT[colneighs];	// number of rows is calculated by a reduction among the processor column
3051 	locnrows[colrank] = (IT) getlocalrows();
3052 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
3053 	IT roffset = std::accumulate(locnrows, locnrows+colrank, 0);
3054 	delete [] locnrows;
3055 
3056 	MPI_Datatype datatype;
3057 	MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &datatype);
3058 	MPI_Type_commit(&datatype);
3059 
3060 	for(int i = 0; i < procrows; i++)	// for all processor row (in order)
3061 	{
3062 		if(commGrid->GetRankInProcCol() == i)	// only the ith processor row
3063 		{
3064 			IT localrows = spSeq->getnrow();    // same along the processor row
3065 			std::vector< std::vector< std::pair<IT,NT> > > csr(localrows);
3066 			if(commGrid->GetRankInProcRow() == 0)	// get the head of processor row
3067 			{
3068 				IT localcols = spSeq->getncol();    // might be different on the last processor on this processor row
3069 				MPI_Bcast(&localcols, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3070 				for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over nonempty subcolumns
3071 				{
3072 					for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3073 					{
3074 						csr[nzit.rowid()].push_back( std::make_pair(colit.colid(), nzit.value()) );
3075 					}
3076 				}
3077 			}
3078 			else	// get the rest of the processors
3079 			{
3080 				IT n_perproc;
3081 				MPI_Bcast(&n_perproc, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3082 				IT noffset = commGrid->GetRankInProcRow() * n_perproc;
3083 				for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)	// iterate over nonempty subcolumns
3084 				{
3085 					for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3086 					{
3087 						csr[nzit.rowid()].push_back( std::make_pair(colit.colid() + noffset, nzit.value()) );
3088 					}
3089 				}
3090 			}
3091 			std::pair<IT,NT> * ents = NULL;
3092 			int * gsizes = NULL, * dpls = NULL;
3093 			if(commGrid->GetRankInProcRow() == 0)	// only the head of processor row
3094 			{
3095 				out.open(filename.c_str(),std::ios_base::app);
3096 				gsizes = new int[proccols];
3097 				dpls = new int[proccols]();	// displacements (zero initialized)
3098 			}
3099 			for(int j = 0; j < localrows; ++j)
3100 			{
3101 				IT rowcnt = 0;
3102 				sort(csr[j].begin(), csr[j].end());
3103 				int mysize = csr[j].size();
3104 				MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, 0, commGrid->GetRowWorld());
3105 				if(commGrid->GetRankInProcRow() == 0)
3106 				{
3107 					rowcnt = std::accumulate(gsizes, gsizes+proccols, static_cast<IT>(0));
3108 					std::partial_sum(gsizes, gsizes+proccols-1, dpls+1);
3109 					ents = new std::pair<IT,NT>[rowcnt];	// nonzero entries in the j'th local row
3110 				}
3111 
3112 				// int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype,
3113 				// 		    void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
3114 				MPI_Gatherv(SpHelper::p2a(csr[j]), mysize, datatype, ents, gsizes, dpls, datatype, 0, commGrid->GetRowWorld());
3115 				if(commGrid->GetRankInProcRow() == 0)
3116 				{
3117 					for(int k=0; k< rowcnt; ++k)
3118 					{
3119 						//out << j + roffset + 1 << "\t" << ents[k].first + 1 <<"\t" << ents[k].second << endl;
3120 						if (!transpose)
3121 							// regular
3122 							out << j + roffset + 1 << "\t" << ents[k].first + 1 << "\t";
3123 						else
3124 							// transpose row/column
3125 							out << ents[k].first + 1 << "\t" << j + roffset + 1 << "\t";
3126 						handler.save(out, ents[k].second, j + roffset, ents[k].first);
3127 						out << std::endl;
3128 					}
3129 					delete [] ents;
3130 				}
3131 			}
3132 			if(commGrid->GetRankInProcRow() == 0)
3133 			{
3134 				DeleteAll(gsizes, dpls);
3135 				out.close();
3136 			}
3137 		} // end_if the ith processor row
3138 		MPI_Barrier(commGrid->GetWorld());		// signal the end of ith processor row iteration (so that all processors block)
3139 	}
3140 }
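// Usage sketch (illustrative): gathering the matrix into a single Matrix Market file, one
// processor row at a time. The no-handler convenience overload is assumed to exist in SpParMat.h.
//
//   A.SaveGathered("result.mtx");   // or A.SaveGathered("result.mtx", handler, false) with an explicit handler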
3141 
3142 
3143 //! Private subroutine of ReadGeneralizedTuples
3144 //! totallength is the length of the dictionary, which we don't know a priori in this labeled-tuples format
3145 template <class IT, class NT, class DER>
3146 MPI_File SpParMat< IT,NT,DER >::TupleRead1stPassNExchange (const std::string & filename, TYPE2SEND * & senddata, IT & totsend,
3147 							FullyDistVec<IT,STRASARRAY> & distmapper, uint64_t & totallength)
3148 {
3149     int myrank = commGrid->GetRank();
3150     int nprocs = commGrid->GetSize();
3151 
3152     MPI_Offset fpos, end_fpos;
3153     struct stat st;     // get file size
3154     if (stat(filename.c_str(), &st) == -1)
3155     {
3156         MPI_Abort(MPI_COMM_WORLD, NOFILE);
3157     }
3158     int64_t file_size = st.st_size;
3159     if(myrank == 0)    // only the root reports the file size
3160     {
3161         std::cout << "File is " << file_size << " bytes" << std::endl;
3162     }
3163     fpos = myrank * file_size / nprocs;
3164 
3165     if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3166     else end_fpos = file_size;
3167 
3168     MPI_File mpi_fh;
3169     MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3170 
3171     typedef std::map<std::string, uint64_t> KEYMAP; // due to potential (but extremely unlikely) collisions in MurmurHash, make the key to the std::map the string itself
3172     std::vector< KEYMAP > allkeys(nprocs);	  // map keeps the outgoing data unique, we could have applied this to HipMer too
3173 
3174     std::vector<std::string> lines;
3175     bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3176     int64_t entriesread = lines.size();
3177     SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3178 
3179     while(!finished)
3180     {
3181         finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3182 
3183         entriesread += lines.size();
3184     	SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3185     }
3186     int64_t allentriesread;
3187     MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3188 #ifdef COMBBLAS_DEBUG
3189     if(myrank == 0)
3190         std::cout << "Initial reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3191 #endif
3192 
3193     int * sendcnt = new int[nprocs];
3194     int * recvcnt = new int[nprocs];
3195     for(int i=0; i<nprocs; ++i)
3196 	sendcnt[i] = allkeys[i].size();
3197 
3198     MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
3199     int * sdispls = new int[nprocs]();
3200     int * rdispls = new int[nprocs]();
3201     std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
3202     std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
3203     totsend = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
3204     IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
3205 
3206     assert((totsend < std::numeric_limits<int>::max()));
3207     assert((totrecv < std::numeric_limits<int>::max()));
3208 
3209     // The following are declared in SpParMat.h
3210     // typedef std::array<char, MAXVERTNAME> STRASARRAY;
3211     // typedef std::pair< STRASARRAY, uint64_t> TYPE2SEND;
3212     senddata = new TYPE2SEND[totsend];
3213 
3214     #pragma omp parallel for
3215     for(int i=0; i<nprocs; ++i)
3216     {
3217 	size_t j = 0;
3218 	for(auto pobj:allkeys[i])
3219 	{
3220 		// A naked C-style array is neither copyable nor assignable, but std::pair requires both; hence std::array is used
3221 		std::array<char, MAXVERTNAME> vname;
3222 		std::copy( pobj.first.begin(), pobj.first.end(), vname.begin() );
3223 		if(pobj.first.length() < MAXVERTNAME)  vname[pobj.first.length()] = '\0';	// null termination
3224 
3225 		senddata[sdispls[i]+j] = TYPE2SEND(vname, pobj.second);
3226 		j++;
3227 	}
3228     }
3229     allkeys.clear();  // allkeys is no longer needed after this point
3230 
3231     MPI_Datatype MPI_HASH;
3232     MPI_Type_contiguous(sizeof(TYPE2SEND), MPI_CHAR, &MPI_HASH);
3233     MPI_Type_commit(&MPI_HASH);
3234 
3235     TYPE2SEND * recvdata = new TYPE2SEND[totrecv];
3236     MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_HASH, recvdata, recvcnt, rdispls, MPI_HASH, commGrid->GetWorld());
3237     // do not delete send buffers yet as we will use them to recv back the data
3238 
3239     std::set< std::pair<uint64_t, std::string>  > uniqsorted;
3240     for(IT i=0; i< totrecv; ++i)
3241     {
3242 	    auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0'); // find the null character (or string::end)
3243 	    std::string strtmp(recvdata[i].first.begin(), locnull); // range constructor
3244 
3245 	    uniqsorted.insert(std::make_pair(recvdata[i].second, strtmp));
3246     }
3247     uint64_t uniqsize = uniqsorted.size();
3248 
3249 #ifdef COMBBLAS_DEBUG
3250     if(myrank == 0)
3251 	    std::cout << "out of " << totrecv << " vertices received, " << uniqsize << " were unique" << std::endl;
3252 #endif
3253     uint64_t sizeuntil = 0;
3254     totallength = 0;
3255     MPI_Exscan( &uniqsize, &sizeuntil, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld() );
3256     MPI_Allreduce(&uniqsize, &totallength, 1,  MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld());
3257     if(myrank == 0) sizeuntil = 0;  // because MPI_Exscan says the recvbuf in process 0 is undefined
3258 
3259     distmapper =  FullyDistVec<IT,STRASARRAY>(commGrid, totallength,STRASARRAY{});
3260 
3261     // invindex does not conform to FullyDistVec boundaries, otherwise its contents are essentially the same as distmapper
3262     KEYMAP invindex;	// KEYMAP is map<string, uint64_t>.
3263     uint64_t locindex = 0;
3264     std::vector< std::vector< IT > > locs_send(nprocs);
3265     std::vector< std::vector< std::string > > data_send(nprocs);
3266     int * map_scnt = new int[nprocs]();	// send counts for this map only (not to be confused with the other sendcnt)
3267     for(auto itr = uniqsorted.begin(); itr != uniqsorted.end(); ++itr)
3268     {
3269 	    uint64_t globalindex = sizeuntil + locindex;
3270 	    invindex.insert(std::make_pair(itr->second, globalindex));
3271 
3272 	    IT newlocid;
3273 	    int owner = distmapper.Owner(globalindex, newlocid);
3274 
3275 	    //if(myrank == 0)
3276 	    //    std::cout << "invindex received " << itr->second << " with global index " << globalindex << " to be owned by " << owner << " with index " << newlocid << std::endl;
3277 
3278 	    locs_send[owner].push_back(newlocid);
3279 	    data_send[owner].push_back(itr->second);
3280 	    map_scnt[owner]++;
3281 	    locindex++;
3282     }
3283     uniqsorted.clear();	// clear memory
3284 
3285 
3286     /* BEGIN: Redistributing the permutation vector to fit the FullyDistVec semantics */
3287     SpParHelper::ReDistributeToVector(map_scnt, locs_send, data_send, distmapper.arr, commGrid->GetWorld());   // map_scnt is deleted here
3288     /* END: Redistributing the permutation vector to fit the FullyDistVec semantics */
3289 
3290     for(IT i=0; i< totrecv; ++i)
3291     {
3292 	    auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0');
3293 	    std::string searchstr(recvdata[i].first.begin(), locnull); // range constructor
3294 
3295 	    auto resp = invindex.find(searchstr); // recvdata[i] is of type pair< STRASARRAY, uint64_t>
3296 	    if (resp != invindex.end())
3297 	    {
3298 		recvdata[i].second = resp->second;	// now instead of random numbers, recvdata's second entry will be its new index
3299 	    }
3300 	    else
3301 		std::cout << "Assertion failed at proc " << myrank << ": the absence of the entry in invindex is unexpected!!!" << std::endl;
3302     }
3303     MPI_Alltoallv(recvdata, recvcnt, rdispls, MPI_HASH, senddata, sendcnt, sdispls, MPI_HASH, commGrid->GetWorld());
3304     DeleteAll(recvdata, sendcnt, recvcnt, sdispls, rdispls);
3305     MPI_Type_free(&MPI_HASH);
3306 
3307     // the following gets deleted here: allkeys
3308     return mpi_fh;
3309 }
3310 
3311 
3312 
3313 //! Handles all sorts of orderings as long as there are no duplicates
3314 //! Does not take matrix market banner (only tuples)
3315 //! Data can be load imbalanced and the vertex labels can be arbitrary strings
3316 //! Replaces ReadDistribute for imbalanced arbitrary input in tuples format
3317 template <class IT, class NT, class DER>
3318 template <typename _BinaryOperation>
3319 FullyDistVec<IT,std::array<char, MAXVERTNAME> > SpParMat< IT,NT,DER >::ReadGeneralizedTuples (const std::string & filename, _BinaryOperation BinOp)
3320 {
3321     int myrank = commGrid->GetRank();
3322     int nprocs = commGrid->GetSize();
3323     TYPE2SEND * senddata;
3324     IT totsend;
3325     uint64_t totallength;
3326     FullyDistVec<IT,STRASARRAY> distmapper(commGrid); // choice of array<char, MAXVERTNAME> over string = array is required to be a contiguous container and an aggregate
3327 
3328     MPI_File mpi_fh = TupleRead1stPassNExchange(filename, senddata, totsend, distmapper, totallength);
3329 
3330     typedef std::map<std::string, uint64_t> KEYMAP;
3331     KEYMAP ultimateperm;	// the ultimate permutation
3332     for(IT i=0; i< totsend; ++i)
3333     {
3334 	    auto locnull = std::find(senddata[i].first.begin(), senddata[i].first.end(), '\0');
3335 
3336 	    std::string searchstr(senddata[i].first.begin(), locnull);
3337 	    auto ret = ultimateperm.emplace(std::make_pair(searchstr, senddata[i].second));
3338 	    if(!ret.second)	// the second is the boolean that tells success
3339 	    {
3340 	        // remember, we only sent unique vertex ids in the first place so we are expecting unique values in return
3341 		std::cout << "the duplication in ultimateperm is unexpected!!!" << std::endl;
3342 	    }
3343     }
3344     delete [] senddata;
3345 
3346     // rename the data now, first reset file pointers
3347     MPI_Offset fpos, end_fpos;
3348     struct stat st;     // get file size
3349     if (stat(filename.c_str(), &st) == -1)
3350     {
3351         MPI_Abort(MPI_COMM_WORLD, NOFILE);
3352     }
3353     int64_t file_size = st.st_size;
3354 
3355     fpos = myrank * file_size / nprocs;
3356 
3357     if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3358     else end_fpos = file_size;
3359 
3360     typedef typename DER::LocalIT LIT;
3361     std::vector<LIT> rows;
3362     std::vector<LIT> cols;
3363     std::vector<NT> vals;
3364 
3365     std::vector<std::string> lines;
3366     bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3367     int64_t entriesread = lines.size();
3368 
3369     SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3370 
3371     while(!finished)
3372     {
3373         finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3374         entriesread += lines.size();
3375     	SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3376     }
3377     int64_t allentriesread;
3378     MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3379 #ifdef COMBBLAS_DEBUG
3380     if(myrank == 0)
3381         std::cout << "Second reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3382 #endif
3383 
3384     MPI_File_close(&mpi_fh);
3385     std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3386 
3387     LIT locsize = rows.size();   // remember: locsize != entriesread (unless the matrix is unsymmetric)
3388     for(LIT i=0; i<locsize; ++i)
3389     {
3390         LIT lrow, lcol;
3391         int owner = Owner(totallength, totallength, rows[i], cols[i], lrow, lcol);
3392         data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3393     }
3394     std::vector<LIT>().swap(rows);
3395     std::vector<LIT>().swap(cols);
3396     std::vector<NT>().swap(vals);
3397 
3398 #ifdef COMBBLAS_DEBUG
3399     if(myrank == 0)
3400         std::cout << "Packing to recipients finished, about to send..." << std::endl;
3401 #endif
3402 
3403     if(spSeq)   delete spSeq;
3404     SparseCommon(data, locsize, totallength, totallength, BinOp);
3405     // PrintInfo();
3406     // distmapper.ParallelWrite("distmapper.mtx", 1, CharArraySaveHandler());
3407     return distmapper;
3408 }
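// Usage sketch (illustrative): reading a headerless tuples file with string vertex labels and
// keeping the returned label vector to translate numeric ids back to names later.
//
//   FullyDistVec<int64_t, std::array<char, MAXVERTNAME> > labels =
//       A.ReadGeneralizedTuples("edges.txt", maximum<double>());   // duplicates resolved by taking the maximum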
3409 
3410 
3411 
3412 //! Handles all sorts of orderings, even duplicates (what happens to them is determined by BinOp)
3413 //! Requires proper matrix market banner at the moment
3414 //! Replaces ReadDistribute for properly load balanced input in matrix market format
3415 template <class IT, class NT, class DER>
3416 template <typename _BinaryOperation>
3417 void SpParMat< IT,NT,DER >::ParallelReadMM (const std::string & filename, bool onebased, _BinaryOperation BinOp)
3418 {
3419     int32_t type = -1;
3420     int32_t symmetric = 0;
3421     int64_t nrows, ncols, nonzeros;
3422     int64_t linesread = 0;
3423 
3424     FILE *f;
3425     int myrank = commGrid->GetRank();
3426     int nprocs = commGrid->GetSize();
3427     if(myrank == 0)
3428     {
3429         MM_typecode matcode;
3430         if ((f = fopen(filename.c_str(), "r")) == NULL)
3431         {
3432             printf("COMBBLAS: Matrix-market file %s can not be found\n", filename.c_str());
3433             MPI_Abort(MPI_COMM_WORLD, NOFILE);
3434         }
3435         if (mm_read_banner(f, &matcode) != 0)
3436         {
3437             printf("Could not process Matrix Market banner.\n");
3438             exit(1);
3439         }
3440         linesread++;
3441 
3442         if (mm_is_complex(matcode))
3443         {
3444             printf("Sorry, this application does not support complex types\n");
3445             printf("Matrix Market type: [%s]\n", mm_typecode_to_str(matcode));
3446         }
3447         else if(mm_is_real(matcode))
3448         {
3449             std::cout << "Matrix is Float" << std::endl;
3450             type = 0;
3451         }
3452         else if(mm_is_integer(matcode))
3453         {
3454             std::cout << "Matrix is Integer" << std::endl;
3455             type = 1;
3456         }
3457         else if(mm_is_pattern(matcode))
3458         {
3459             std::cout << "Matrix is Boolean" << std::endl;
3460             type = 2;
3461         }
3462         if(mm_is_symmetric(matcode) || mm_is_hermitian(matcode))
3463         {
3464             std::cout << "Matrix is symmetric" << std::endl;
3465             symmetric = 1;
3466         }
3467         int ret_code;
3468         if ((ret_code = mm_read_mtx_crd_size(f, &nrows, &ncols, &nonzeros, &linesread)) !=0)  // ABAB: mm_read_mtx_crd_size made 64-bit friendly
3469             exit(1);
3470 
3471         std::cout << "Total number of nonzeros expected across all processors is " << nonzeros << std::endl;
3472 
3473     }
3474     MPI_Bcast(&type, 1, MPI_INT, 0, commGrid->commWorld);
3475     MPI_Bcast(&symmetric, 1, MPI_INT, 0, commGrid->commWorld);
3476     MPI_Bcast(&nrows, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3477     MPI_Bcast(&ncols, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3478     MPI_Bcast(&nonzeros, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3479 
3480     // Determine the file size and compute this rank's byte range within the file
3481     struct stat st;     // get file size
3482     if (stat(filename.c_str(), &st) == -1)
3483     {
3484         MPI_Abort(MPI_COMM_WORLD, NOFILE);
3485     }
3486     int64_t file_size = st.st_size;
3487     MPI_Offset fpos, end_fpos, endofheader;
3488     if(commGrid->GetRank() == 0)    // the offset needs to be for this rank
3489     {
3490         std::cout << "File is " << file_size << " bytes" << std::endl;
3491 	fpos = ftell(f);
3492 	endofheader =  fpos;
3493     	MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);
3494         fclose(f);
3495     }
3496     else
3497     {
3498     	MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);  // receive the file loc at the end of header
3499 	fpos = endofheader + myrank * (file_size-endofheader) / nprocs;
3500     }
3501     if(myrank != (nprocs-1)) end_fpos = endofheader + (myrank + 1) * (file_size-endofheader) / nprocs;
3502     else end_fpos = file_size;
3503 
3504     MPI_File mpi_fh;
3505     MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3506 
3507 
3508     typedef typename DER::LocalIT LIT;
3509     std::vector<LIT> rows;
3510     std::vector<LIT> cols;
3511     std::vector<NT> vals;
3512 
3513     std::vector<std::string> lines;
3514     bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3515     int64_t entriesread = lines.size();
3516     SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
3517     MPI_Barrier(commGrid->commWorld);
3518 
3519     while(!finished)
3520     {
3521         finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3522         entriesread += lines.size();
3523         SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
3524     }
3525     int64_t allentriesread;
3526     MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3527 #ifdef COMBBLAS_DEBUG
3528     if(myrank == 0)
3529         std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3530 #endif
3531 
3532     std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3533 
3534     LIT locsize = rows.size();   // remember: locsize != entriesread (unless the matrix is unsymmetric)
3535     for(LIT i=0; i<locsize; ++i)
3536     {
3537         LIT lrow, lcol;
3538         int owner = Owner(nrows, ncols, rows[i], cols[i], lrow, lcol);
3539         data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3540     }
3541     std::vector<LIT>().swap(rows);
3542     std::vector<LIT>().swap(cols);
3543     std::vector<NT>().swap(vals);
3544 
3545 #ifdef COMBBLAS_DEBUG
3546     if(myrank == 0)
3547         std::cout << "Packing to recipients finished, about to send..." << std::endl;
3548 #endif
3549 
3550     if(spSeq)   delete spSeq;
3551     SparseCommon(data, locsize, nrows, ncols, BinOp);
3552 }
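// Usage sketch (mirroring the example applications): every process takes part in reading a
// Matrix Market file; indices in the file are 1-based and duplicates are resolved by max.
//
//   SpParMat< int64_t,double,SpDCCols<int64_t,double> > A(fullWorld);
//   A.ParallelReadMM("input.mtx", true, maximum<double>());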
3553 
3554 
3555 //! Handles all sorts of orderings as long as there are no duplicates
3556 //! May perform better when the data is already reverse column-sorted (i.e. in decreasing order)
3557 //! if nonum is true, then numerics are not supplied and they are assumed to be all 1's
3558 template <class IT, class NT, class DER>
3559 template <class HANDLER>
3560 void SpParMat< IT,NT,DER >::ReadDistribute (const std::string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
3561 {
3562 #ifdef TAU_PROFILE
3563    	TAU_PROFILE_TIMER(rdtimer, "ReadDistribute", "void SpParMat::ReadDistribute (const string & , int, bool, HANDLER, bool)", TAU_DEFAULT);
3564    	TAU_PROFILE_START(rdtimer);
3565 #endif
3566 
3567 	std::ifstream infile;
3568 	FILE * binfile = NULL;	// points to "past header" if the file is binary
3569 	int seeklength = 0;
3570 	HeaderInfo hfile;
3571 	if(commGrid->GetRank() == master)	// 1 processor
3572 	{
3573 		hfile = ParseHeader(filename, binfile, seeklength);
3574 	}
3575 	MPI_Bcast(&seeklength, 1, MPI_INT, master, commGrid->commWorld);
3576 
3577 	IT total_m, total_n, total_nnz;
3578 	IT m_perproc = 0, n_perproc = 0;
3579 
3580 	int colneighs = commGrid->GetGridRows();	// number of neighbors along this processor column (including oneself)
3581 	int rowneighs = commGrid->GetGridCols();	// number of neighbors along this processor row (including oneself)
3582 
3583 	IT buffpercolneigh = MEMORYINBYTES / (colneighs * (2 * sizeof(IT) + sizeof(NT)));
3584 	IT buffperrowneigh = MEMORYINBYTES / (rowneighs * (2 * sizeof(IT) + sizeof(NT)));
3585 	if(pario)
3586 	{
3587 		// since all colneighs will be reading the data at the same time
3588 		// chances are they might all read the data that should go to one
3589 		// in that case buffperrowneigh > colneighs * buffpercolneigh
3590 		// in order not to overflow
3591 		buffpercolneigh /= colneighs;
3592 		if(seeklength == 0)
3593 			SpParHelper::Print("COMBBLAS: Parallel I/O requested but binary header is corrupted\n", commGrid->GetWorld());
3594 	}
3595 
3596 	// make sure that buffperrowneigh >= buffpercolneigh to cover for this pathological case:
3597 	//   	-- all data received by a given column head (by vertical communication) are headed to a single processor along the row
3598 	//   	-- then making sure buffperrowneigh >= buffpercolneigh guarantees that the horizontal buffer will never overflow
3599 	buffperrowneigh = std::max(buffperrowneigh, buffpercolneigh);
3600 	if(std::max(buffpercolneigh * colneighs, buffperrowneigh * rowneighs) > std::numeric_limits<int>::max())
3601 	{
3602 		SpParHelper::Print("COMBBLAS: MPI doesn't support sending int64_t send/recv counts or displacements\n", commGrid->GetWorld());
3603 	}
3604 
3605 	int * cdispls = new int[colneighs];
3606 	for (IT i=0; i<colneighs; ++i)  cdispls[i] = i*buffpercolneigh;
3607 	int * rdispls = new int[rowneighs];
3608 	for (IT i=0; i<rowneighs; ++i)  rdispls[i] = i*buffperrowneigh;
3609 
3610 	int *ccurptrs = NULL, *rcurptrs = NULL;
3611 	int recvcount = 0;
3612 	IT * rows = NULL;
3613 	IT * cols = NULL;
3614 	NT * vals = NULL;
3615 
3616 	// Note: all other column heads that initiate the horizontal communication have the same "rankinrow" as the master
3617 	int rankincol = commGrid->GetRankInProcCol(master);	// get master's rank in its processor column
3618 	int rankinrow = commGrid->GetRankInProcRow(master);
3619 	std::vector< std::tuple<IT, IT, NT> > localtuples;
3620 
3621 	if(commGrid->GetRank() == master)	// 1 processor
3622 	{
3623 		if( !hfile.fileexists )
3624 		{
3625 			SpParHelper::Print( "COMBBLAS: Input file doesn't exist\n", commGrid->GetWorld());
3626 			total_n = 0; total_m = 0;
3627 			BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3628 			return;
3629 		}
3630 		if (hfile.headerexists && hfile.format == 1)
3631 		{
3632 			SpParHelper::Print("COMBBLAS: Ascii input with binary headers is not supported\n", commGrid->GetWorld());
3633 			total_n = 0; total_m = 0;
3634 			BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3635 			return;
3636 		}
3637 		if ( !hfile.headerexists )	// no header - ascii file (at this point, file exists)
3638 		{
3639 			infile.open(filename.c_str());
3640 			char comment[256];
3641 			infile.getline(comment,256);
3642 			while(comment[0] == '%')
3643 			{
3644 				infile.getline(comment,256);
3645 			}
3646 			std::stringstream ss;
3647 			ss << std::string(comment);
3648 			ss >> total_m >> total_n >> total_nnz;
3649 			if(pario)
3650 			{
3651 				SpParHelper::Print("COMBBLAS: Parallel I/O requested on a headerless ASCII file, aborting\n", commGrid->GetWorld());
3652 				total_n = 0; total_m = 0;
3653 				BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3654 				return;
3655 			}
3656 		}
3657 		else // hfile.headerexists && hfile.format == 0
3658 		{
3659 			total_m = hfile.m;
3660 			total_n = hfile.n;
3661 			total_nnz = hfile.nnz;
3662 		}
3663 		m_perproc = total_m / colneighs;
3664 		n_perproc = total_n / rowneighs;
3665 		BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3666 		AllocateSetBuffers(rows, cols, vals,  rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
3667 
3668 		if(seeklength > 0 && pario)   // sqrt(p) processors also do parallel binary i/o
3669 		{
3670 			IT entriestoread =  total_nnz / colneighs;
3671 			#ifdef IODEBUG
3672       std::ofstream oput;
3673 			commGrid->OpenDebugFile("Read", oput);
3674 			oput << "Total nnz: " << total_nnz << " entries to read: " << entriestoread << std::endl;
3675 			oput.close();
3676 			#endif
3677 			ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3678 				rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
3679 		}
3680 		else	// only this (master) is doing I/O (text or binary)
3681 		{
3682 			IT temprow, tempcol;
3683 			NT tempval;
3684 			IT ntrow = 0, ntcol = 0; // not transposed row and column index
3685 			char line[1024];
3686 			bool nonumline = nonum;
3687 			IT cnz = 0;
3688 			for(; cnz < total_nnz; ++cnz)
3689 			{
3690 				int colrec;
3691 				size_t commonindex;
3692 				std::stringstream linestream;
3693 				if( (!hfile.headerexists) && (!infile.eof()))
3694 				{
3695 					// read one line at a time so that missing numerical values can be detected
3696 					infile.getline(line, 1024);
3697 					linestream << line;
3698 					linestream >> temprow >> tempcol;
3699 					if (!nonum)
3700 					{
3701 						// see if this line has a value
3702 						linestream >> std::skipws;
3703 						nonumline = linestream.eof();
3704 					}
3705 					--temprow;	// file is 1-based where C-arrays are 0-based
3706 					--tempcol;
3707 					ntrow = temprow;
3708 					ntcol = tempcol;
3709 				}
3710 				else if(hfile.headerexists && (!feof(binfile)) )
3711 				{
3712 					handler.binaryfill(binfile, temprow , tempcol, tempval);
3713 				}
3714 				if (transpose)
3715 				{
3716 					IT swap = temprow;
3717 					temprow = tempcol;
3718 					tempcol = swap;
3719 				}
3720 				colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1);	// recipient processor along the column
3721 				commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
3722 
3723 				rows[ commonindex ] = temprow;
3724 				cols[ commonindex ] = tempcol;
3725 				if( (!hfile.headerexists) && (!infile.eof()))
3726 				{
3727 					vals[ commonindex ] = nonumline ? handler.getNoNum(ntrow, ntcol) : handler.read(linestream, ntrow, ntcol); //tempval;
3728 				}
3729 				else if(hfile.headerexists && (!feof(binfile)) )
3730 				{
3731 					vals[ commonindex ] = tempval;
3732 				}
3733 				++ (ccurptrs[colrec]);
3734 				if(ccurptrs[colrec] == buffpercolneigh || (cnz == (total_nnz-1)) )		// one buffer is full, or file is done !
3735 				{
3736 					MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first, send the receive counts
3737 
3738 					// generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT = bool)
3739 					IT * temprows = new IT[recvcount];
3740 					IT * tempcols = new IT[recvcount];
3741 					NT * tempvals = new NT[recvcount];
3742 
3743 					// then, send all buffers to their recipients ...
3744 					MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount,  MPIType<IT>(), rankincol, commGrid->colWorld);
3745 					MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount,  MPIType<IT>(), rankincol, commGrid->colWorld);
3746 					MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount,  MPIType<NT>(), rankincol, commGrid->colWorld);
3747 
3748 					std::fill_n(ccurptrs, colneighs, 0);  				// finally, reset current pointers !
3749 					DeleteAll(rows, cols, vals);
3750 
3751 					HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
3752 							buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
3753 
3754 					if( cnz != (total_nnz-1) )	// otherwise the loop will exit with no one to claim the memory back
3755 					{
3756 						// reuse these buffers for the next vertical communication
3757 						rows = new IT [ buffpercolneigh * colneighs ];
3758 						cols = new IT [ buffpercolneigh * colneighs ];
3759 						vals = new NT [ buffpercolneigh * colneighs ];
3760 					}
3761 				} // end_if for "send buffer is full" case
3762 			} // end_for for "cnz < total_nnz" case
3763 			assert (cnz == total_nnz);
3764 
3765 			// Signal the end of file to other processors along the column
3766 			std::fill_n(ccurptrs, colneighs, std::numeric_limits<int>::max());
3767 			MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
3768 
3769 			// And along the row ...
3770 			std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
3771 			MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3772 		}	// end of "else" (only one processor reads) block
3773 	}	// end_if for "master processor" case
3774 	else if( commGrid->OnSameProcCol(master) ) 	// (r-1) processors
3775 	{
3776 		BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3777 		m_perproc = total_m / colneighs;
3778 		n_perproc = total_n / rowneighs;
3779 
3780 		if(seeklength > 0 && pario)   // these processors also do parallel binary i/o
3781 		{
3782 			binfile = fopen(filename.c_str(), "rb");
3783 			IT entrysize = handler.entrylength();
3784 			int myrankincol = commGrid->GetRankInProcCol();
3785 			IT perreader = total_nnz / colneighs;
3786 			IT read_offset = entrysize * static_cast<IT>(myrankincol) * perreader + seeklength;
3787 			IT entriestoread = perreader;
3788 			if (myrankincol == colneighs-1)
3789 				entriestoread = total_nnz - static_cast<IT>(myrankincol) * perreader;
3790 			fseek(binfile, read_offset, SEEK_SET);
3791 
3792 			#ifdef IODEBUG
3793       std::ofstream oput;
3794 			commGrid->OpenDebugFile("Read", oput);
3795 			oput << "Total nnz: " << total_nnz << " OFFSET : " << read_offset << " entries to read: " << entriestoread << std::endl;
3796 			oput.close();
3797 			#endif
3798 
3799 			AllocateSetBuffers(rows, cols, vals,  rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
3800 			ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3801 				rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
3802 		}
3803 		else // only master does the I/O
3804 		{
3805 			while(total_n > 0 || total_m > 0)	// otherwise input file does not exist !
3806 			{
3807 				// void MPI::Comm::Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], const MPI::Datatype& sendtype,
3808 				//				void* recvbuf, int recvcount, const MPI::Datatype & recvtype, int root) const
3809 				// The outcome is as if the root executed n send operations,
3810 				//	MPI_Send(sendbuf + displs[i] * extent(sendtype), sendcounts[i], sendtype, i, ...)
3811 				// and each process executed a receive,
3812 				// 	MPI_Recv(recvbuf, recvcount, recvtype, root, ...)
3813 				// The send buffer is ignored for all nonroot processes.
3814 
3815 				MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);                       // first receive the receive counts ...
3816 				if( recvcount == std::numeric_limits<int>::max()) break;
3817 
3818 				// create space for incoming data ...
3819 				IT * temprows = new IT[recvcount];
3820 				IT * tempcols = new IT[recvcount];
3821 				NT * tempvals = new NT[recvcount];
3822 
3823 				// receive actual data ... (first 4 arguments are ignored in the receiver side)
3824 				MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount,  MPIType<IT>(), rankincol, commGrid->colWorld);
3825 				MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount,  MPIType<IT>(), rankincol, commGrid->colWorld);
3826 				MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount,  MPIType<NT>(), rankincol, commGrid->colWorld);
3827 
3828 				// now, send the data along the horizontal
3829 				rcurptrs = new int[rowneighs];
3830 				std::fill_n(rcurptrs, rowneighs, 0);
3831 
3832 				// HorizontalSend frees the memory of temp_xxx arrays and then creates and frees memory of all the six arrays itself
3833 				HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
3834 					buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
3835 			}
3836 		}
3837 
3838 		// Signal the end of file to other processors along the row
3839 		std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
3840 		MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3841 		delete [] rcurptrs;
3842 	}
3843 	else		// r * (s-1) processors that only participate in the horizontal communication step
3844 	{
3845 		BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3846 		m_perproc = total_m / colneighs;
3847 		n_perproc = total_n / rowneighs;
3848 		while(total_n > 0 || total_m > 0)	// otherwise input file does not exist !
3849 		{
3850 			// receive the receive count
3851 			MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3852 			if( recvcount == std::numeric_limits<int>::max())
3853 				break;
3854 
3855 			// create space for incoming data ...
3856 			IT * temprows = new IT[recvcount];
3857 			IT * tempcols = new IT[recvcount];
3858 			NT * tempvals = new NT[recvcount];
3859 
3860 			MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount,  MPIType<IT>(), rankinrow, commGrid->rowWorld);
3861 			MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount,  MPIType<IT>(), rankinrow, commGrid->rowWorld);
3862 			MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount,  MPIType<NT>(), rankinrow, commGrid->rowWorld);
3863 
3864 			// now push what is ours to tuples
3865 			IT moffset = commGrid->myprocrow * m_perproc;
3866 			IT noffset = commGrid->myproccol * n_perproc;
3867 
3868 			for(IT i=0; i< recvcount; ++i)
3869 			{
3870 				localtuples.push_back( 	std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
3871 			}
3872 			DeleteAll(temprows,tempcols,tempvals);
3873 		}
3874 	}
3875 	DeleteAll(cdispls, rdispls);
3876 	std::tuple<IT,IT,NT> * arrtuples = new std::tuple<IT,IT,NT>[localtuples.size()];  // the vector will go out of scope, make it stick !
3877   std::copy(localtuples.begin(), localtuples.end(), arrtuples);
3878 
3879  	IT localm = (commGrid->myprocrow != (commGrid->grrows-1))? m_perproc: (total_m - (m_perproc * (commGrid->grrows-1)));
3880  	IT localn = (commGrid->myproccol != (commGrid->grcols-1))? n_perproc: (total_n - (n_perproc * (commGrid->grcols-1)));
3881 	spSeq->Create( localtuples.size(), localm, localn, arrtuples);		// the deletion of arrtuples[] is handled by SpMat::Create
3882 
3883 #ifdef TAU_PROFILE
3884    	TAU_PROFILE_STOP(rdtimer);
3885 #endif
3886 	return;
3887 }
3888 
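/*
 * Worked example (illustrative, editorial sketch with made-up sizes) of the block distribution used
 * above: every processor row owns m_perproc = total_m / grrows rows, and the last processor row also
 * keeps the remainder; the same rule applies to columns.
 *
 *	// e.g. total_m = 10 rows on a 3x3 grid:
 *	//   m_perproc = 10 / 3 = 3
 *	//   processor rows 0 and 1 own 3 rows each; processor row 2 owns 10 - 3*2 = 4 rows
 *	IT m_perproc = total_m / commGrid->grrows;
 *	IT localm = (commGrid->myprocrow != commGrid->grrows-1) ? m_perproc
 *	          : total_m - m_perproc * (commGrid->grrows-1);
 */
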
3889 template <class IT, class NT, class DER>
3890 void SpParMat<IT,NT,DER>::AllocateSetBuffers(IT * & rows, IT * & cols, NT * & vals,  int * & rcurptrs, int * & ccurptrs, int rowneighs, int colneighs, IT buffpercolneigh)
3891 {
3892 	// allocate buffers on the heap as stack space is usually limited
3893 	rows = new IT [ buffpercolneigh * colneighs ];
3894 	cols = new IT [ buffpercolneigh * colneighs ];
3895 	vals = new NT [ buffpercolneigh * colneighs ];
3896 
3897 	ccurptrs = new int[colneighs];
3898 	rcurptrs = new int[rowneighs];
3899 	std::fill_n(ccurptrs, colneighs, 0);	// fill with zero
3900 	std::fill_n(rcurptrs, rowneighs, 0);
3901 }
3902 
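/*
 * Layout sketch (illustrative, not additional library code): the rows/cols/vals buffers allocated
 * above are treated as colneighs contiguous blocks of buffpercolneigh entries each, one block per
 * recipient in the processor column. ccurptrs[colrec] counts how many entries are already staged
 * for recipient colrec, so a nonzero destined for colrec is appended at
 *
 *	size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
 *	rows[commonindex] = temprow;
 *	cols[commonindex] = tempcol;
 *	vals[commonindex] = tempval;
 *	++ccurptrs[colrec];
 *
 * which is exactly what ReadAllMine does below.
 */
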
3903 template <class IT, class NT, class DER>
3904 void SpParMat<IT,NT,DER>::BcastEssentials(MPI_Comm & world, IT & total_m, IT & total_n, IT & total_nnz, int master)
3905 {
3906 	MPI_Bcast(&total_m, 1, MPIType<IT>(), master, world);
3907 	MPI_Bcast(&total_n, 1, MPIType<IT>(), master, world);
3908 	MPI_Bcast(&total_nnz, 1, MPIType<IT>(), master, world);
3909 }
3910 
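/*
 * Usage sketch (illustrative; mirrors the callers in ReadDistribute above): the broadcast is
 * collective, so every processor calls BcastEssentials; non-masters pass uninitialized header
 * fields that get filled with the master's values and then derive the per-processor block sizes:
 *
 *	IT total_m, total_n, total_nnz;
 *	BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
 *	IT m_perproc = total_m / colneighs;	// colneighs: number of processor rows
 *	IT n_perproc = total_n / rowneighs;	// rowneighs: number of processor columns
 */
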
3911 /*
3912  * @post {rows, cols, vals are pre-allocated on the heap after this call}
3913  * @post {ccurptrs are set to zero, so that if another call is made to this function without modifying ccurptrs, no data will be sent from this processor}
3914  */
3915 template <class IT, class NT, class DER>
3916 void SpParMat<IT,NT,DER>::VerticalSend(IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
3917 				  IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, int rankinrow)
3918 {
3919 	// first, send/recv the counts ...
3920 	int * colrecvdispls = new int[colneighs];
3921 	int * colrecvcounts = new int[colneighs];
3922 	MPI_Alltoall(ccurptrs, 1, MPI_INT, colrecvcounts, 1, MPI_INT, commGrid->colWorld);      // share the request counts
3923 	int totrecv = std::accumulate(colrecvcounts,colrecvcounts+colneighs,0);
3924 	colrecvdispls[0] = 0; 		// receive displacements are exact whereas send displacements have slack
3925 	for(int i=0; i<colneighs-1; ++i)
3926 		colrecvdispls[i+1] = colrecvdispls[i] + colrecvcounts[i];
3927 
3928 	// generate space for own recv data ... (use raw arrays because std::vector<bool> is crippled when NT=bool)
3929 	IT * temprows = new IT[totrecv];
3930 	IT * tempcols = new IT[totrecv];
3931 	NT * tempvals = new NT[totrecv];
3932 
3933 	// then, exchange all buffers, delivering each block to its recipient ...
3934 	MPI_Alltoallv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
3935 	MPI_Alltoallv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
3936 	MPI_Alltoallv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, colrecvcounts, colrecvdispls, MPIType<NT>(), commGrid->colWorld);
3937 
3938 	// finally, reset current pointers !
3939 	std::fill_n(ccurptrs, colneighs, 0);
3940 	DeleteAll(colrecvdispls, colrecvcounts);
3941 	DeleteAll(rows, cols, vals);
3942 
3943 	// rcurptrs is zero-initialized scratch space; rdispls holds the fixed send displacements along the processor row
3944 	HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls, buffperrowneigh, rowneighs, totrecv, m_perproc, n_perproc, rankinrow);
3945 
3946 	// reuse these buffers for the next vertical communication
3947 	rows = new IT [ buffpercolneigh * colneighs ];
3948 	cols = new IT [ buffpercolneigh * colneighs ];
3949 	vals = new NT [ buffpercolneigh * colneighs ];
3950 }
3951 
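/*
 * Communication-pattern sketch (illustrative, with generic names): VerticalSend follows the usual
 * counts -> displacements -> MPI_Alltoallv recipe along the processor column; receive displacements
 * are the prefix sum of the received counts, while send displacements (cdispls) have slack because
 * every send block is buffpercolneigh entries wide:
 *
 *	MPI_Alltoall(sendcounts, 1, MPI_INT, recvcounts, 1, MPI_INT, comm);
 *	recvdispls[0] = 0;
 *	for(int i = 0; i < p-1; ++i)
 *		recvdispls[i+1] = recvdispls[i] + recvcounts[i];
 *	MPI_Alltoallv(sendbuf, sendcounts, senddispls, MPI_INT,
 *	              recvbuf, recvcounts, recvdispls, MPI_INT, comm);
 */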
3952 
3953 /**
3954  * Private subroutine of ReadDistribute.
3955  * Executed by p_r processors on the first processor column.
3956  * @pre {rows, cols, vals are pre-allocated on the heap before this call}
3957  * @param[in] rankinrow {row head's rank in its processor row - it determines the scatter root}
3958  */
3959 template <class IT, class NT, class DER>
3960 template <class HANDLER>
3961 void SpParMat<IT,NT,DER>::ReadAllMine(FILE * binfile, IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
3962 		IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, IT entriestoread, HANDLER handler, int rankinrow, bool transpose)
3963 {
3964 	assert(entriestoread != 0);
3965 	IT cnz = 0;
3966 	IT temprow, tempcol;
3967 	NT tempval;
3968 	int finishedglobal = 1;
3969 	while(cnz < entriestoread && !feof(binfile))	// this loop will execute at least once
3970 	{
3971 		handler.binaryfill(binfile, temprow , tempcol, tempval);
3972 		if (transpose)
3973 		{
3974 			IT swap = temprow;
3975 			temprow = tempcol;
3976 			tempcol = swap;
3977 		}
3978 		int colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1);	// recipient processor along the column
3979 		size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
3980 		rows[ commonindex ] = temprow;
3981 		cols[ commonindex ] = tempcol;
3982 		vals[ commonindex ] = tempval;
3983 		++ (ccurptrs[colrec]);
3984 		if(ccurptrs[colrec] == buffpercolneigh || (cnz == (entriestoread-1)) )		// one buffer is full, or this processor's share is done !
3985 		{
3986 			#ifdef IODEBUG
3987       std::ofstream oput;
3988 			commGrid->OpenDebugFile("Read", oput);
3989 			oput << "To column neighbors: ";
3990       std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
3991 			oput.close();
3992 			#endif
3993 
3994 			VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3995 					rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
3996 
3997 			if(cnz == (entriestoread-1))	// last execution of the outer loop
3998 			{
3999 				int finishedlocal = 1;	// I am done, but let me check others
4000 				MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4001 				while(!finishedglobal)
4002 				{
4003 					#ifdef DEBUG
4004           std::ofstream oput;
4005 					commGrid->OpenDebugFile("Read", oput);
4006 					oput << "To column neighbors: ";
4007           std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4008 					oput.close();
4009 					#endif
4010 
4011 					// postcondition of VerticalSend: ccurptrs are set to zero,
4012 					// so another call to it without modifying ccurptrs sends no data from this processor
4013 					VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4014 						rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4015 
4016 					MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4017 				}
4018 			}
4019 			else // the outer loop will continue executing
4020 			{
4021 				int finishedlocal = 0;
4022 				MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4023 			}
4024 		} // end_if for "send buffer is full" case
4025 		++cnz;
4026 	}
4027 
4028 	// signal the end to row neighbors
4029 	std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4030 	int recvcount;
4031 	MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4032 }
4033 
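/*
 * Termination sketch (illustrative; condensed from the loop above): processors in the same column may
 * own different numbers of entries, so a processor whose share is exhausted keeps calling VerticalSend
 * with all-zero ccurptrs (contributing nothing but still participating in the collectives) until the
 * bitwise AND of every processor's "finished" flag becomes 1:
 *
 *	int finishedlocal = 1;		// my share is done
 *	int finishedglobal = 0;
 *	MPI_Allreduce(&finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
 *	while(!finishedglobal)
 *	{
 *		VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls,
 *			m_perproc, n_perproc, rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
 *		MPI_Allreduce(&finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
 *	}
 */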
4034 
4035 /**
4036  * Private subroutine of ReadDistribute
4037  * @param[in] rankinrow {Row head's rank in its processor row}
4038  * Initially the temp_xxx arrays carry data received along the processor column AND need to be sent along the processor row.
4039  * The function frees the temp_xxx arrays it receives, allocates rows/cols/vals and fresh temp_xxx buffers for the row-wise exchange, and frees all six before returning.
4040  */
4041 template <class IT, class NT, class DER>
4042 void SpParMat<IT,NT,DER>::HorizontalSend(IT * & rows, IT * & cols, NT * & vals, IT * & temprows, IT * & tempcols, NT * & tempvals, std::vector < std::tuple <IT,IT,NT> > & localtuples,
4043 					 int * rcurptrs, int * rdispls, IT buffperrowneigh, int rowneighs, int recvcount, IT m_perproc, IT n_perproc, int rankinrow)
4044 {
4045 	rows = new IT [ buffperrowneigh * rowneighs ];
4046 	cols = new IT [ buffperrowneigh * rowneighs ];
4047 	vals = new NT [ buffperrowneigh * rowneighs ];
4048 
4049 	// prepare to send the data along the horizontal
4050 	for(int i=0; i< recvcount; ++i)
4051 	{
4052 		int rowrec = std::min(static_cast<int>(tempcols[i] / n_perproc), rowneighs-1);
4053 		rows[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = temprows[i];
4054 		cols[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempcols[i];
4055 		vals[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempvals[i];
4056 		++ (rcurptrs[rowrec]);
4057 	}
4058 
4059 	#ifdef IODEBUG
4060   std::ofstream oput;
4061 	commGrid->OpenDebugFile("Read", oput);
4062 	oput << "To row neighbors: ";
4063   std::copy(rcurptrs, rcurptrs+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4064 	oput << "Row displacements were: ";
4065   std::copy(rdispls, rdispls+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4066 	oput.close();
4067 	#endif
4068 
4069 	MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld); // Send the receive counts for horizontal communication
4070 
4071 	// the outgoing data is now staged in rows/cols/vals, so the temporaries can be released
4072 	// and reallocated with exactly recvcount entries to receive this processor's own share back
4073 	DeleteAll(temprows, tempcols, tempvals);
4074 	temprows = new IT[recvcount];
4075 	tempcols = new IT[recvcount];
4076 	tempvals = new NT[recvcount];
4077 
4078 	// then, scatter each buffer to its recipient ...
4079 	MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount,  MPIType<IT>(), rankinrow, commGrid->rowWorld);
4080 	MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount,  MPIType<IT>(), rankinrow, commGrid->rowWorld);
4081 	MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount,  MPIType<NT>(), rankinrow, commGrid->rowWorld);
4082 
4083 	// now push what is ours to tuples
4084 	IT moffset = commGrid->myprocrow * m_perproc;
4085 	IT noffset = commGrid->myproccol * n_perproc;
4086 
4087 	for(int i=0; i< recvcount; ++i)
4088 	{
4089 		localtuples.push_back( 	std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4090 	}
4091 
4092 	std::fill_n(rcurptrs, rowneighs, 0);
4093 	DeleteAll(rows, cols, vals, temprows, tempcols, tempvals);
4094 }
4095 
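/*
 * Index-arithmetic sketch (illustrative): the recipient along the processor row is chosen with the
 * same clamped division Owner() uses, and a received global coordinate is made local by subtracting
 * this processor's block offsets:
 *
 *	int rowrec = std::min(static_cast<int>(gcol / n_perproc), rowneighs - 1);	// recipient in my processor row
 *	IT moffset = commGrid->myprocrow * m_perproc;
 *	IT noffset = commGrid->myproccol * n_perproc;
 *	IT lrow = grow - moffset;	// local row index of the received nonzero (grow, gcol)
 *	IT lcol = gcol - noffset;	// local column index
 */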
4096 
4097 //! The input parameters' identity (zero) elements as well as
4098 //! their communication grid are preserved while outputting
4099 template <class IT, class NT, class DER>
4100 void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols, FullyDistVec<IT,NT> & distvals) const
4101 {
4102 	if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
4103 	{
4104 		SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4105 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4106 	}
4107 	IT globallen = getnnz();
4108 	SpTuples<IT,NT> Atuples(*spSeq);
4109 
4110 	FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4111 	FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4112 	FullyDistVec<IT,NT> nvals ( distvals.commGrid, globallen, NT());
4113 
4114 	IT prelen = Atuples.getnnz();
4115 	//IT postlen = nrows.MyLocLength();
4116 
4117 	int rank = commGrid->GetRank();
4118 	int nprocs = commGrid->GetSize();
4119 	IT * prelens = new IT[nprocs];
4120 	prelens[rank] = prelen;
4121 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4122 	IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4123 
4124 	int * sendcnt = new int[nprocs]();	// zero initialize
4125 	IT * rows = new IT[prelen];
4126 	IT * cols = new IT[prelen];
4127 	NT * vals = new NT[prelen];
4128 
4129 	int rowrank = commGrid->GetRankInProcRow();
4130 	int colrank = commGrid->GetRankInProcCol();
4131 	int rowneighs = commGrid->GetGridCols();
4132 	int colneighs = commGrid->GetGridRows();
4133 	IT * locnrows = new IT[colneighs];	// number of rows is calculated by a reduction among the processor column
4134 	IT * locncols = new IT[rowneighs];
4135 	locnrows[colrank] = getlocalrows();
4136 	locncols[rowrank] = getlocalcols();
4137 
4138 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4139 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
4140 
4141 	IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4142 	IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4143 
4144 	DeleteAll(locnrows, locncols);
4145 	for(int i=0; i< prelen; ++i)
4146 	{
4147 		IT locid;	// ignore local id, data will come in order
4148 		int owner = nrows.Owner(prelenuntil+i, locid);
4149 		sendcnt[owner]++;
4150 
4151 		rows[i] = Atuples.rowindex(i) + roffset;	// need the global row index
4152 		cols[i] = Atuples.colindex(i) + coffset;	// need the global col index
4153 		vals[i] = Atuples.numvalue(i);
4154 	}
4155 
4156 	int * recvcnt = new int[nprocs];
4157 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());   // get the recv counts
4158 
4159 	int * sdpls = new int[nprocs]();	// send displacements (zero initialized)
4160 	int * rdpls = new int[nprocs]();
4161 	std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4162 	std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4163 
4164 	MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4165 	MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4166 	MPI_Alltoallv(vals, sendcnt, sdpls, MPIType<NT>(), SpHelper::p2a(nvals.arr), recvcnt, rdpls, MPIType<NT>(), commGrid->GetWorld());
4167 
4168 	DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4169 	DeleteAll(prelens, rows, cols, vals);
4170 	distrows = nrows;
4171 	distcols = ncols;
4172 	distvals = nvals;
4173 }
4174 
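/*
 * Usage sketch (illustrative; the variable names and the populated matrix A are hypothetical):
 * Find() returns the nonzeros as three FullyDistVec objects living on the same communication grid,
 * analogous to MATLAB's [I,J,V] = find(A):
 *
 *	typedef SpParMat< int64_t, double, SpDCCols<int64_t,double> > PSpMat;
 *	PSpMat A(fullWorld);			// fullWorld: a std::shared_ptr<CommGrid>; A assumed populated
 *	FullyDistVec<int64_t,int64_t> I(fullWorld), J(fullWorld);
 *	FullyDistVec<int64_t,double>  V(fullWorld);
 *	A.Find(I, J, V);			// I/J hold global row/column indices, V the numerical values
 */
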
4175 //! The input parameters' identity (zero) elements as well as
4176 //! their communication grid are preserved while outputting
4177 template <class IT, class NT, class DER>
4178 void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols) const
4179 {
4180 	if((*(distrows.commGrid) != *(distcols.commGrid)) )
4181 	{
4182 		SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4183 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4184 	}
4185 	IT globallen = getnnz();
4186 	SpTuples<IT,NT> Atuples(*spSeq);
4187 
4188 	FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4189 	FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4190 
4191 	IT prelen = Atuples.getnnz();
4192 
4193 	int rank = commGrid->GetRank();
4194 	int nprocs = commGrid->GetSize();
4195 	IT * prelens = new IT[nprocs];
4196 	prelens[rank] = prelen;
4197 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4198 	IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4199 
4200 	int * sendcnt = new int[nprocs]();	// zero initialize
4201 	IT * rows = new IT[prelen];
4202 	IT * cols = new IT[prelen];
4203 	NT * vals = new NT[prelen];
4204 
4205 	int rowrank = commGrid->GetRankInProcRow();
4206 	int colrank = commGrid->GetRankInProcCol();
4207 	int rowneighs = commGrid->GetGridCols();
4208 	int colneighs = commGrid->GetGridRows();
4209 	IT * locnrows = new IT[colneighs];	// number of rows is calculated by a reduction among the processor column
4210 	IT * locncols = new IT[rowneighs];
4211 	locnrows[colrank] = getlocalrows();
4212 	locncols[rowrank] = getlocalcols();
4213 
4214 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4215 	MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());	// gather along the processor row, as in the three-output Find()
4216 	IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4217 	IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4218 
4219 	DeleteAll(locnrows, locncols);
4220 	for(int i=0; i< prelen; ++i)
4221 	{
4222 		IT locid;	// ignore local id, data will come in order
4223 		int owner = nrows.Owner(prelenuntil+i, locid);
4224 		sendcnt[owner]++;
4225 
4226 		rows[i] = Atuples.rowindex(i) + roffset;	// need the global row index
4227 		cols[i] = Atuples.colindex(i) + coffset;	// need the global col index
4228 	}
4229 
4230 	int * recvcnt = new int[nprocs];
4231 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());   // get the recv counts
4232 
4233 	int * sdpls = new int[nprocs]();	// send displacements (zero initialized)
4234 	int * rdpls = new int[nprocs]();
4235 	std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4236 	std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4237 
4238 	MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4239 	MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4240 
4241 	DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4242 	DeleteAll(prelens, rows, cols, vals);
4243 	distrows = nrows;
4244 	distcols = ncols;
4245 }
4246 
4247 template <class IT, class NT, class DER>
4248 std::ofstream& SpParMat<IT,NT,DER>::put(std::ofstream& outfile) const
4249 {
4250 	outfile << (*spSeq) << std::endl;
4251 	return outfile;
4252 }
4253 
4254 template <class IU, class NU, class UDER>
4255 std::ofstream& operator<<(std::ofstream& outfile, const SpParMat<IU, NU, UDER> & s)
4256 {
4257 	return s.put(outfile) ;	// use the right put() function
4258 
4259 }
4260 
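/*
 * Usage sketch (illustrative; the "Local" prefix is hypothetical): put() streams only the local
 * sequential matrix, so every processor writes its own piece and each rank should use a distinct
 * file, e.g. via the per-rank debug files used in the IODEBUG blocks above:
 *
 *	std::ofstream output;
 *	A.getcommgrid()->OpenDebugFile("Local", output);	// opens a file unique to this rank
 *	output << A;						// dispatches to SpParMat::put() through operator<< above
 *	output.close();
 */
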
4261 /**
4262   * @param[in] grow {global row index}
4263   * @param[in] gcol {global column index}
4264   * @param[out] lrow {row index local to the owner}
4265   * @param[out] lcol {col index local to the owner}
4266   * @returns {owner processor id}
4267  **/
4268 template <class IT, class NT,class DER>
4269 template <typename LIT>
4270 int SpParMat<IT,NT,DER>::Owner(IT total_m, IT total_n, IT grow, IT gcol, LIT & lrow, LIT & lcol) const
4271 {
4272 	int procrows = commGrid->GetGridRows();
4273 	int proccols = commGrid->GetGridCols();
4274 	IT m_perproc = total_m / procrows;
4275 	IT n_perproc = total_n / proccols;
4276 
4277 	int own_procrow;	// owner's processor row
4278 	if(m_perproc != 0)
4279 	{
4280 		own_procrow = std::min(static_cast<int>(grow / m_perproc), procrows-1);	// owner's processor row
4281 	}
4282 	else	// all owned by the last processor row
4283 	{
4284 		own_procrow = procrows -1;
4285 	}
4286 	int own_proccol;
4287 	if(n_perproc != 0)
4288 	{
4289 		own_proccol = std::min(static_cast<int>(gcol / n_perproc), proccols-1);
4290 	}
4291 	else
4292 	{
4293 		own_proccol = proccols-1;
4294 	}
4295 	lrow = grow - (own_procrow * m_perproc);
4296 	lcol = gcol - (own_proccol * n_perproc);
4297 	return commGrid->GetRank(own_procrow, own_proccol);
4298 }
4299 
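/*
 * Worked example (illustrative numbers): on a 4x4 processor grid with total_m = total_n = 1000,
 * m_perproc = n_perproc = 250. For the global entry (grow, gcol) = (760, 30):
 *
 *	own_procrow = std::min(760 / 250, 3) = 3;	lrow = 760 - 3 * 250 = 10
 *	own_proccol = std::min( 30 / 250, 3) = 0;	lcol =  30 - 0 * 250 = 30
 *
 * so the entry lives on the processor at grid position (3, 0) with local coordinates (10, 30).
 * The std::min clamp matters when the dimensions do not divide evenly; any leftover rows/columns
 * are routed to the last processor row/column, matching the block distribution used by
 * ReadDistribute above.
 */
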
4300 /**
4301   * @param[out] rowOffset {Row offset imposed by process grid. Global row index = rowOffset + local row index.}
4302   * @param[out] colOffset {Column offset imposed by process grid. Global column index = colOffset + local column index.}
4303  **/
4304 template <class IT, class NT,class DER>
4305 void SpParMat<IT,NT,DER>::GetPlaceInGlobalGrid(IT& rowOffset, IT& colOffset) const
4306 {
4307 	IT total_rows = getnrow();
4308 	IT total_cols = getncol();
4309 
4310 	int procrows = commGrid->GetGridRows();
4311 	int proccols = commGrid->GetGridCols();
4312 	IT rows_perproc = total_rows / procrows;
4313 	IT cols_perproc = total_cols / proccols;
4314 
4315 	rowOffset = commGrid->GetRankInProcCol()*rows_perproc;
4316 	colOffset = commGrid->GetRankInProcRow()*cols_perproc;
4317 }
4318 
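/*
 * Usage sketch (illustrative; lrow/lcol are hypothetical local indices of a nonzero stored on this
 * processor): converting local coordinates back to global ones only needs the two offsets computed
 * above, since the grid imposes a block distribution where only the last processor row/column can
 * own more than rows_perproc/cols_perproc elements:
 *
 *	IT rowOffset, colOffset;
 *	A.GetPlaceInGlobalGrid(rowOffset, colOffset);	// A: an SpParMat on this process grid
 *	IT grow = rowOffset + lrow;			// global row index
 *	IT gcol = colOffset + lcol;			// global column index
 */
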
4319 }
4320