1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc  --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 
30 #include <limits>
31 #include "FullyDistSpVec.h"
32 #include "SpDefs.h"
33 #include "SpHelper.h"
34 #include "hash.hpp"
35 #include "FileHeader.h"
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 
39 #ifdef GNU_PARALLEL
40 #include <parallel/algorithm>
41 #include <parallel/numeric>
42 #endif
43 
44 #include "usort/parUtils.h"
45 
46 namespace combblas {
47 
48 template <class IT, class NT>
FullyDistSpVec<IT, NT>::FullyDistSpVec ( std::shared_ptr<CommGrid> grid)
50 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid)
51 { };
52 
53 template <class IT, class NT>
FullyDistSpVec<IT, NT>::FullyDistSpVec ( std::shared_ptr<CommGrid> grid, IT globallen)
55 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid,globallen)
56 { };
57 
58 template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec ()
60 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>()
61 { };
62 
63 template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen)
65 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(globallen)
66 { }
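
// Hedged construction sketch (not part of this file): the usual entry point is the
// (grid, length) constructor above, which creates an empty sparse vector of the given
// global length distributed over the processor grid. The CommGrid setup shown follows
// the common CombBLAS pattern; the names "fullWorld" and "n" and the element types are
// assumptions for illustration only.
//
//   std::shared_ptr<CommGrid> fullWorld(new CommGrid(MPI_COMM_WORLD, 0, 0));
//   FullyDistSpVec<int64_t, double> v(fullWorld, n);   // length n, zero nonzeros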
67 
68 
69 template <class IT, class NT>
FullyDistSpVec<IT,NT> &  FullyDistSpVec<IT,NT>::operator=(const FullyDistSpVec< IT,NT > & rhs)
71 {
72 	if(this != &rhs)
73 	{
74 		FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs);	// to update glen and commGrid
75 		ind = rhs.ind;
76 		num = rhs.num;
77 	}
78 	return *this;
79 }
80 
81 template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs) // Conversion copy-constructor
83 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(rhs.commGrid,rhs.glen)
84 {
85 	*this = rhs;
86 }
87 
// Conversion copy-constructor: keeps only the entries for which the unary predicate returns true
89 template <class IT, class NT>
90 template <typename _UnaryOperation>
FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs, _UnaryOperation unop)
92 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(rhs.commGrid,rhs.glen)
93 {
94 	//FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs);	// to update glen and commGrid
95 
96 	std::vector<IT>().swap(ind);
97 	std::vector<NT>().swap(num);
98 	IT vecsize = rhs.LocArrSize();
99 	for(IT i=0; i< vecsize; ++i)
100 	{
101 		if(unop(rhs.arr[i]))
102         {
103             ind.push_back(i);
104             num.push_back(rhs.arr[i]);
105         }
106 	}
107 }
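
// Illustrative sketch of the predicate-filtering constructor above (the dense vector
// "denseVec", the lambda, and the (grid, length, initial value) FullyDistVec constructor
// are assumptions, not names defined in this file): only entries the predicate accepts
// survive into the sparse result, keeping their local positions.
//
//   FullyDistVec<int64_t, double> denseVec(grid, n, 0.0);
//   FullyDistSpVec<int64_t, double> sparseVec(denseVec, [](double x){ return x != 0.0; });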
108 
109 
110 
111 // create a sparse vector from local vectors
112 template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (std::shared_ptr<CommGrid> grid, IT globallen, const std::vector<IT>& indvec, const std::vector<NT> & numvec, bool SumDuplicates, bool sorted)
114 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid, globallen)
115 {
116 
117     assert(indvec.size()==numvec.size());
118     IT vecsize = indvec.size();
119     if(!sorted)
120     {
121         std::vector< std::pair<IT,NT> > tosort(vecsize);
122 #ifdef _OPENMP
123 #pragma omp parallel for
124 #endif
125         for(IT i=0; i<vecsize; ++i)
126         {
127             tosort[i] = std::make_pair(indvec[i], numvec[i]);
128         }
129 
130 #if defined(GNU_PARALLEL) && defined(_OPENMP)
131         __gnu_parallel::sort(tosort.begin(), tosort.end());
132 #else
133         std::sort(tosort.begin(), tosort.end());
134 #endif
135 
136 
137         ind.reserve(vecsize);
138         num.reserve(vecsize);
139         IT lastIndex=-1;
140         for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
141         {
142             if(lastIndex!=itr->first) //if SumDuplicates=false, keep only the first one
143             {
144                 ind.push_back(itr->first);
145                 num.push_back(itr->second);
146                 lastIndex = itr->first;
147             }
148             else if(SumDuplicates)
149             {
150                 num.back() += itr->second;
151             }
152         }
153 
154     }
155     else
156     {
157 
158         ind.reserve(vecsize);
159         num.reserve(vecsize);
160         IT lastIndex=-1;
161 
162         for(IT i=0; i< vecsize; ++i)
163         {
164             if(lastIndex!=indvec[i]) //if SumDuplicates=false, keep only the first one
165             {
166                 ind.push_back(indvec[i]);
167                 num.push_back(numvec[i]);
168                 lastIndex = indvec[i];
169             }
170             else if(SumDuplicates)
171             {
172                 num.back() += numvec[i];
173             }
174         }
175     }
176 }
177 
178 
179 
180 // ABAB: This function probably operates differently than a user would immediately expect
181 // ABAB: Write a well-posed description for it
182 template <class IT, class NT>
FullyDistSpVec<IT,NT> &  FullyDistSpVec<IT,NT>::operator=(const FullyDistVec< IT,NT > & rhs)		// conversion from dense
184 {
185 	FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs);	// to update glen and commGrid
186 
187 	std::vector<IT>().swap(ind);
188 	std::vector<NT>().swap(num);
189 	IT vecsize = rhs.LocArrSize();
190 	for(IT i=0; i< vecsize; ++i)
191 	{
192 		// rhs.zero does not exist after CombBLAS 1.2
193 		ind.push_back(i);
194 		num.push_back(rhs.arr[i]);
195 	}
196 	return *this;
197 }
198 
199 
200 /************************************************************************
201  * Create a sparse vector from index and value vectors (dense vectors)
202  * FullyDistSpVec v(globalsize, inds, vals):
203  *      nnz(v) = size(inds) = size(vals)
204  *      size(v) = globallen
205  *      if inds has duplicate entries and SumDuplicates is true then we sum
206  *      the values of duplicate indices. Otherwise, only the first entry is kept.
207  ************************************************************************/
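
// A minimal usage sketch matching the description above ("n", "inds" and "vals" are
// placeholder names): inds supplies 0-based global indices, vals the matching values,
// and the final flag selects whether duplicate indices are summed or only the first
// occurrence is kept.
//
//   FullyDistSpVec<int64_t, double> v(n, inds, vals, true);   // true: sum duplicates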
208 template <class IT, class NT>
FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen, const FullyDistVec<IT,IT> & inds,  const FullyDistVec<IT,NT> & vals, bool SumDuplicates)
210 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(inds.commGrid,globallen)
211 {
212     if(*(inds.commGrid) != *(vals.commGrid))
213     {
214         SpParHelper::Print("Grids are not comparable, FullyDistSpVec() fails !");
215         MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
216     }
217     if(inds.TotalLength() != vals.TotalLength())
218     {
219         SpParHelper::Print("Index and value vectors have different sizes, FullyDistSpVec() fails !");
220         MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
221     }
222     //commGrid = inds.commGrid;
223     //glen = globallen;
224 
225     IT maxind = inds.Reduce(maximum<IT>(), (IT) 0);
226     if(maxind>=globallen)
227     {
228         SpParHelper::Print("At least one index is greater than globallen, FullyDistSpVec() fails !");
229         MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
230     }
231 
232 
233 
234     MPI_Comm World = commGrid->GetWorld();
235     int nprocs = commGrid->GetSize();
236     int * rdispls = new int[nprocs];
237     int * recvcnt = new int[nprocs];
238     int * sendcnt = new int[nprocs](); // initialize to 0
239     int * sdispls = new int[nprocs];
240 
241     // ----- share count --------
242     IT locsize = inds.LocArrSize();
243     for(IT i=0; i<locsize; ++i)
244     {
245         IT locind;
246         int owner = Owner(inds.arr[i], locind);
247         sendcnt[owner]++;
248     }
249     MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
250 
251 
252     // ----- compute send and receive displacements --------
253 
254     sdispls[0] = 0;
255     rdispls[0] = 0;
256     for(int i=0; i<nprocs-1; ++i)
257     {
258         sdispls[i+1] = sdispls[i] + sendcnt[i];
259         rdispls[i+1] = rdispls[i] + recvcnt[i];
260     }
261 
262 
263     // ----- prepare data to be sent --------
264 
265     NT * datbuf = new NT[locsize];
266     IT * indbuf = new IT[locsize];
267     int *count = new int[nprocs](); //current position
268     for(IT i=0; i < locsize; ++i)
269     {
270         IT locind;
271         int owner = Owner(inds.arr[i], locind);
272         int id = sdispls[owner] + count[owner];
273         datbuf[id] = vals.arr[i];
274         indbuf[id] = locind;
275         count[owner]++;
276     }
277     delete [] count;
278     IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
279 
280 
281     // ----- Send and receive indices and values --------
282 
283     NT * recvdatbuf = new NT[totrecv];
284     MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
285     delete [] datbuf;
286 
287     IT * recvindbuf = new IT[totrecv];
288     MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
289     delete [] indbuf;
290 
291 
292     // ------ merge and sort received data ----------
293 
294     std::vector< std::pair<IT,NT> > tosort;
295     tosort.resize(totrecv);
296     for(int i=0; i<totrecv; ++i)
297     {
298         tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
299     }
300     DeleteAll(recvindbuf, recvdatbuf);
301     DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
302     std::sort(tosort.begin(), tosort.end());
303 
304 
305     // ------ create local sparse vector ----------
306 
307     ind.reserve(totrecv);
308     num.reserve(totrecv);
309     IT lastIndex=-1;
310     for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
311     {
312         if(lastIndex!=itr->first) //if SumDuplicates=false, keep only the first one
313         {
314             ind.push_back(itr->first);
315             num.push_back(itr->second);
316             lastIndex = itr->first;
317         }
318         else if(SumDuplicates)
319         {
320             num.back() += itr->second;
321         }
322     }
323 }
324 
325 
326 //! Returns a dense vector of nonzero values
327 //! for which the predicate is satisfied on values
328 template <class IT, class NT>
329 template <typename _Predicate>
FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::FindVals(_Predicate pred) const
331 {
332     FullyDistVec<IT,NT> found(commGrid);
333     MPI_Comm World = commGrid->GetWorld();
334     int nprocs = commGrid->GetSize();
335     int rank = commGrid->GetRank();
336 
337     IT sizelocal = getlocnnz();
338     for(IT i=0; i<sizelocal; ++i)
339     {
340         if(pred(num[i]))
341         {
342             found.arr.push_back(num[i]);
343         }
344     }
345     IT * dist = new IT[nprocs];
346     IT nsize = found.arr.size();
347     dist[rank] = nsize;
348     MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
349     IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
350     found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));
351 
352     // Although the found vector is not reshuffled yet, its glen and commGrid are set
353     // We can call the Owner/MyLocLength/LengthUntil functions (to infer future distribution)
354 
355     // rebalance/redistribute
356     int * sendcnt = new int[nprocs];
357     std::fill(sendcnt, sendcnt+nprocs, 0);
358     for(IT i=0; i<nsize; ++i)
359     {
360         IT locind;
361         int owner = found.Owner(i+lengthuntil, locind);
362         ++sendcnt[owner];
363     }
364     int * recvcnt = new int[nprocs];
365     MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the counts
366 
367     int * sdispls = new int[nprocs];
368     int * rdispls = new int[nprocs];
369     sdispls[0] = 0;
370     rdispls[0] = 0;
371     for(int i=0; i<nprocs-1; ++i)
372     {
373         sdispls[i+1] = sdispls[i] + sendcnt[i];
374         rdispls[i+1] = rdispls[i] + recvcnt[i];
375     }
376     IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
377     std::vector<NT> recvbuf(totrecv);
378 
379     // data is already in the right order in found.arr
380     MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<NT>(), recvbuf.data(), recvcnt, rdispls, MPIType<NT>(), World);
381     found.arr.swap(recvbuf);
382     delete [] dist;
383     DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
384 
385     return found;
386 }
387 
388 
389 
390 //! Returns a dense vector of nonzero global indices
391 //! for which the predicate is satisfied on values
392 template <class IT, class NT>
393 template <typename _Predicate>
FullyDistVec<IT,IT> FullyDistSpVec<IT,NT>::FindInds(_Predicate pred) const
395 {
396     FullyDistVec<IT,IT> found(commGrid);
397     MPI_Comm World = commGrid->GetWorld();
398     int nprocs = commGrid->GetSize();
399     int rank = commGrid->GetRank();
400 
401     IT sizelocal = getlocnnz();
402     IT sizesofar = LengthUntil();
403     for(IT i=0; i<sizelocal; ++i)
404     {
405         if(pred(num[i]))
406         {
407             found.arr.push_back(ind[i]+sizesofar);
408         }
409     }
410     IT * dist = new IT[nprocs];
411     IT nsize = found.arr.size();
412     dist[rank] = nsize;
413     MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
414     IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
415     found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));
416 
417     // Although the found vector is not reshuffled yet, its glen and commGrid are set
418     // We can call the Owner/MyLocLength/LengthUntil functions (to infer future distribution)
419 
420     // rebalance/redistribute
421     int * sendcnt = new int[nprocs];
422     std::fill(sendcnt, sendcnt+nprocs, 0);
423     for(IT i=0; i<nsize; ++i)
424     {
425         IT locind;
426         int owner = found.Owner(i+lengthuntil, locind);
427         ++sendcnt[owner];
428     }
429     int * recvcnt = new int[nprocs];
430     MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the counts
431 
432     int * sdispls = new int[nprocs];
433     int * rdispls = new int[nprocs];
434     sdispls[0] = 0;
435     rdispls[0] = 0;
436     for(int i=0; i<nprocs-1; ++i)
437     {
438         sdispls[i+1] = sdispls[i] + sendcnt[i];
439         rdispls[i+1] = rdispls[i] + recvcnt[i];
440     }
441     IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
442     std::vector<IT> recvbuf(totrecv);
443 
444     // data is already in the right order in found.arr
445     MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
446     found.arr.swap(recvbuf);
447     delete [] dist;
448     DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
449 
450     return found;
451 }
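
// Sketch of how FindVals/FindInds above are typically used (the vector name "frontier"
// and the lambda are illustrative assumptions): both return *dense* FullyDistVec objects
// whose length equals the number of matching nonzeros, re-balanced across the grid.
//
//   FullyDistVec<int64_t, double>  hitvals = frontier.FindVals([](double x){ return x > 0.0; });
//   FullyDistVec<int64_t, int64_t> hitinds = frontier.FindInds([](double x){ return x > 0.0; });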
452 
453 
454 
455 template <class IT, class NT>
void FullyDistSpVec<IT,NT>::stealFrom(FullyDistSpVec<IT,NT> & victim)
457 {
458 	FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (victim);	// to update glen and commGrid
459 	ind.swap(victim.ind);
460 	num.swap(victim.num);
461 }
462 
463 template <class IT, class NT>
NT FullyDistSpVec<IT,NT>::operator[](IT indx)
465 {
466 	NT val;
467 	IT locind;
468 	int owner = Owner(indx, locind);
469 	int found = 0;
470 	if(commGrid->GetRank() == owner)
471 	{
472 		typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind);	// ind is a sorted vector
473 		if(it != ind.end() && locind == (*it))	// found
474 		{
475 			val = num[it-ind.begin()];
476 			found = 1;
477 		}
478 		else
479 		{
			val = NT();	// not found: return a default-constructed NT
481 			found = 0;
482 		}
483 	}
484 	MPI_Bcast(&found, 1, MPI_INT, owner, commGrid->GetWorld());
485 	MPI_Bcast(&val, 1, MPIType<NT>(), owner, commGrid->GetWorld());
486 	wasFound = found;
487 	return val;
488 }
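
// Note on the accessor above (inferred from the code, not an added guarantee):
// operator[] is collective, since the owner broadcasts both the found flag and the
// value, so every process should call it with the same global index; the member
// wasFound records whether the requested entry actually existed.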
489 
490 template <class IT, class NT>
NT FullyDistSpVec<IT,NT>::GetLocalElement(IT indx)
492 {
493 	NT val = NT();
494 	IT locind;
495 	int owner = Owner(indx, locind);
496 	int found = 0;
497 	typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind);	// ind is a sorted vector
498 	if(commGrid->GetRank() == owner) {
499 		if(it != ind.end() && locind == (*it))	// found
500 		{
501 			val = num[it-ind.begin()];
502 			found = 1;
503 		}
504 	}
505 	wasFound = found;
506 	return val;
507 }
508 
509 //! Indexing is performed 0-based
510 template <class IT, class NT>
void FullyDistSpVec<IT,NT>::SetElement (IT indx, NT numx)
512 {
513 	if(glen == 0)
514 		SpParHelper::Print("WARNING: SetElement() called on a vector with zero length\n");
515 
516 	IT locind;
517 	int owner = Owner(indx, locind);
518 	if(commGrid->GetRank() == owner)
519 	{
520 		typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
521 		if(iter == ind.end())	// beyond limits, insert from back
522 		{
523 			ind.push_back(locind);
524 			num.push_back(numx);
525 		}
526 		else if (locind < *iter)	// not found, insert in the middle
527 		{
528 			// the order of insertions is crucial
529 			// if we first insert to ind, then ind.begin() is invalidated !
530 			num.insert(num.begin() + (iter-ind.begin()), numx);
531 			ind.insert(iter, locind);
532 		}
533 		else // found
534 		{
535 			*(num.begin() + (iter-ind.begin())) = numx;
536 		}
537 	}
538 }
539 
540 template <class IT, class NT>
void FullyDistSpVec<IT,NT>::DelElement (IT indx)
542 {
543 	IT locind;
544 	int owner = Owner(indx, locind);
545 	if(commGrid->GetRank() == owner)
546 	{
547 		typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
548 		if(iter != ind.end() && !(locind < *iter))
549 		{
550 			num.erase(num.begin() + (iter-ind.begin()));
551 			ind.erase(iter);
552 		}
553 	}
554 }
555 
556 /**
557  * The distribution and length are inherited from ri
558  * Its zero is inherited from *this (because ri is of type IT)
559  * Example: This is [{1,n1},{4,n4},{7,n7},{8,n8},{9,n9}] with P_00 owning {1,4} and P_11 rest
560  * Assume ri = [4,1,5,7] is distributed as two elements per processor
 * Then the result has length 4, distributed two elements per processor, even though 5 and 7 don't exist
562  * This is because we are returning a "dense" output, so the absent elements will be padded with 0
563 **/
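
// A hedged call-site sketch for the indexing operator below ("ri" and "v" are
// placeholder names): the output is dense and inherits ri's length and distribution,
// with absent positions holding NT().
//
//   FullyDistVec<int64_t, double> gathered = v(ri);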
564 template <class IT, class NT>
FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::operator() (const FullyDistVec<IT,IT> & ri) const
566 {
567 	MPI_Comm World = commGrid->GetWorld();
568 	FullyDistVec<IT,NT> Indexed(ri.commGrid, ri.glen, NT());	// NT() is the initial value
569 	int nprocs;
570 	MPI_Comm_size(World, &nprocs);
571 	std::unordered_map<IT, IT> revr_map;       // inverted index that maps indices of *this to indices of output
572 	std::vector< std::vector<IT> > data_req(nprocs);
573 	IT locnnz = ri.LocArrSize();
574 
575 	// ABAB: Input sanity check
576 	int local = 1;
577 	int whole = 1;
578 	for(IT i=0; i < locnnz; ++i)
579 	{
580 		if(ri.arr[i] >= glen || ri.arr[i] < 0)
581 		{
582 			local = 0;
583 		}
584 	}
585 	MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, World);
586 	if(whole == 0)
587 	{
588 		throw outofrangeexception();
589 	}
590 
591 	for(IT i=0; i < locnnz; ++i)
592 	{
593 		IT locind;
594 		int owner = Owner(ri.arr[i], locind);	// numerical values in ri are 0-based
595 		data_req[owner].push_back(locind);
596                 revr_map.insert(typename std::unordered_map<IT, IT>::value_type(locind, i));
597 	}
598 	IT * sendbuf = new IT[locnnz];
599 	int * sendcnt = new int[nprocs];
600 	int * sdispls = new int[nprocs];
601 	for(int i=0; i<nprocs; ++i)
602 		sendcnt[i] = data_req[i].size();
603 
604 	int * rdispls = new int[nprocs];
605 	int * recvcnt = new int[nprocs];
606 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
607 
608 	sdispls[0] = 0;
609 	rdispls[0] = 0;
610 	for(int i=0; i<nprocs-1; ++i)
611 	{
612 		sdispls[i+1] = sdispls[i] + sendcnt[i];
613 		rdispls[i+1] = rdispls[i] + recvcnt[i];
614 	}
615 	IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs,0);
616 	IT * recvbuf = new IT[totrecv];
617 
618 	for(int i=0; i<nprocs; ++i)
619 	{
620     std::copy(data_req[i].begin(), data_req[i].end(), sendbuf+sdispls[i]);
621 		std::vector<IT>().swap(data_req[i]);
622 	}
623 	MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<IT>(), recvbuf, recvcnt, rdispls, MPIType<IT>(), World);  // request data
624 
625 	// We will return the requested data,
626 	// our return can be at most as big as the request
627 	// and smaller if we are missing some elements
628 	IT * indsback = new IT[totrecv];
629 	NT * databack = new NT[totrecv];
630 
631 	int * ddispls = new int[nprocs];
632 	std::copy(rdispls, rdispls+nprocs, ddispls);
633 	for(int i=0; i<nprocs; ++i)
634 	{
635 		// this is not the most efficient method because it scans ind vector nprocs = sqrt(p) times
636 		IT * it = std::set_intersection(recvbuf+rdispls[i], recvbuf+rdispls[i]+recvcnt[i], ind.begin(), ind.end(), indsback+rdispls[i]);
637 		recvcnt[i] = (it - (indsback+rdispls[i]));	// update with size of the intersection
638 
639 		IT vi = 0;
640 		for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)	// fetch the numerical values
641 		{
642 			// indsback is a subset of ind
643 			while(indsback[j] > ind[vi])
644 				++vi;
645 			databack[j] = num[vi++];
646 		}
647 	}
648 
649 	DeleteAll(recvbuf, ddispls);
650 	NT * databuf = new NT[ri.LocArrSize()];
651 
652 	MPI_Alltoall(recvcnt, 1, MPI_INT, sendcnt, 1, MPI_INT, World);	// share the response counts, overriding request counts
653 	MPI_Alltoallv(indsback, recvcnt, rdispls, MPIType<IT>(), sendbuf, sendcnt, sdispls, MPIType<IT>(), World);  // send indices
654 	MPI_Alltoallv(databack, recvcnt, rdispls, MPIType<NT>(), databuf, sendcnt, sdispls, MPIType<NT>(), World);  // send data
655 	DeleteAll(rdispls, recvcnt, indsback, databack);
656 
657 	// Now create the output from databuf (holds numerical values) and sendbuf (holds indices)
658 	// arr is already resized during its construction
659 	for(int i=0; i<nprocs; ++i)
660 	{
661 		// data will come globally sorted from processors
662 		// i.e. ind owned by proc_i is always smaller than
		// ind owned by proc_j for i < j
664 		for(int j=sdispls[i]; j< sdispls[i]+sendcnt[i]; ++j)
665 		{
666 			typename std::unordered_map<IT,IT>::iterator it = revr_map.find(sendbuf[j]);
667 			Indexed.arr[it->second] = databuf[j];
668 			// cout << it->second << "(" << sendbuf[j] << "):" << databuf[j] << endl;
669 		}
670 	}
671 	DeleteAll(sdispls, sendcnt, sendbuf, databuf);
672 	return Indexed;
673 }
674 
675 template <class IT, class NT>
void FullyDistSpVec<IT,NT>::iota(IT globalsize, NT first)
677 {
678 	glen = globalsize;
679 	IT length = MyLocLength();
680 	ind.resize(length);
681 	num.resize(length);
682 	SpHelper::iota(ind.begin(), ind.end(), 0);	// offset'd within processors
683 	SpHelper::iota(num.begin(), num.end(), LengthUntil() + first);	// global across processors
684 }
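
// Effect of iota above, illustrated (names are placeholders): after
//
//   FullyDistSpVec<int64_t, int64_t> v(grid);
//   v.iota(n, 0);
//
// v has length n with every position occupied, and the entry at global index i holds
// the value i, i.e. the identity permutation.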
685 
686 
687 //! iota over existing nonzero entries
688 template <class IT, class NT>
void FullyDistSpVec<IT,NT>::nziota(NT first)
690 {
691     std::iota(num.begin(), num.end(), NnzUntil() + first);	// global across processors
692 }
693 
694 //! Returns the number of nonzeros until this processor
695 template <class IT, class NT>
696 IT FullyDistSpVec<IT,NT>::NnzUntil() const
697 {
698     IT mynnz = ind.size();
699     IT prevnnz = 0;
700     MPI_Scan(&mynnz, &prevnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
701     return (prevnnz - mynnz);
702 }
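
// Worked example for NnzUntil above: MPI_Scan returns an inclusive prefix sum of the
// local nonzero counts, and subtracting the local count makes it exclusive. With local
// nnz = {3, 5, 2} on ranks 0..2, the scan gives {3, 8, 10} and NnzUntil returns {0, 3, 8}.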
703 
704 
705 
706 /* old version
707 // - sorts the entries with respect to nonzero values
708 // - ignores structural zeros
709 // - keeps the sparsity structure intact
710 // - returns a permutation representing the mapping from old to new locations
711 template <class IT, class NT>
712 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
713 {
714     MPI_Comm World = commGrid->GetWorld();
715     FullyDistSpVec<IT,IT> temp(commGrid);
716     if(getnnz()==0) return temp;
717     IT nnz = getlocnnz();
718     pair<NT,IT> * vecpair = new pair<NT,IT>[nnz];
719 
720 
721 
722     int nprocs, rank;
723     MPI_Comm_size(World, &nprocs);
724     MPI_Comm_rank(World, &rank);
725 
726     IT * dist = new IT[nprocs];
727     dist[rank] = nnz;
728     MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
729     IT until = LengthUntil();
730     for(IT i=0; i< nnz; ++i)
731     {
732         vecpair[i].first = num[i];	// we'll sort wrt numerical values
733         vecpair[i].second = ind[i] + until;
734 
735     }
736     SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
737 
738     vector< IT > nind(nnz);
739     vector< IT > nnum(nnz);
740 
741     for(IT i=0; i< nnz; ++i)
742     {
743         num[i] = vecpair[i].first;	// sorted range (change the object itself)
744         nind[i] = ind[i];		// make sure the sparsity distribution is the same
745         nnum[i] = vecpair[i].second;	// inverse permutation stored as numerical values
746 
747     }
748     delete [] vecpair;
749     delete [] dist;
750 
751     temp.glen = glen;
752     temp.ind = nind;
753     temp.num = nnum;
754     return temp;
755 }
756 */
757 
758 
759 /*
760  TODO: This function is just a hack at this moment.
761  The indices of the return vector is not correct.
762  FIX this
763  */
764 // - sorts the entries with respect to nonzero values
765 // - ignores structural zeros
766 // - keeps the sparsity structure intact
767 // - returns a permutation representing the mapping from old to new locations
768 template <class IT, class NT>
FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
770 {
771 	MPI_Comm World = commGrid->GetWorld();
772 	FullyDistSpVec<IT,IT> temp(commGrid);
773     if(getnnz()==0) return temp;
774 	IT nnz = getlocnnz();
775 	std::pair<NT,IT> * vecpair = new std::pair<NT,IT>[nnz];
776 
777 
778 
779 	int nprocs, rank;
780 	MPI_Comm_size(World, &nprocs);
781 	MPI_Comm_rank(World, &rank);
782 
783 	IT * dist = new IT[nprocs];
784 	dist[rank] = nnz;
785 	MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
786     IT until = LengthUntil();
787 #ifdef THREADED
788 #pragma omp parallel for
789 #endif
790 	for(IT i=0; i< nnz; ++i)
791 	{
792 		vecpair[i].first = num[i];	// we'll sort wrt numerical values
793 		vecpair[i].second = ind[i] + until;
794 
795 	}
796 	std::vector<std::pair<NT,IT>> sorted = SpParHelper::KeyValuePSort(vecpair, nnz, dist, World);
797 
798     nnz = sorted.size();
799     temp.num.resize(nnz);
800     temp.ind.resize(nnz);
801 
802 #ifdef THREADED
803 #pragma omp parallel for
804 #endif
805 	for(IT i=0; i< nnz; ++i)
806 	{
807 		//num[i] = sorted[i].first;	// sorted range (change the object itself)
808 		//nind[i] = ind[i];		// make sure the sparsity distribution is the same
809 		temp.num[i] = sorted[i].second;	// inverse permutation stored as numerical values
810         temp.ind[i] = i; // we are not using this information at this moment
811 	}
812 
813 	delete [] vecpair;
814 	delete [] dist;
815 
816 	temp.glen = glen;
817 	return temp;
818 }
819 
820 
821 
822 /*
823 // - sorts the entries with respect to nonzero values
824 // - ignores structural zeros
825 // - keeps the sparsity structure intact
826 // - returns a permutation representing the mapping from old to new locations
827 template <class IT, class NT>
828 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
829 {
830     MPI_Comm World = commGrid->GetWorld();
831     FullyDistSpVec<IT,IT> temp(commGrid);
832     if(getnnz()==0) return temp;
833     IT nnz = getlocnnz();
834     //pair<NT,IT> * vecpair = new pair<NT,IT>[nnz];
835     vector<IndexHolder<NT>> in(nnz);
836     //vector<IndexHolder<NT>> out;
837 
838 
839 
840     int nprocs, rank;
841     MPI_Comm_size(World, &nprocs);
842     MPI_Comm_rank(World, &rank);
843 
844     //IT * dist = new IT[nprocs];
845     //dist[rank] = nnz;
846     //MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
847     IT until = LengthUntil();
848     for(IT i=0; i< nnz; ++i)
849     {
850         //vecpair[i].first = num[i];	// we'll sort wrt numerical values
851         //vecpair[i].second = ind[i] + until;
852 
853         in[i] = IndexHolder<NT>(num[i], static_cast<unsigned long>(ind[i] + until));
854     }
855     //SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
856 
857     //MPI_Barrier(World);
858     //cout << "before sorting " << in.size() << endl;
859     par::sampleSort(in, World);
860     //MPI_Barrier(World);
861     //cout << "after sorting " << in.size() << endl;
862     //MPI_Barrier(World);
863 
864     //vector< IT > nind(out.size());
865     //vector< IT > nnum(out.size());
866 
867     temp.ind.resize(in.size());
868     temp.num.resize(in.size());
869     for(IT i=0; i< in.size(); ++i)
870     {
871         //num[i] = vecpair[i].first;	// sorted range (change the object itself)
872         //nind[i] = ind[i];		// make sure the sparsity distribution is the same
873         //nnum[i] = vecpair[i].second;	// inverse permutation stored as numerical values
874 
875         //num[i] = out[i].value;	// sorted range (change the object itself)
876         //nind[i] = ind[i];		// make sure the sparsity distribution is the same
877         //nnum[i] = static_cast<IT>(out[i].index);	// inverse permutation stored as numerical values
878         temp.num[i] = static_cast<IT>(in[i].index);	// inverse permutation stored as numerical values
879         //cout << temp.num[i] << " " ;
880     }
881 
882     temp.glen = glen;
883     return temp;
884 }
885  */
886 
887 
888 template <class IT, class NT>
889 template <typename _BinaryOperation >
FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::UniqAll2All(_BinaryOperation __binary_op, MPI_Op mympiop)
891 {
892     MPI_Comm World = commGrid->GetWorld();
893 	int nprocs = commGrid->GetSize();
894 
895     std::vector< std::vector< NT > > datsent(nprocs);
896 	std::vector< std::vector< IT > > indsent(nprocs);
897 
898     IT locind;
899     size_t locvec = num.size();     // nnz in local vector
900     IT lenuntil = LengthUntil();    // to convert to global index
901 	for(size_t i=0; i< locvec; ++i)
902 	{
903         uint64_t myhash;    // output of MurmurHash3_x64_64 is 64-bits regardless of the input length
904         MurmurHash3_x64_64((const void*) &(num[i]),sizeof(NT), 0, &myhash);
905         double range = static_cast<double>(myhash) * static_cast<double>(glen);
906         NT mapped = range / static_cast<double>(std::numeric_limits<uint64_t>::max());   // mapped is in range [0,n)
907         int owner = Owner(mapped, locind);
908 
909         datsent[owner].push_back(num[i]);  // all identical entries will be hashed to the same value -> same processor
910         indsent[owner].push_back(ind[i]+lenuntil);
911     }
912     int * sendcnt = new int[nprocs];
913 	int * sdispls = new int[nprocs];
914 	for(int i=0; i<nprocs; ++i)
915 		sendcnt[i] = (int) datsent[i].size();
916 
917 	int * rdispls = new int[nprocs];
918 	int * recvcnt = new int[nprocs];
919 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
920 	sdispls[0] = 0;
921 	rdispls[0] = 0;
922 	for(int i=0; i<nprocs-1; ++i)
923 	{
924 		sdispls[i+1] = sdispls[i] + sendcnt[i];
925 		rdispls[i+1] = rdispls[i] + recvcnt[i];
926 	}
927     NT * datbuf = new NT[locvec];
928 	for(int i=0; i<nprocs; ++i)
929 	{
930     std::copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
931 		std::vector<NT>().swap(datsent[i]);
932 	}
933     IT * indbuf = new IT[locvec];
934     for(int i=0; i<nprocs; ++i)
935 	{
936     std::copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
937 		std::vector<IT>().swap(indsent[i]);
938 	}
939     IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
940 	NT * recvdatbuf = new NT[totrecv];
941 	MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
942     delete [] datbuf;
943 
944     IT * recvindbuf = new IT[totrecv];
945     MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
946     delete [] indbuf;
947 
948     std::vector< std::pair<NT,IT> > tosort;   // in fact, tomerge would be a better name but it is unlikely to be faster
949 
950 	for(int i=0; i<nprocs; ++i)
951 	{
952 		for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)	// fetch the numerical values
953 		{
954             tosort.push_back(std::make_pair(recvdatbuf[j], recvindbuf[j]));
955 		}
956 	}
957 	DeleteAll(recvindbuf, recvdatbuf);
958     std::sort(tosort.begin(), tosort.end());
959     //std::unique returns an iterator to the element that follows the last element not removed.
960     typename std::vector< std::pair<NT,IT> >::iterator last;
961     last = std::unique (tosort.begin(), tosort.end(), equal_first<NT,IT>());
962 
963     std::vector< std::vector< NT > > datback(nprocs);
964 	std::vector< std::vector< IT > > indback(nprocs);
965 
966     for(typename std::vector< std::pair<NT,IT> >::iterator itr = tosort.begin(); itr != last; ++itr)
967     {
968         IT locind;
969         int owner = Owner(itr->second, locind);
970 
971         datback[owner].push_back(itr->first);
972         indback[owner].push_back(locind);
973     }
974     for(int i=0; i<nprocs; ++i) sendcnt[i] = (int) datback[i].size();
975     MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
976     for(int i=0; i<nprocs-1; ++i)
977 	{
978 		sdispls[i+1] = sdispls[i] + sendcnt[i];
979 		rdispls[i+1] = rdispls[i] + recvcnt[i];
980 	}
981     datbuf = new NT[tosort.size()];
982 	for(int i=0; i<nprocs; ++i)
983 	{
984 		std::copy(datback[i].begin(), datback[i].end(), datbuf+sdispls[i]);
985 		std::vector<NT>().swap(datback[i]);
986 	}
987     indbuf = new IT[tosort.size()];
988     for(int i=0; i<nprocs; ++i)
989 	{
990 		std::copy(indback[i].begin(), indback[i].end(), indbuf+sdispls[i]);
991 		std::vector<IT>().swap(indback[i]);
992 	}
993     totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));   // update value
994 
995     recvdatbuf = new NT[totrecv];
996 	MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
997     delete [] datbuf;
998 
999     recvindbuf = new IT[totrecv];
1000     MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1001     delete [] indbuf;
1002 
1003     FullyDistSpVec<IT,NT> Indexed(commGrid, glen);	// length(Indexed) = length(glen) = length(*this)
1004 
1005     std::vector< std::pair<IT,NT> > back2sort;
1006     for(int i=0; i<nprocs; ++i)
1007 	{
1008 		for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)	// fetch the numerical values
1009 		{
1010             back2sort.push_back(std::make_pair(recvindbuf[j], recvdatbuf[j]));
1011 		}
1012 	}
1013     std::sort(back2sort.begin(), back2sort.end());
1014     for(typename std::vector< std::pair<IT,NT> >::iterator itr = back2sort.begin(); itr != back2sort.end(); ++itr)
1015     {
1016         Indexed.ind.push_back(itr->first);
1017         Indexed.num.push_back(itr->second);
1018     }
1019 
1020     DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1021     DeleteAll(recvindbuf, recvdatbuf);
1022     return Indexed;
1023 }
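
// Summary of the scheme above (descriptive, no new behavior): each value is hashed with
// MurmurHash3 and mapped onto [0, glen), so all copies of a value land on the same owner;
// after sorting, std::unique keeps the copy with the smallest global index, and the
// survivors are shipped back to the processes owning those indices to build the result.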
1024 
1025 
1026 // ABAB: \todo Concept control so it only gets called in integers
1027 template <class IT, class NT>
1028 template <typename _BinaryOperation >
FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::Uniq(_BinaryOperation __binary_op, MPI_Op mympiop)
1030 {
1031     return UniqAll2All(__binary_op, mympiop);
1032 }
1033 
1034 template <class IT, class NT>
FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator+=(const FullyDistSpVec<IT,NT> & rhs)
1036 {
1037 	if(this != &rhs)
1038 	{
1039 		if(glen != rhs.glen)
1040 		{
1041 			std::cerr << "Vector dimensions don't match for addition\n";
1042 			return *this;
1043 		}
1044 		IT lsize = getlocnnz();
1045 		IT rsize = rhs.getlocnnz();
1046 
1047 		std::vector< IT > nind;
1048 		std::vector< NT > nnum;
1049 		nind.reserve(lsize+rsize);
1050 		nnum.reserve(lsize+rsize);
1051 
1052 		IT i=0, j=0;
1053 		while(i < lsize && j < rsize)
1054 		{
1055 			// assignment won't change the size of vector, push_back is necessary
1056 			if(ind[i] > rhs.ind[j])
1057 			{
1058 				nind.push_back( rhs.ind[j] );
1059 				nnum.push_back( rhs.num[j++] );
1060 			}
1061 			else if(ind[i] < rhs.ind[j])
1062 			{
1063 				nind.push_back( ind[i] );
1064 				nnum.push_back( num[i++] );
1065 			}
1066 			else
1067 			{
1068 				nind.push_back( ind[i] );
1069 				nnum.push_back( num[i++] + rhs.num[j++] );
1070 			}
1071 		}
1072 		while( i < lsize)	// rhs was depleted first
1073 		{
1074 			nind.push_back( ind[i] );
1075 			nnum.push_back( num[i++] );
1076 		}
1077 		while( j < rsize) 	// *this was depleted first
1078 		{
1079 			nind.push_back( rhs.ind[j] );
1080 			nnum.push_back( rhs.num[j++] );
1081 		}
1082 		ind.swap(nind);		// ind will contain the elements of nind with capacity shrunk-to-fit size
1083 		num.swap(nnum);
1084 	}
1085 	else
1086 	{
1087 		typename std::vector<NT>::iterator it;
1088 		for(it = num.begin(); it != num.end(); ++it)
1089 			(*it) *= 2;
1090 	}
1091 	return *this;
1092 };
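
// Behavior of operator+= above, in brief: different vectors are merged by index with
// values summed on collisions, so e.g. [{1,a},{4,b}] += [{4,c},{7,d}] yields
// [{1,a},{4,b+c},{7,d}]; the aliased case "v += v" simply doubles every stored value.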
1093 template <class IT, class NT>
FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator-=(const FullyDistSpVec<IT,NT> & rhs)
1095 {
1096 	if(this != &rhs)
1097 	{
1098 		if(glen != rhs.glen)
1099 		{
			std::cerr << "Vector dimensions don't match for subtraction\n";
1101 			return *this;
1102 		}
1103 		IT lsize = getlocnnz();
1104 		IT rsize = rhs.getlocnnz();
1105 		std::vector< IT > nind;
1106 		std::vector< NT > nnum;
1107 		nind.reserve(lsize+rsize);
1108 		nnum.reserve(lsize+rsize);
1109 
1110 		IT i=0, j=0;
1111 		while(i < lsize && j < rsize)
1112 		{
1113 			// assignment won't change the size of vector, push_back is necessary
1114 			if(ind[i] > rhs.ind[j])
1115 			{
1116 				nind.push_back( rhs.ind[j] );
1117 				nnum.push_back( -static_cast<NT>(rhs.num[j++]) );
1118 			}
1119 			else if(ind[i] < rhs.ind[j])
1120 			{
1121 				nind.push_back( ind[i] );
1122 				nnum.push_back( num[i++] );
1123 			}
1124 			else
1125 			{
1126 				nind.push_back( ind[i] );
1127 				nnum.push_back( num[i++] - rhs.num[j++] );	// ignore numerical cancellations
1128 			}
1129 		}
1130 		while( i < lsize)	// rhs was depleted first
1131 		{
1132 			nind.push_back( ind[i] );
1133 			nnum.push_back( num[i++] );
1134 		}
1135 		while( j < rsize) 	// *this was depleted first
1136 		{
1137 			nind.push_back( rhs.ind[j] );
1138 			nnum.push_back( NT() - (rhs.num[j++]) );
1139 		}
1140 		ind.swap(nind);		// ind will contain the elements of nind with capacity shrunk-to-fit size
1141 		num.swap(nnum);
1142 	}
1143 	else
1144 	{
1145 		ind.clear();
1146 		num.clear();
1147 	}
1148 	return *this;
1149 };
1150 
1151 
1152 template <class IT, class NT>
1153 template <typename _BinaryOperation>
void FullyDistSpVec<IT,NT>::SparseCommon(std::vector< std::vector < std::pair<IT,NT> > > & data, _BinaryOperation BinOp)
1155 {
1156 	int nprocs = commGrid->GetSize();
1157 	int * sendcnt = new int[nprocs];
1158 	int * recvcnt = new int[nprocs];
1159 	for(int i=0; i<nprocs; ++i)
		sendcnt[i] = data[i].size();	// number of pairs headed to processor i
1161 
1162 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
1163 	int * sdispls = new int[nprocs]();
1164 	int * rdispls = new int[nprocs]();
1165 	std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
1166 	std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
1167 	IT totrecv = rdispls[nprocs-1]+recvcnt[nprocs-1];
1168 	IT totsend = sdispls[nprocs-1]+sendcnt[nprocs-1];
1169 
1170 
1171   	std::pair<IT,NT> * senddata = new std::pair<IT,NT>[totsend];	// re-used for both rows and columns
1172 	for(int i=0; i<nprocs; ++i)
1173 	{
1174     std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
1175 		std::vector< std::pair<IT,NT> >().swap(data[i]);	// clear memory
1176 	}
1177 	MPI_Datatype MPI_pair;
1178 	MPI_Type_contiguous(sizeof(std::tuple<IT,NT>), MPI_CHAR, &MPI_pair);
1179 	MPI_Type_commit(&MPI_pair);
1180 
1181 	std::pair<IT,NT> * recvdata = new std::pair<IT,NT>[totrecv];
1182 	MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_pair, recvdata, recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
1183 
1184 	DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
1185 	MPI_Type_free(&MPI_pair);
1186 
1187 	if(!is_sorted(recvdata, recvdata+totrecv))
1188 		std::sort(recvdata, recvdata+totrecv);
1189 
1190 	ind.push_back(recvdata[0].first);
1191 	num.push_back(recvdata[0].second);
1192 	for(IT i=1; i< totrecv; ++i)
1193        	{
1194 		if(ind.back() == recvdata[i].first)
1195 	   	{
1196 		      	num.back() = BinOp(num.back(), recvdata[i].second);
1197 	      	}
1198 	      	else
1199 	      	{
1200 			ind.push_back(recvdata[i].first);
1201 			num.push_back(recvdata[i].second);
1202 	      	}
1203 	}
1204 	delete [] recvdata;
1205 }
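
// In short, SparseCommon above is the shared shuffle step used by the readers: each
// process routes its (local index, value) pairs to their owners with one MPI_Alltoallv,
// the receiver sorts them, and entries with equal indices are combined via the
// user-supplied BinOp (e.g. addition).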
1206 
1207 template <class IT, class NT>
1208 template <typename _BinaryOperation>
void FullyDistSpVec<IT,NT>::ParallelRead (const std::string & filename, bool onebased, _BinaryOperation BinOp)
1210 {
1211     int64_t gnnz;	// global nonzeros (glen is already declared as part of this class's private data)
1212     int64_t linesread = 0;
1213 
1214     FILE *f;
1215     int myrank = commGrid->GetRank();
1216     int nprocs = commGrid->GetSize();
1217     if(myrank == 0)
1218     {
1219 	if((f = fopen(filename.c_str(),"r"))==NULL)
1220 	{
1221 		std::cout << "File failed to open\n";
1222 		MPI_Abort(commGrid->commWorld, NOFILE);
1223 	}
1224 	else
1225 	{
1226 		fscanf(f,"%lld %lld\n", &glen, &gnnz);
1227 	}
1228         std::cout << "Total number of nonzeros expected across all processors is " << gnnz << std::endl;
1229 
1230     }
1231     MPI_Bcast(&glen, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
1232     MPI_Bcast(&gnnz, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
1233 
1234 
1235     struct stat st;     // get file size
1236     if (stat(filename.c_str(), &st) == -1)
1237     {
1238         MPI_Abort(commGrid->commWorld, NOFILE);
1239     }
1240     int64_t file_size = st.st_size;
1241     MPI_Offset fpos, end_fpos;
1242     if(myrank == 0)    // the offset needs to be for this rank
1243     {
1244         std::cout << "File is " << file_size << " bytes" << std::endl;
1245         fpos = ftell(f);
1246         fclose(f);
1247     }
1248     else
1249     {
1250         fpos = myrank * file_size / nprocs;
1251     }
1252     if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
1253     else end_fpos = file_size;
1254     MPI_Barrier(commGrid->commWorld);
1255 
1256     MPI_File mpi_fh;
1257     MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
1258 
1259     std::vector< std::vector < std::pair<IT,NT> > > data(nprocs);	// data to send
1260 
1261     std::vector<std::string> lines;
1262 
1263     SpParHelper::Print("Fetching first piece\n");
1264 
1265     MPI_Barrier(commGrid->commWorld);
1266     bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
1267     int64_t entriesread = lines.size();
1268     SpParHelper::Print("Fetched first piece\n");
1269 
1270     MPI_Barrier(commGrid->commWorld);
1271 
1272     IT ii;
1273     NT vv;
1274     for (auto itr=lines.begin(); itr != lines.end(); ++itr)
1275     {
1276 	std::stringstream ss(*itr);
1277 	ss >> ii >> vv;
1278 	if(onebased)	ii--;
1279 	IT locind;
1280 	int owner = Owner(ii, locind);       // recipient (owner) processor
1281         data[owner].push_back(std::make_pair(locind,vv));
1282     }
1283 
1284     while(!finished)
1285     {
1286         finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
1287         entriesread += lines.size();
1288         for (auto itr=lines.begin(); itr != lines.end(); ++itr)
1289     	{
1290 		std::stringstream ss(*itr);
1291 		ss >> ii >> vv;
1292 		if(onebased)	ii--;
1293 		IT locind;
1294 		int owner = Owner(ii, locind);       // recipient (owner) processor
1295         	data[owner].push_back(std::make_pair(locind,vv));
1296     	}
1297     }
1298     int64_t allentriesread;
1299     MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
1300 #ifdef COMBBLAS_DEBUG
1301     if(myrank == 0)
1302         std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
1303 #endif
1304 
1305     SparseCommon(data, BinOp);
1306 }
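
// Hedged usage sketch for ParallelRead above (the file name, grid variable, and element
// types are assumptions): per the parser above, the file starts with an ASCII header
// "<length> <nnz>" followed by "<index> <value>" lines; the BinOp resolves duplicate indices.
//
//   FullyDistSpVec<int64_t, double> v(fullWorld);
//   v.ParallelRead(std::string("vector.txt"), true /* 1-based indices */, std::plus<double>());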
1307 
1308 template <class IT, class NT>
1309 template <class HANDLER>
void FullyDistSpVec<IT,NT>::ParallelWrite(const std::string & filename, bool onebased, HANDLER handler, bool includeindices, bool includeheader)
1311 {
1312        	int myrank = commGrid->GetRank();
1313     	int nprocs = commGrid->GetSize();
1314 	IT totalLength = TotalLength();
1315 	IT totalNNZ = getnnz();
1316 
1317 	std::stringstream ss;
1318 	if(includeheader && myrank == 0)
1319 	{
1320 		ss << totalLength << '\t' << totalNNZ << '\n';	// rank-0 has the header
1321 	}
1322 	IT entries =  getlocnnz();
1323 	IT sizeuntil = 0;
1324 	MPI_Exscan( &entries, &sizeuntil, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld() );
1325 	if(myrank == 0) sizeuntil = 0;	// because MPI_Exscan says the recvbuf in process 0 is undefined
1326 
1327 	if(includeindices)
1328 	{
1329 		if(onebased)	sizeuntil += 1;	// increment by 1
1330 
1331 		for(IT i=0; i< entries; ++i)
1332 		{
1333 			ss << ind[i]+sizeuntil << '\t';
1334 			handler.save(ss, num[i], ind[i]+sizeuntil);
1335 			ss << '\n';
1336 		}
1337 	}
1338 	else	// the base doesn't matter if we don't include indices
1339 	{
1340 		IT dummy = 0;	// dummy because we don't want indices to be printed
1341 		for(IT i=0; i< entries; ++i)
1342 		{
1343 			handler.save(ss, num[i], dummy);
1344 			ss << '\n';
1345 		}
1346 	}
1347 
1348 	std::string text = ss.str();
1349 
1350 	int64_t * bytes = new int64_t[nprocs];
1351     	bytes[myrank] = text.size();
1352     	MPI_Allgather(MPI_IN_PLACE, 1, MPIType<int64_t>(), bytes, 1, MPIType<int64_t>(), commGrid->GetWorld());
1353 	int64_t bytesuntil = std::accumulate(bytes, bytes+myrank, static_cast<int64_t>(0));
1354 	int64_t bytestotal = std::accumulate(bytes, bytes+nprocs, static_cast<int64_t>(0));
1355 
    	if(myrank == 0)	// only the leader writes the original file with no content
1357     	{
1358 		std::ofstream ofs(filename.c_str(), std::ios::binary | std::ios::out);
1359 #ifdef COMBBLAS_DEBUG
1360     std::cout << "Creating file with " << bytestotal << " bytes" << std::endl;
1361 #endif
1362     		ofs.seekp(bytestotal - 1);
1363     		ofs.write("", 1);	// this will likely create a sparse file so the actual disks won't spin yet
1364 		ofs.close();
1365    	}
1366      	MPI_Barrier(commGrid->GetWorld());
1367 
1368 	struct stat st;     // get file size
1369     	if (stat(filename.c_str(), &st) == -1)
1370     	{
1371        		MPI_Abort(commGrid->GetWorld(), NOFILE);
1372     	}
1373 	if(myrank == nprocs-1)	// let some other processor do the testing
1374 	{
1375 #ifdef COMBBLAS_DEBUG
1376     std::cout << "File is actually " << st.st_size << " bytes seen from process " << myrank << std::endl;
1377 #endif
1378 	}
1379 
1380     	FILE *ffinal;
1381 	if ((ffinal = fopen(filename.c_str(), "rb+")) == NULL)	// then everyone fills it
1382         {
1383 		printf("COMBBLAS: Vector output file %s failed to open at process %d\n", filename.c_str(), myrank);
1384             	MPI_Abort(commGrid->GetWorld(), NOFILE);
1385        	}
1386 	fseek (ffinal , bytesuntil , SEEK_SET );
1387 	fwrite(text.c_str(),1, bytes[myrank] ,ffinal);
1388 	fflush(ffinal);
1389 	fclose(ffinal);
1390 	delete [] bytes;
1391 }
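
// Matching write-side sketch (the handler type below is an assumption; any type exposing
// save(std::ostream&, NT, IT), as called above, would do):
//
//   struct PlainSaveHandler
//   {
//       void save(std::ostream & os, double value, int64_t /*index*/) { os << value; }
//   };
//   v.ParallelWrite("vector_out.txt", true, PlainSaveHandler(), true, true);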
1392 
1393 //! Called on an existing object
1394 //! ABAB: Obsolete, will be deleted once moved to Github (and becomes independent of KDT)
1395 template <class IT, class NT>
1396 template <class HANDLER>
std::ifstream& FullyDistSpVec<IT,NT>::ReadDistribute (std::ifstream& infile, int master, HANDLER handler)
1398 {
1399 	IT total_nnz;
1400 	MPI_Comm World = commGrid->GetWorld();
1401 	int neighs = commGrid->GetSize();  // number of neighbors (including oneself)
1402 	int buffperneigh = MEMORYINBYTES / (neighs * (sizeof(IT) + sizeof(NT)));
1403 
1404 	int * displs = new int[neighs];
1405 	for (int i=0; i<neighs; ++i)
1406 		displs[i] = i*buffperneigh;
1407 
1408 	int * curptrs = NULL;
1409 	int recvcount = 0;
1410 	IT * inds = NULL;
1411 	NT * vals = NULL;
1412 	int rank = commGrid->GetRank();
1413 	if(rank == master)	// 1 processor only
1414 	{
1415 		inds = new IT [ buffperneigh * neighs ];
1416 		vals = new NT [ buffperneigh * neighs ];
1417 		curptrs = new int[neighs];
1418 		std::fill_n(curptrs, neighs, 0);	// fill with zero
1419 		if (infile.is_open())
1420 		{
1421 			infile.clear();
1422 			infile.seekg(0);
1423 			IT numrows, numcols;
1424 			bool indIsRow = true;
1425 			infile >> numrows >> numcols >> total_nnz;
1426 			if (numcols == 1)
1427 			{ // column vector, read vector indices from the row index
1428 				indIsRow = true;
1429 				glen = numrows;
1430 			}
1431 			else
1432 			{ // row vector, read vector indices from the column index
1433 				indIsRow = false;
1434 				glen = numcols;
1435 			}
1436 			MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1437 
1438 			IT tempind;
1439 			IT temprow, tempcol;
1440 			IT cnz = 0;
1441 			while ( (!infile.eof()) && cnz < total_nnz)
1442 			{
1443 				infile >> temprow >> tempcol;
1444 				if (indIsRow)
1445 					tempind = temprow;
1446 				else
1447 					tempind = tempcol;
1448 				tempind--;
1449 				IT locind;
1450 				int rec = Owner(tempind, locind);	// recipient (owner) processor  (ABAB: But if the length is not set yet, this should be wrong)
1451 				inds[ rec * buffperneigh + curptrs[rec] ] = locind;
1452 				vals[ rec * buffperneigh + curptrs[rec] ] = handler.read(infile, tempind);
1453 				++ (curptrs[rec]);
1454 
1455 				if(curptrs[rec] == buffperneigh || (cnz == (total_nnz-1)) )		// one buffer is full, or file is done !
1456 				{
1457 					// first, send the receive counts ...
1458 					MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1459 
					// generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT=bool)
1461 					IT * tempinds = new IT[recvcount];
1462 					NT * tempvals = new NT[recvcount];
1463 
					// then, send all buffers to their recipients ...
1465 					MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount,  MPIType<IT>(), master, World);
1466 					MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount,  MPIType<NT>(), master, World);
1467 
1468 					// now push what is ours to tuples
1469 					for(IT i=0; i< recvcount; ++i)
1470 					{
1471 						ind.push_back( tempinds[i] );	// already offset'd by the sender
1472 						num.push_back( tempvals[i] );
1473 					}
1474 
1475 					// reset current pointers so that we can reuse {inds,vals} buffers
1476 					std::fill_n(curptrs, neighs, 0);
1477 					DeleteAll(tempinds, tempvals);
1478 				}
1479 				++ cnz;
1480 			}
1481 			assert (cnz == total_nnz);
1482 
1483 			// Signal the end of file to other processors along the diagonal
1484 			std::fill_n(curptrs, neighs, std::numeric_limits<int>::max());
1485 			MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1486 		}
1487 		else	// input file does not exist !
1488 		{
1489 			glen = 0;
1490 			MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1491 		}
1492 		DeleteAll(inds,vals, curptrs);
1493 	}
1494 	else 	 	// all other processors
1495 	{
1496 		MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
		while(glen > 0)	// otherwise, the input file does not exist
1498 		{
1499 			// first receive the receive counts ...
1500 			MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1501 
1502 			if( recvcount == std::numeric_limits<int>::max())
1503 				break;
1504 
1505 			// create space for incoming data ...
1506 			IT * tempinds = new IT[recvcount];
1507 			NT * tempvals = new NT[recvcount];
1508 
			// receive actual data ... (first 4 arguments are ignored on the receiver side)
1510 			MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount,  MPIType<IT>(), master, World);
1511 			MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount,  MPIType<NT>(), master, World);
1512 
1513 			// now push what is ours to tuples
1514 			for(IT i=0; i< recvcount; ++i)
1515 			{
1516 				ind.push_back( tempinds[i] );
1517 				num.push_back( tempvals[i] );
1518 			}
1519 			DeleteAll(tempinds, tempvals);
1520 		}
1521 	}
1522 	delete [] displs;
1523 	MPI_Barrier(World);
1524 	return infile;
1525 }
1526 
1527 template <class IT, class NT>
1528 template <class HANDLER>
void FullyDistSpVec<IT,NT>::SaveGathered(std::ofstream& outfile, int master, HANDLER handler, bool printProcSplits)
1530 {
1531 	int rank, nprocs;
1532 	MPI_Comm World = commGrid->GetWorld();
1533 	MPI_Comm_rank(World, &rank);
1534 	MPI_Comm_size(World, &nprocs);
1535 	MPI_File thefile;
1536 
1537 	char _fn[] = "temp_fullydistspvec"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
1538 	int mpi_err = MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
1539 	if(mpi_err != MPI_SUCCESS)
1540 	{
1541 		char mpi_err_str[MPI_MAX_ERROR_STRING];
1542     		int  mpi_err_strlen;
1543 		MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
1544 		printf("MPI_File_open failed (%s)\n", mpi_err_str);
1545 		MPI_Abort(World, 1);
1546     	}
1547 
1548 	IT * dist = new IT[nprocs];
1549 	dist[rank] = getlocnnz();
1550 	MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
1551 	IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
1552 	IT totalLength = TotalLength();
1553 	IT totalNNZ = getnnz();
1554 
1555 	struct mystruct
1556 	{
1557 		IT ind;
1558 		NT num;
1559 	};
1560 
1561 	MPI_Datatype datatype;
1562 	MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
1563 	MPI_Type_commit(&datatype);
1564 	int dsize;
1565 	MPI_Type_size(datatype, &dsize);
1566 
1567 	// The disp displacement argument specifies the position
1568 	// (absolute offset in bytes from the beginning of the file)
1569 	char native[] = "native"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
1570 	MPI_File_set_view(thefile, static_cast<MPI_Offset>(sizeuntil) * dsize, datatype, datatype, native, MPI_INFO_NULL);	// MPI_Offset avoids truncating large byte displacements
1571 
1572 	int count = ind.size();
1573 	mystruct * packed = new mystruct[count];
1574 	for(int i=0; i<count; ++i)
1575 	{
1576 		packed[i].ind = ind[i] + sizeuntil;
1577 		packed[i].num = num[i];
1578 	}
1579 
1580 	mpi_err = MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
1581 	if(mpi_err != MPI_SUCCESS)
1582 	{
1583 		char mpi_err_str[MPI_MAX_ERROR_STRING];
1584 		int  mpi_err_strlen;
1585 		MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
1586 		printf("MPI_File_write failed (%s)\n", mpi_err_str);
1587 		MPI_Abort(World, 1);
1588 	}
1589 
1590 	MPI_Barrier(World);
1591 	MPI_File_close(&thefile);
1592 	delete [] packed;
1593 
1594 	// Now let processor-0 read the file and print
1595 	if(rank == 0)
1596 	{
1597 		FILE * f = fopen("temp_fullydistspvec", "rb");	// binary mode for portability
1598 		if(!f)
1599 		{
1600 			std::cerr << "Problem reading binary input file\n";
1601 			return;
1602 		}
1603 		IT maxd = *std::max_element(dist, dist+nprocs);
1604 		mystruct * data = new mystruct[maxd];
1605 
1606 		std::streamsize oldPrecision = outfile.precision();
1607 		outfile.precision(21);
1608 		outfile << totalLength << "\t1\t" << totalNNZ << std::endl;
1609 		for(int i=0; i<nprocs; ++i)
1610 		{
1611 			// read n_per_proc integers and print them
1612 			if (fread(data, dsize, dist[i], f) < static_cast<size_t>(dist[i]))
1613 			{
1614 				std::cout << "fread failed in SaveGathered! attempting to continue..." << std::endl;
1615 			}
1616 
1617 			if (printProcSplits)
1618 				outfile << "Elements stored on proc " << i << ":" << std::endl;
1619 
1620 			for (int j = 0; j < dist[i]; j++)
1621 			{
1622 				outfile << data[j].ind+1 << "\t1\t";
1623 				handler.save(outfile, data[j].num, data[j].ind);
1624 				outfile << std::endl;
1625 			}
1626 		}
1627 		outfile.precision(oldPrecision);
1628 		fclose(f);
1629 		remove("temp_fullydistspvec");
1630 		delete [] data;
1631 	}
1632 	delete [] dist;	// dist is allocated on every rank, so free it on every rank (not just rank 0)
1633 	MPI_Barrier(World);
1634 }
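// A hedged sketch (not part of the library) of the HANDLER interface assumed by SaveGathered
// above and by the file reader earlier in this file: the handler supplies read(stream, index)
// to parse one value and save(stream, value, index) to print one. The name ExampleTextHandler,
// the double value type, and the exact stream types are illustrative assumptions only.
//
//   struct ExampleTextHandler
//   {
//       template <typename IT>
//       double read(std::ifstream & infile, IT /*index*/) { double v; infile >> v; return v; }
//       template <typename IT>
//       void save(std::ofstream & outfile, const double & v, IT /*index*/) { outfile << v; }
//   };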
1635 
1636 
1637 template <class IT, class NT>
1638 template <typename _Predicate>
1639 IT FullyDistSpVec<IT,NT>::Count(_Predicate pred) const
1640 {
1641 	IT local = std::count_if( num.begin(), num.end(), pred );
1642 	IT whole = 0;
1643 	MPI_Allreduce( &local, &whole, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
1644 	return whole;
1645 }
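// Illustrative usage of Count (a sketch; the element types and the threshold are assumptions):
// collectively count the nonzeros whose value exceeds 0.5.
//
//   FullyDistSpVec<int64_t, double> v( ... );
//   int64_t nbig = v.Count([](double x) { return x > 0.5; });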
1646 
1647 
1648 template <class IT, class NT>
1649 template <typename _BinaryOperation>
1650 NT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, NT init) const
1651 {
1652 	// std::accumulate returns init for empty sequences
1653 	// the semantics are init op num[0] op ... op num[n-1], applied left to right
1654 	NT localsum = std::accumulate( num.begin(), num.end(), init, __binary_op);
1655 
1656 	NT totalsum = init;
1657 	MPI_Allreduce( &localsum, &totalsum, 1, MPIType<NT>(), MPIOp<_BinaryOperation, NT>::op(), commGrid->GetWorld());
1658 	return totalsum;
1659 }
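// Illustrative usage of Reduce (a sketch): the binary functor must have an MPIOp mapping, as
// maximum<IT>() does where it is used further down in this file.
//
//   NT largest = v.Reduce(maximum<NT>(), (NT) 0);   // global maximum of the stored values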
1660 
1661 template <class IT, class NT>
1662 template <typename OUT, typename _BinaryOperation, typename _UnaryOperation>
1663 OUT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, OUT default_val, _UnaryOperation __unary_op) const
1664 {
1665 	// an empty local num array simply contributes default_val to the global reduction
1666 	OUT localsum = default_val;
1667 
1668 	if (num.size() > 0)
1669 	{
1670 		for (typename std::vector<NT>::const_iterator iter = num.begin(); iter != num.end(); ++iter)
1671 		{
1672 			localsum = __binary_op(localsum, __unary_op(*iter));
1673 		}
1678 	}
1679 
1680 	OUT totalsum = default_val;
1681 	MPI_Allreduce( &localsum, &totalsum, 1, MPIType<OUT>(), MPIOp<_BinaryOperation, OUT>::op(), commGrid->GetWorld());
1682 	return totalsum;
1683 }
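// Illustrative usage of the typed overload (a sketch; the double output type, the lambda, and
// the assumption that MPIOp is specialized for std::plus are not guaranteed by this file):
//
//   FullyDistSpVec<int64_t, double> v( ... );
//   double sumsq = v.Reduce(std::plus<double>(), 0.0, [](double x) { return x * x; });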
1684 
1685 template <class IT, class NT>
1686 void FullyDistSpVec<IT,NT>::PrintInfo(std::string vectorname) const
1687 {
1688 	IT nznz = getnnz();
1689 	if (commGrid->GetRank() == 0)
1690 		std::cout << "As a whole, " << vectorname << " has: " << nznz << " nonzeros and length " << glen << std::endl;
1691 }
1692 
1693 template <class IT, class NT>
1694 void FullyDistSpVec<IT,NT>::DebugPrint()
1695 {
1696 	int rank, nprocs;
1697 	MPI_Comm World = commGrid->GetWorld();
1698 	MPI_Comm_rank(World, &rank);
1699 	MPI_Comm_size(World, &nprocs);
1700 	MPI_File thefile;
1701 
1702 	char tfilename[32] = "temp_fullydistspvec";
1703 	MPI_File_open(World, tfilename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
1704 
1705 	IT * dist = new IT[nprocs];
1706 	dist[rank] = getlocnnz();
1707 	MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
1708 	IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
1709 
1710 	struct mystruct
1711 	{
1712 		IT ind;
1713 		NT num;
1714 	};
1715 
1716 	MPI_Datatype datatype;
1717 	MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
1718 	MPI_Type_commit(&datatype);
1719 	int dsize;
1720 	MPI_Type_size(datatype, &dsize);
1721 
1722 	// The disp displacement argument specifies the position
1723 	// (absolute offset in bytes from the beginning of the file)
1724 	char openmode[32] = "native";
1725 	MPI_File_set_view(thefile, static_cast<MPI_Offset>(sizeuntil) * dsize, datatype, datatype, openmode, MPI_INFO_NULL);	// MPI_Offset avoids truncating large byte displacements
1726 
1727 	int count = ind.size();
1728 	mystruct * packed = new mystruct[count];
1729 	for(int i=0; i<count; ++i)
1730 	{
1731 		packed[i].ind = ind[i];
1732 		packed[i].num = num[i];
1733 	}
1734 	MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
1735 	MPI_Barrier(World);
1736 	MPI_File_close(&thefile);
1737 	delete [] packed;
1738 
1739 	// Now let processor-0 read the file and print
1740 	if(rank == 0)
1741 	{
1742 		FILE * f = fopen("temp_fullydistspvec", "rb");	// binary mode for portability
1743 		if(!f)
1744 		{
1745 			std::cerr << "Problem reading binary input file\n";
1746 			return;
1747 		}
1748 		IT maxd = *std::max_element(dist, dist+nprocs);
1749 		mystruct * data = new mystruct[maxd];
1750 
1751 		for(int i=0; i<nprocs; ++i)
1752 		{
1753 			// read n_per_proc integers and print them
1754 			if (fread(data, dsize, dist[i],f) < static_cast<size_t>(dist[i]))
1755 			{
1756 				std::cout << "fread failed in DebugPrint! attempting to continue..." << std::endl;
1757 			}
1758 
1759 			std::cout << "Elements stored on proc " << i << ": {";
1760 			for (int j = 0; j < dist[i]; j++)
1761 			{
1762 				std::cout << "(" << data[j].ind << "," << data[j].num << "), ";
1763 			}
1764 			std::cout << "}" << std::endl;
1765 		}
1766 		fclose(f);
1767 		remove("temp_fullydistspvec");
1768 		delete [] data;
1769 	}
1770 	delete [] dist;	// dist is allocated on every rank, so free it on every rank (not just rank 0)
1771 	MPI_Barrier(World);
1772 }
1773 
1774 template <class IT, class NT>
1775 void FullyDistSpVec<IT,NT>::Reset()
1776 {
1777 	ind.resize(0);
1778 	num.resize(0);
1779 }
1780 
1781 // Assigns the given (sorted) locations; note that both the index and the value of each nonzero are set to inds[i]
1782 template <class IT, class NT>
1783 void FullyDistSpVec<IT,NT>::BulkSet(IT inds[], int count) {
1784 	ind.resize(count);
1785 	num.resize(count);
1786 	std::copy(inds, inds+count, ind.data());
1787 	std::copy(inds, inds+count, num.data());
1788 }
1789 
1790 
1791 
1792 /*
1793  ** Create a new sparse vector vout by swapping the indices and values of a sparse vector vin.
1794  ** The length of vout is globallen, which must be greater than the maximum entry of vin.
1795  ** nnz(vin) = nnz(vout)
1796  ** for every nonzero entry vin[k]: vout[vin[k]] = k
1797  */
1798 /*
1799 template <class IT, class NT>
1800 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
1801 {
1802     FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
1803     IT max_entry = Reduce(maximum<IT>(), (IT) 0 ) ;
1804     if(max_entry >= globallen)
1805     {
1806         cout << "Sparse vector has entries (" << max_entry  << ") larger than requested global vector length " << globallen << endl;
1807         return Inverted;
1808     }
1809 
1810 
1811 	int nprocs = commGrid->GetSize();
1812 	vector< vector< NT > > datsent(nprocs);
1813 	vector< vector< IT > > indsent(nprocs);
1814 
1815 	IT ploclen = getlocnnz();
1816 	for(IT k=0; k < ploclen; ++k)
1817 	{
1818 		IT locind;
1819 		int owner = Inverted.Owner(num[k], locind);     // numerical values in rhs are 0-based indices
1820         IT gind = ind[k] + LengthUntil();
1821         datsent[owner].push_back(gind);
1822 		indsent[owner].push_back(locind);   // so that we don't need no correction at the recipient
1823 	}
1824 	int * sendcnt = new int[nprocs];
1825 	int * sdispls = new int[nprocs];
1826 	for(int i=0; i<nprocs; ++i)
1827 		sendcnt[i] = (int) datsent[i].size();
1828 
1829 	int * rdispls = new int[nprocs];
1830 	int * recvcnt = new int[nprocs];
1831     MPI_Comm World = commGrid->GetWorld();
1832 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
1833 	sdispls[0] = 0;
1834 	rdispls[0] = 0;
1835 	for(int i=0; i<nprocs-1; ++i)
1836 	{
1837 		sdispls[i+1] = sdispls[i] + sendcnt[i];
1838 		rdispls[i+1] = rdispls[i] + recvcnt[i];
1839 	}
1840     NT * datbuf = new NT[ploclen];
1841 	for(int i=0; i<nprocs; ++i)
1842 	{
1843 		copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
1844 		vector<NT>().swap(datsent[i]);
1845 	}
1846     IT * indbuf = new IT[ploclen];
1847     for(int i=0; i<nprocs; ++i)
1848 	{
1849 		copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
1850 		vector<IT>().swap(indsent[i]);
1851 	}
1852     IT totrecv = accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
1853 	NT * recvdatbuf = new NT[totrecv];
1854 	MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
1855     delete [] datbuf;
1856 
1857     IT * recvindbuf = new IT[totrecv];
1858     MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1859     delete [] indbuf;
1860 
1861 
1862     vector< pair<IT,NT> > tosort;   // in fact, tomerge would be a better name but it is unlikely to be faster
1863 
1864 	for(int i=0; i<nprocs; ++i)
1865 	{
1866 		for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)	// fetch the numerical values
1867 		{
1868             tosort.push_back(make_pair(recvindbuf[j], recvdatbuf[j]));
1869 		}
1870 	}
1871 	DeleteAll(recvindbuf, recvdatbuf);
1872     DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1873     std::sort(tosort.begin(), tosort.end());
1874 
1875     IT lastIndex=-1;
1876     for(typename vector<pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
1877     {
1878         if(lastIndex!=itr->first) // avoid duplicate indices
1879         {
1880             Inverted.ind.push_back(itr->first);
1881             Inverted.num.push_back(itr->second);
1882         }
1883         lastIndex = itr->first;
1884 
1885 	}
1886 	return Inverted;
1887 
1888 }
1889 */
1890 
1891 
1892 
1893 /*
1894  ** Create a new sparse vector vout by swapping the indices and values of a sparse vector vin.
1895  ** The length of vout is globallen, which must be greater than the maximum entry of vin.
1896  ** nnz(vin) = nnz(vout)
1897  ** for every nonzero entry vin[k]: vout[vin[k]] = k
1898  */
1899 
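// Worked example of the specification above (0-based indices): if vin has nonzeros
// vin[3] = 5 and vin[7] = 1, then vin.Invert(globallen) with globallen > 5 returns vout
// with vout[5] = 3 and vout[1] = 7.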
1900 template <class IT, class NT>
1901 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
1902 {
1903     FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
1904     IT max_entry = Reduce(maximum<IT>(), (IT) 0 ) ;
1905     if(max_entry >= globallen)
1906     {
1907         std::cout << "Sparse vector has entries (" << max_entry  << ") larger than requested global vector length " << globallen << std::endl;
1908         return Inverted;
1909     }
1910 
1911     MPI_Comm World = commGrid->GetWorld();
1912 	int nprocs = commGrid->GetSize();
1913     int * rdispls = new int[nprocs+1];
1914 	int * recvcnt = new int[nprocs];
1915     int * sendcnt = new int[nprocs](); // initialize to 0
1916 	int * sdispls = new int[nprocs+1];
1917 
1918 
1919 	IT ploclen = getlocnnz();
1920 #ifdef _OPENMP
1921 #pragma omp parallel for
1922 #endif
1923 	for(IT k=0; k < ploclen; ++k)
1924 	{
1925 		IT locind;
1926 		int owner = Inverted.Owner(num[k], locind);
1927 #ifdef _OPENMP
1928         __sync_fetch_and_add(&sendcnt[owner], 1);
1929 #else
1930         sendcnt[owner]++;
1931 #endif
1932 	}
1933 
1934 
1935    	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
1936 
1937     sdispls[0] = 0;
1938 	rdispls[0] = 0;
1939 	for(int i=0; i<nprocs; ++i)
1940 	{
1941 		sdispls[i+1] = sdispls[i] + sendcnt[i];
1942 		rdispls[i+1] = rdispls[i] + recvcnt[i];
1943 	}
1944 
1945 
1946 
1947     NT * datbuf = new NT[ploclen];
1948     IT * indbuf = new IT[ploclen];
1949     int *count = new int[nprocs](); //current position
1950 #ifdef _OPENMP
1951 #pragma omp parallel for
1952 #endif
1953     for(IT i=0; i < ploclen; ++i)
1954 	{
1955 		IT locind;
1956         int owner = Inverted.Owner(num[i], locind);
1957         int id;
1958 #ifdef _OPENMP
1959         id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
1960 #else
1961         id = sdispls[owner] + count[owner];
1962         count[owner]++;
1963 #endif
1964         datbuf[id] = ind[i] + LengthUntil();
1965         indbuf[id] = locind;
1966 	}
1967     delete [] count;
1968 
1969 
1970     IT totrecv = rdispls[nprocs];
1971 	NT * recvdatbuf = new NT[totrecv];
1972 	MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
1973     delete [] datbuf;
1974 
1975     IT * recvindbuf = new IT[totrecv];
1976     MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1977     delete [] indbuf;
1978 
1979 
1980     std::vector< std::pair<IT,NT> > tosort;
1981     tosort.resize(totrecv);
1982 #ifdef _OPENMP
1983 #pragma omp parallel for
1984 #endif
1985 	for(IT i=0; i<totrecv; ++i)
1986 	{
1987         tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
1988 	}
1989 	DeleteAll(recvindbuf, recvdatbuf);
1990     DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1991 
1992 #if defined(GNU_PARALLEL) && defined(_OPENMP)
1993     __gnu_parallel::sort(tosort.begin(), tosort.end());
1994 #else
1995     std::sort(tosort.begin(), tosort.end());
1996 #endif
1997 
1998     Inverted.ind.reserve(totrecv);
1999     Inverted.num.reserve(totrecv);
2000     IT lastIndex=-1;
2001 
2002     // not threaded because Inverted.ind is kept sorted
2003     for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
2004     {
2005         if(lastIndex!=itr->first) // avoid duplicate indices
2006         {
2007             Inverted.ind.push_back(itr->first);
2008             Inverted.num.push_back(itr->second);
2009         }
2010         lastIndex = itr->first;
2011 
2012 	}
2013 
2014 	return Inverted;
2015 
2016 }
2017 
2018 
2019 
2020 /*
2021  ** Generalized Invert: binary operations define the index and the value of each entry of the inverted vector.
2022  ** _BinaryOperationDuplicate: function used to reduce duplicate entries.
2023  */
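// A hedged sketch of the functor shapes this overload expects: __binopIdx and __binopVal both
// receive (value, global index) of a nonzero and return, respectively, the global index and the
// value it contributes to the inverted vector; __binopDuplicate combines values that land on the
// same index. With illustrative lambdas (names are assumptions), the plain Invert above
// corresponds roughly to:
//
//   auto idxop = [](NT val, IT gidx) { return (IT) val;  };   // new index = old value
//   auto valop = [](NT val, IT gidx) { return (NT) gidx; };   // new value = old global index
//   auto dupop = [](NT a, NT b)      { return a;         };   // keep the first of any duplicates
//   FullyDistSpVec<IT,NT> inv = v.Invert(globallen, idxop, valop, dupop);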
2024 
2025 template <class IT, class NT>
2026 template <typename _BinaryOperationIdx, typename _BinaryOperationVal, typename _BinaryOperationDuplicate>
2027 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal, _BinaryOperationDuplicate __binopDuplicate)
2028 
2029 {
2030 
2031     FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
2032 
2033 
2034     // identify the max index in the composed vector
2035     IT localmax = (IT) 0;
2036     for(size_t k=0; k < num.size(); ++k)
2037     {
2038         localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
2039     }
2040     IT globalmax = (IT) 0;
2041     MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
2042 
2043     if(globalmax >= globallen)
2044     {
2045         std::cout << "Sparse vector has entries (" << globalmax  << ") larger than requested global vector length " << globallen << std::endl;
2046         return Inverted;
2047     }
2048 
2049 
2050 
2051     MPI_Comm World = commGrid->GetWorld();
2052     int nprocs = commGrid->GetSize();
2053     int * rdispls = new int[nprocs+1];
2054     int * recvcnt = new int[nprocs];
2055     int * sendcnt = new int[nprocs](); // initialize to 0
2056     int * sdispls = new int[nprocs+1];
2057 
2058 
2059     IT ploclen = getlocnnz();
2060 #ifdef _OPENMP
2061 #pragma omp parallel for
2062 #endif
2063     for(IT k=0; k < ploclen; ++k)
2064     {
2065         IT locind;
2066         IT globind = __binopIdx(num[k], ind[k] + LengthUntil()); // get global index of the inverted vector
2067         int owner = Inverted.Owner(globind, locind);
2068 
2069 #ifdef _OPENMP
2070         __sync_fetch_and_add(&sendcnt[owner], 1);
2071 #else
2072         sendcnt[owner]++;
2073 #endif
2074     }
2075 
2076 
2077     MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
2078 
2079     sdispls[0] = 0;
2080     rdispls[0] = 0;
2081     for(int i=0; i<nprocs; ++i)
2082     {
2083         sdispls[i+1] = sdispls[i] + sendcnt[i];
2084         rdispls[i+1] = rdispls[i] + recvcnt[i];
2085     }
2086 
2087 
2088     NT * datbuf = new NT[ploclen];
2089     IT * indbuf = new IT[ploclen];
2090     int *count = new int[nprocs](); //current position
2091 #ifdef _OPENMP
2092 #pragma omp parallel for
2093 #endif
2094     for(IT i=0; i < ploclen; ++i)
2095     {
2096         IT locind;
2097         IT globind = __binopIdx(num[i], ind[i] + LengthUntil()); // get global index of the inverted vector
2098         int owner = Inverted.Owner(globind, locind);
2099         int id;
2100 #ifdef _OPENMP
2101         id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
2102 #else
2103         id = sdispls[owner] + count[owner];
2104         count[owner]++;
2105 #endif
2106         datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
2107         indbuf[id] = locind;
2108     }
2109     delete [] count;
2110 
2111 
2112     IT totrecv = rdispls[nprocs];
2113     NT * recvdatbuf = new NT[totrecv];
2114     MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
2115     delete [] datbuf;
2116 
2117     IT * recvindbuf = new IT[totrecv];
2118     MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
2119     delete [] indbuf;
2120 
2121 
2122     std::vector< std::pair<IT,NT> > tosort;
2123     tosort.resize(totrecv);
2124 #ifdef _OPENMP
2125 #pragma omp parallel for
2126 #endif
2127     for(IT i=0; i<totrecv; ++i)
2128     {
2129         tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
2130     }
2131     DeleteAll(recvindbuf, recvdatbuf);
2132     DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
2133 
2134 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2135     __gnu_parallel::sort(tosort.begin(), tosort.end());
2136 #else
2137     std::sort(tosort.begin(), tosort.end());
2138 #endif
2139 
2140     Inverted.ind.reserve(totrecv);
2141     Inverted.num.reserve(totrecv);
2142 
2143 
2144     // not threaded because Inverted.ind is kept sorted
2145     for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); )
2146     {
2147         IT ind = itr->first;
2148         NT val = itr->second;
2149         ++itr;
2150 
2151         while(itr != tosort.end() && itr->first == ind)
2152         {
2153             val = __binopDuplicate(val, itr->second);
2154             ++itr;
2155         }
2156 
2157 
2158         Inverted.ind.push_back(ind);
2159         Inverted.num.push_back(val);
2160 
2161     }
2162 
2163     return Inverted;
2164 
2165 }
2166 
2167 
2168 // Invert using RMA
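// Compared with the MPI_Alltoallv-based Invert above, this variant exchanges the receive counts,
// the receive displacements, and finally the (index, value) payloads with one-sided MPI_Put calls
// into per-rank windows; puts are issued only toward ranks for which there is actually data.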
2169 
2170 template <class IT, class NT>
2171 template <typename _BinaryOperationIdx, typename _BinaryOperationVal>
2172 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::InvertRMA (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal)
2173 
2174 {
2175 
2176     FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
2177     int myrank;
2178     MPI_Comm_rank(commGrid->GetWorld(), &myrank);
2179 
2180     // identify the max index in the composed vector
2181     IT localmax = (IT) 0;
2182     for(size_t k=0; k < num.size(); ++k)
2183     {
2184         localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
2185     }
2186     IT globalmax = (IT) 0;
2187     MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
2188 
2189     if(globalmax >= globallen)
2190     {
2191         std::cout << "Sparse vector has entries (" << globalmax  << ") larger than requested global vector length " << globallen << std::endl;
2192         return Inverted;
2193     }
2194 
2195 
2196 
2197     MPI_Comm World = commGrid->GetWorld();
2198     int nprocs = commGrid->GetSize();
2199     int * rdispls = new int[nprocs+1];
2200     int * recvcnt = new int[nprocs](); // initialize to 0
2201     int * sendcnt = new int[nprocs](); // initialize to 0
2202     int * sdispls = new int[nprocs+1];
2203 
2204 
2205     IT ploclen = getlocnnz();
2206 #ifdef _OPENMP
2207 #pragma omp parallel for
2208 #endif
2209     for(IT k=0; k < ploclen; ++k)
2210     {
2211         IT locind;
2212         IT globind = __binopIdx(num[k], ind[k] + LengthUntil()); // get global index of the inverted vector
2213         int owner = Inverted.Owner(globind, locind);
2214 
2215 #ifdef _OPENMP
2216         __sync_fetch_and_add(&sendcnt[owner], 1);
2217 #else
2218         sendcnt[owner]++;
2219 #endif
2220     }
2221 
2222 
2223     MPI_Win win2;
2224     MPI_Win_create(recvcnt, nprocs * sizeof(int), sizeof(int), MPI_INFO_NULL, World, &win2);	// recvcnt is an int array: window size and displacement unit must be sizeof(int), not sizeof(MPI_INT)
2225     for(int i=0; i<nprocs; ++i)
2226     {
2227         if(sendcnt[i]>0)
2228         {
2229             MPI_Win_lock(MPI_LOCK_SHARED,i,MPI_MODE_NOCHECK,win2);
2230             MPI_Put(&sendcnt[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win2);
2231             MPI_Win_unlock(i, win2);
2232         }
2233     }
2234     MPI_Win_free(&win2);
2235 
2236 
2237 
2238     sdispls[0] = 0;
2239     rdispls[0] = 0;
2240     for(int i=0; i<nprocs; ++i)
2241     {
2242         sdispls[i+1] = sdispls[i] + sendcnt[i];
2243         rdispls[i+1] = rdispls[i] + recvcnt[i];
2244     }
2245 
2246     int * rmadispls = new int[nprocs+1];
2247     MPI_Win win3;
2248     MPI_Win_create(rmadispls, nprocs * sizeof(int), sizeof(int), MPI_INFO_NULL, World, &win3);	// rmadispls is an int array: window size and displacement unit must be sizeof(int), not sizeof(MPI_INT)
2249     for(int i=0; i<nprocs; ++i)
2250     {
2251         if(recvcnt[i]>0)
2252         {
2253             MPI_Win_lock(MPI_LOCK_SHARED,i,MPI_MODE_NOCHECK,win3);
2254             MPI_Put(&rdispls[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win3);
2255             MPI_Win_unlock(i, win3);
2256         }
2257     }
2258     MPI_Win_free(&win3);
2259 
2260 
2261     NT * datbuf = new NT[ploclen];
2262     IT * indbuf = new IT[ploclen];
2263     int *count = new int[nprocs](); //current position
2264 #ifdef _OPENMP
2265 #pragma omp parallel for
2266 #endif
2267     for(IT i=0; i < ploclen; ++i)
2268     {
2269         IT locind;
2270         IT globind = __binopIdx(num[i], ind[i] + LengthUntil()); // get global index of the inverted vector
2271         int owner = Inverted.Owner(globind, locind);
2272         int id;
2273 #ifdef _OPENMP
2274         id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
2275 #else
2276         id = sdispls[owner] + count[owner];
2277         count[owner]++;
2278 #endif
2279         datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
2280         indbuf[id] = locind;
2281     }
2282     delete [] count;
2283 
2284 
2285     IT totrecv = rdispls[nprocs];
2286     NT * recvdatbuf = new NT[totrecv];
2287     IT * recvindbuf = new IT[totrecv];
2288     MPI_Win win, win1;
2289     MPI_Win_create(recvdatbuf, totrecv * sizeof(NT), sizeof(NT), MPI_INFO_NULL, commGrid->GetWorld(), &win);
2290     MPI_Win_create(recvindbuf, totrecv * sizeof(IT), sizeof(IT), MPI_INFO_NULL, commGrid->GetWorld(), &win1);
2291     //MPI_Win_fence(0, win);
2292     //MPI_Win_fence(0, win1);
2293     for(int i=0; i<nprocs; ++i)
2294     {
2295         if(sendcnt[i]>0)
2296         {
2297             MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win);
2298             MPI_Put(&datbuf[sdispls[i]], sendcnt[i], MPIType<NT>(), i, rmadispls[i], sendcnt[i], MPIType<NT>(), win);
2299             MPI_Win_unlock(i, win);
2300 
2301             MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win1);
2302             MPI_Put(&indbuf[sdispls[i]], sendcnt[i], MPIType<IT>(), i, rmadispls[i], sendcnt[i], MPIType<IT>(), win1);
2303             MPI_Win_unlock(i, win1);
2304         }
2305     }
2306     //MPI_Win_fence(0, win);
2307     //MPI_Win_fence(0, win1);
2308     MPI_Win_free(&win);
2309     MPI_Win_free(&win1);
2310 
2311     delete [] datbuf;
2312     delete [] indbuf;
2313     delete [] rmadispls;
2314 
2315 
2316     std::vector< std::pair<IT,NT> > tosort;
2317     tosort.resize(totrecv);
2318 #ifdef _OPENMP
2319 #pragma omp parallel for
2320 #endif
2321     for(IT i=0; i<totrecv; ++i)
2322     {
2323         tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
2324     }
2325     DeleteAll(recvindbuf, recvdatbuf);
2326     DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
2327 
2328 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2329     __gnu_parallel::sort(tosort.begin(), tosort.end());
2330 #else
2331     std::sort(tosort.begin(), tosort.end());
2332 #endif
2333 
2334     Inverted.ind.reserve(totrecv);
2335     Inverted.num.reserve(totrecv);
2336     IT lastIndex=-1;
2337 
2338     // not threaded because Inverted.ind is kept sorted
2339     for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
2340     {
2341         if(lastIndex!=itr->first) // avoid duplicate indices
2342         {
2343             Inverted.ind.push_back(itr->first);
2344             Inverted.num.push_back(itr->second);
2345         }
2346         lastIndex = itr->first;
2347 
2348     }
2349 
2350     return Inverted;
2351 
2352 }
2353 
2354 
2355 
2356 template <typename IT, typename NT>
2357 template <typename NT1, typename _UnaryOperation>
2358 void FullyDistSpVec<IT,NT>::Select (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop)
2359 {
2360 	if(*commGrid == *(denseVec.commGrid))
2361 	{
2362 		if(TotalLength() != denseVec.TotalLength())
2363 		{
2364 			std::ostringstream outs;
2365 			outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for Select\n";
2366 			SpParHelper::Print(outs.str());
2367 			MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2368 		}
2369 		else
2370 		{
2371 
2372 			IT spsize = getlocnnz();
2373             IT k = 0;
2374             // iterate over the sparse vector
2375             for(IT i=0; i< spsize; ++i)
2376             {
2377                 if(__unop(denseVec.arr[ind[i]]))
2378                 {
2379                     ind[k] = ind[i];
2380                     num[k++] = num[i];
2381                 }
2382             }
2383             ind.resize(k);
2384             num.resize(k);
2385 		}
2386 	}
2387 	else
2388 	{
2389         std::ostringstream outs;
2390         outs << "Grids are not comparable for Select" << std::endl;
2391         SpParHelper::Print(outs.str());
2392 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2393 	}
2394 }
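// Illustrative usage of Select (a sketch; the types and the lambda are assumptions): keep only
// the sparse entries whose aligned dense value is positive. The dense vector must share the
// length and the processor grid of the sparse vector.
//
//   FullyDistSpVec<int64_t, double> frontier( ... );
//   FullyDistVec<int64_t, double>   weights ( ... );
//   frontier.Select(weights, [](double w) { return w > 0.0; });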
2395 
2396 
2397 // \todo: Shouldn't this wrap EWiseApply for code maintenance instead?
2398 template <typename IT, typename NT>
2399 template <typename NT1>
2400 void FullyDistSpVec<IT,NT>::Setminus (const FullyDistSpVec<IT,NT1> & other)
2401 {
2402     if(*commGrid == *(other.commGrid))
2403     {
2404         if(TotalLength() != other.TotalLength())
2405         {
2406             std::ostringstream outs;
2407             outs << "Vector dimensions don't match (" << TotalLength() << " vs " << other.TotalLength() << ") for Setminus\n";
2408             SpParHelper::Print(outs.str());
2409             MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2410         }
2411         else
2412         {
2413 
2414             IT mysize = getlocnnz();
2415             IT othersize = other.getlocnnz();
2416             IT k = 0, i=0, j=0;
2417             // iterate over the sparse vector
2418             for(; i< mysize && j < othersize;)
2419             {
2420                 if(other.ind[j] == ind[i]) //skip
2421                 {
2422                     i++; j++;
2423                 }
2424                 else if(other.ind[j] > ind[i])
2425                 {
2426                     ind[k] = ind[i];
2427                     num[k++] = num[i++];
2428                 }
2429                 else j++;
2430             }
2431             while(i< mysize)
2432             {
2433                 ind[k] = ind[i];
2434                 num[k++] = num[i++];
2435             }
2436 
2437             ind.resize(k);
2438             num.resize(k);
2439         }
2440     }
2441     else
2442     {
2443         std::ostringstream outs;
2444         outs << "Grids are not comparable for Setminus" << std::endl;
2445         SpParHelper::Print(outs.str());
2446         MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2447     }
2448 }
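// Worked example of Setminus: if this vector has nonzeros at indices {1, 4, 7, 9} and `other`
// has nonzeros at indices {4, 9, 12}, then after Setminus(other) this vector keeps only the
// entries at indices {1, 7}; the values of the surviving entries are untouched.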
2449 
2450 
2451 
2452 template <typename IT, typename NT>
2453 template <typename NT1, typename _UnaryOperation, typename _BinaryOperation>
2454 void FullyDistSpVec<IT,NT>::SelectApply (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop, _BinaryOperation __binop)
2455 {
2456 	if(*commGrid == *(denseVec.commGrid))
2457 	{
2458 		if(TotalLength() != denseVec.TotalLength())
2459 		{
2460 			std::ostringstream outs;
2461 			outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for SelectApply\n";
2462 			SpParHelper::Print(outs.str());
2463 			MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2464 		}
2465 		else
2466 		{
2467 
2468 			IT spsize = getlocnnz();
2469             IT k = 0;
2470             // iterate over the sparse vector
2471             for(IT i=0; i< spsize; ++i)
2472             {
2473                 if(__unop(denseVec.arr[ind[i]]))
2474                 {
2475                     ind[k] = ind[i];
2476                     num[k++] = __binop(num[i], denseVec.arr[ind[i]]);
2477                 }
2478             }
2479             ind.resize(k);
2480             num.resize(k);
2481 		}
2482 	}
2483 	else
2484 	{
2485         std::ostringstream outs;
2486         outs << "Grids are not comparable for SelectApply" << std::endl;
2487         SpParHelper::Print(outs.str());
2488 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2489 	}
2490 }
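// Illustrative usage of SelectApply (a sketch; types and lambdas are assumptions): keep the
// entries whose aligned dense value is positive, and scale each kept sparse value by that
// dense value.
//
//   spvec.SelectApply(densevec,
//                     [](double w) { return w > 0.0; },           // selection predicate
//                     [](double x, double w) { return x * w; });  // new value of kept entries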
2491 
2492 
2493 // apply a unary function to each nonzero and return a new vector
2494 // could be a constructor
2495 /*
2496 template <typename IT, typename NT>
2497 template <typename NT1, typename _UnaryOperation>
2498 FullyDistSpVec<IT,NT1> FullyDistSpVec<IT,NT>::Apply(_UnaryOperation __unop)
2499 {
2500     FullyDistSpVec<IT,NT1> composed(commGrid, TotalLength());
2501     IT spsize = getlocnnz();
2502     for(IT i=0; i< spsize; ++i)
2503     {
2504         composed.ind.push_back(ind[i]);
2505         composed.num.push_back( __unop(num[i]));
2506     }
2507     return composed;
2508 }
2509 */
2510 
2511 
2512 
2513 /* experimental version
2514  */
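// What the routine below does: every rank contributes its local part of Selector (its global
// indices when filterByIndex is true, its numerical values otherwise) to an allgathered and
// sorted filter set; a nonzero of this vector is then dropped whenever __unop(value) is found
// in that set.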
2515 template <class IT, class NT>
2516 template <typename _UnaryOperation>
2517 void FullyDistSpVec<IT,NT>::FilterByVal (FullyDistSpVec<IT,IT> Selector, _UnaryOperation __unop, bool filterByIndex)
2518 {
2519     if(*commGrid != *(Selector.commGrid))
2520     {
2521         std::ostringstream outs;
2522         outs << "Grids are not comparable for FilterByVal" << std::endl;
2523         SpParHelper::Print(outs.str());
2524 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2525     }
2526     int nprocs = commGrid->GetSize();
2527     MPI_Comm World = commGrid->GetWorld();
2528 
2529 
2530 	int * rdispls = new int[nprocs];
2531     int sendcnt = Selector.ind.size();
2532     int * recvcnt = new int[nprocs];
2533     MPI_Allgather(&sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
2534 
2535 	rdispls[0] = 0;
2536 	for(int i=0; i<nprocs-1; ++i)
2537 	{
2538 		rdispls[i+1] = rdispls[i] + recvcnt[i];
2539 	}
2540 
2541 
2542     IT * sendbuf = new IT[sendcnt];
2543 #ifdef _OPENMP
2544 #pragma omp parallel for
2545 #endif
2546     for(int k=0; k<sendcnt; k++)
2547     {
2548         if(filterByIndex)
2549             sendbuf[k] = Selector.ind[k] + Selector.LengthUntil();
2550         else
2551             sendbuf[k] = Selector.num[k];
2552     }
2553 
2554     IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2555 
2556     std::vector<IT> recvbuf;
2557     recvbuf.resize(totrecv);
2558 
2559     MPI_Allgatherv(sendbuf, sendcnt, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
2560     delete [] sendbuf;
2561     DeleteAll(rdispls,recvcnt);
2562 
2563      if(!filterByIndex) // need to sort
2564      {
2565 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2566     __gnu_parallel::sort(recvbuf.begin(), recvbuf.end());
2567 #else
2568     std::sort(recvbuf.begin(), recvbuf.end());
2569 #endif
2570      }
2571 
2572     // now perform filter (recvbuf is sorted) // TODO: OpenMP parallel and keep things sorted
2573     IT k=0;
2574 
2575     for(IT i=0; i<num.size(); i++)
2576     {
2577         IT val = __unop(num[i]);
2578         if(!std::binary_search(recvbuf.begin(), recvbuf.end(), val))
2579         {
2580             ind[k] = ind[i];
2581             num[k++] = num[i];
2582         }
2583     }
2584     ind.resize(k);
2585     num.resize(k);
2586 
2587 
2588 }
2589 
2590 }
2591