1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8 Copyright (c) 2010-2017, The Regents of the University of California
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the Software is
15 furnished to do so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in
18 all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 THE SOFTWARE.
27 */
28
29
30 #include <limits>
31 #include "FullyDistSpVec.h"
32 #include "SpDefs.h"
33 #include "SpHelper.h"
34 #include "hash.hpp"
35 #include "FileHeader.h"
36 #include <sys/types.h>
37 #include <sys/stat.h>
38
39 #ifdef GNU_PARALLEL
40 #include <parallel/algorithm>
41 #include <parallel/numeric>
42 #endif
43
44 #include "usort/parUtils.h"
45
46 namespace combblas {
47
48 template <class IT, class NT>
49 FullyDistSpVec<IT, NT>::FullyDistSpVec ( std::shared_ptr<CommGrid> grid)
50 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid)
51 { };
52
53 template <class IT, class NT>
54 FullyDistSpVec<IT, NT>::FullyDistSpVec ( std::shared_ptr<CommGrid> grid, IT globallen)
55 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid,globallen)
56 { };
57
58 template <class IT, class NT>
59 FullyDistSpVec<IT,NT>::FullyDistSpVec ()
60 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>()
61 { };
62
63 template <class IT, class NT>
64 FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen)
65 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(globallen)
66 { }
67
68
69 template <class IT, class NT>
70 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator=(const FullyDistSpVec< IT,NT > & rhs)
71 {
72 if(this != &rhs)
73 {
74 FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs); // to update glen and commGrid
75 ind = rhs.ind;
76 num = rhs.num;
77 }
78 return *this;
79 }
80
81 template <class IT, class NT>
82 FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs) // Conversion copy-constructor
83 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(rhs.commGrid,rhs.glen)
84 {
85 *this = rhs;
86 }
87
88 // Conversion copy-constructor that keeps only the entries for which the unary predicate returns true
89 template <class IT, class NT>
90 template <typename _UnaryOperation>
91 FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs, _UnaryOperation unop)
92 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(rhs.commGrid,rhs.glen)
93 {
94 //FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs); // to update glen and commGrid
95
96 std::vector<IT>().swap(ind);
97 std::vector<NT>().swap(num);
98 IT vecsize = rhs.LocArrSize();
99 for(IT i=0; i< vecsize; ++i)
100 {
101 if(unop(rhs.arr[i]))
102 {
103 ind.push_back(i);
104 num.push_back(rhs.arr[i]);
105 }
106 }
107 }
108
109
110
111 // create a sparse vector from local vectors
112 template <class IT, class NT>
113 FullyDistSpVec<IT,NT>::FullyDistSpVec (std::shared_ptr<CommGrid> grid, IT globallen, const std::vector<IT>& indvec, const std::vector<NT> & numvec, bool SumDuplicates, bool sorted)
114 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid, globallen)
115 {
116
117 assert(indvec.size()==numvec.size());
118 IT vecsize = indvec.size();
119 if(!sorted)
120 {
121 std::vector< std::pair<IT,NT> > tosort(vecsize);
122 #ifdef _OPENMP
123 #pragma omp parallel for
124 #endif
125 for(IT i=0; i<vecsize; ++i)
126 {
127 tosort[i] = std::make_pair(indvec[i], numvec[i]);
128 }
129
130 #if defined(GNU_PARALLEL) && defined(_OPENMP)
131 __gnu_parallel::sort(tosort.begin(), tosort.end());
132 #else
133 std::sort(tosort.begin(), tosort.end());
134 #endif
135
136
137 ind.reserve(vecsize);
138 num.reserve(vecsize);
139 IT lastIndex=-1;
140 for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
141 {
142 if(lastIndex!=itr->first) //if SumDuplicates=false, keep only the first one
143 {
144 ind.push_back(itr->first);
145 num.push_back(itr->second);
146 lastIndex = itr->first;
147 }
148 else if(SumDuplicates)
149 {
150 num.back() += itr->second;
151 }
152 }
153
154 }
155 else
156 {
157
158 ind.reserve(vecsize);
159 num.reserve(vecsize);
160 IT lastIndex=-1;
161
162 for(IT i=0; i< vecsize; ++i)
163 {
164 if(lastIndex!=indvec[i]) //if SumDuplicates=false, keep only the first one
165 {
166 ind.push_back(indvec[i]);
167 num.push_back(numvec[i]);
168 lastIndex = indvec[i];
169 }
170 else if(SumDuplicates)
171 {
172 num.back() += numvec[i];
173 }
174 }
175 }
176 }
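// A minimal usage sketch (not part of the library) for the constructor above: building a
// FullyDistSpVec from per-process local (index, value) pairs. The grid construction follows the
// usual CombBLAS pattern; the name "fullWorld" and the sample data are assumptions for illustration.
//
//   std::shared_ptr<CommGrid> fullWorld(new CommGrid(MPI_COMM_WORLD, 0, 0));
//   std::vector<int64_t> linds {5, 2, 2};   // local indices within this process's range, unsorted, one duplicate
//   std::vector<double>  lvals {1.0, 2.0, 3.0};
//   // SumDuplicates=true merges the two entries at local index 2 into 5.0; sorted=false triggers the local sort
//   FullyDistSpVec<int64_t,double> v(fullWorld, 1000, linds, lvals, true, false);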
177
178
179
180 // ABAB: This function probably operates differently than a user would immediately expect
181 // ABAB: Write a well-posed description for it
182 template <class IT, class NT>
183 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator=(const FullyDistVec< IT,NT > & rhs) // conversion from dense
184 {
185 FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs); // to update glen and commGrid
186
187 std::vector<IT>().swap(ind);
188 std::vector<NT>().swap(num);
189 IT vecsize = rhs.LocArrSize();
190 for(IT i=0; i< vecsize; ++i)
191 {
192 // rhs.zero does not exist after CombBLAS 1.2
193 ind.push_back(i);
194 num.push_back(rhs.arr[i]);
195 }
196 return *this;
197 }
198
199
200 /************************************************************************
201 * Create a sparse vector from index and value vectors (dense vectors)
202 * FullyDistSpVec v(globalsize, inds, vals):
203 * nnz(v) = size(inds) = size(vals)
204 * size(v) = globallen
205 * if inds has duplicate entries and SumDuplicates is true then we sum
206 * the values of duplicate indices. Otherwise, only the first entry is kept.
207 ************************************************************************/
208 template <class IT, class NT>
209 FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen, const FullyDistVec<IT,IT> & inds, const FullyDistVec<IT,NT> & vals, bool SumDuplicates)
210 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(inds.commGrid,globallen)
211 {
212 if(*(inds.commGrid) != *(vals.commGrid))
213 {
214 SpParHelper::Print("Grids are not comparable, FullyDistSpVec() fails !");
215 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
216 }
217 if(inds.TotalLength() != vals.TotalLength())
218 {
219 SpParHelper::Print("Index and value vectors have different sizes, FullyDistSpVec() fails !");
220 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
221 }
222 //commGrid = inds.commGrid;
223 //glen = globallen;
224
225 IT maxind = inds.Reduce(maximum<IT>(), (IT) 0);
226 if(maxind>=globallen)
227 {
228 SpParHelper::Print("At least one index is greater than globallen, FullyDistSpVec() fails !");
229 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
230 }
231
232
233
234 MPI_Comm World = commGrid->GetWorld();
235 int nprocs = commGrid->GetSize();
236 int * rdispls = new int[nprocs];
237 int * recvcnt = new int[nprocs];
238 int * sendcnt = new int[nprocs](); // initialize to 0
239 int * sdispls = new int[nprocs];
240
241 // ----- share count --------
242 IT locsize = inds.LocArrSize();
243 for(IT i=0; i<locsize; ++i)
244 {
245 IT locind;
246 int owner = Owner(inds.arr[i], locind);
247 sendcnt[owner]++;
248 }
249 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
250
251
252 // ----- compute send and receive displacements --------
253
254 sdispls[0] = 0;
255 rdispls[0] = 0;
256 for(int i=0; i<nprocs-1; ++i)
257 {
258 sdispls[i+1] = sdispls[i] + sendcnt[i];
259 rdispls[i+1] = rdispls[i] + recvcnt[i];
260 }
261
262
263 // ----- prepare data to be sent --------
264
265 NT * datbuf = new NT[locsize];
266 IT * indbuf = new IT[locsize];
267 int *count = new int[nprocs](); //current position
268 for(IT i=0; i < locsize; ++i)
269 {
270 IT locind;
271 int owner = Owner(inds.arr[i], locind);
272 int id = sdispls[owner] + count[owner];
273 datbuf[id] = vals.arr[i];
274 indbuf[id] = locind;
275 count[owner]++;
276 }
277 delete [] count;
278 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
279
280
281 // ----- Send and receive indices and values --------
282
283 NT * recvdatbuf = new NT[totrecv];
284 MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
285 delete [] datbuf;
286
287 IT * recvindbuf = new IT[totrecv];
288 MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
289 delete [] indbuf;
290
291
292 // ------ merge and sort received data ----------
293
294 std::vector< std::pair<IT,NT> > tosort;
295 tosort.resize(totrecv);
296 for(int i=0; i<totrecv; ++i)
297 {
298 tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
299 }
300 DeleteAll(recvindbuf, recvdatbuf);
301 DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
302 std::sort(tosort.begin(), tosort.end());
303
304
305 // ------ create local sparse vector ----------
306
307 ind.reserve(totrecv);
308 num.reserve(totrecv);
309 IT lastIndex=-1;
310 for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
311 {
312 if(lastIndex!=itr->first) //if SumDuplicates=false, keep only the first one
313 {
314 ind.push_back(itr->first);
315 num.push_back(itr->second);
316 lastIndex = itr->first;
317 }
318 else if(SumDuplicates)
319 {
320 num.back() += itr->second;
321 }
322 }
323 }
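// Usage sketch for the constructor documented above (illustrative only; "fullWorld" is assumed to be
// a CommGrid shared_ptr created by the caller). Every index in inds is 0, so with SumDuplicates=true
// the resulting vector has a single nonzero at index 0 holding the global sum of the values.
//
//   FullyDistVec<int64_t,int64_t> inds(fullWorld, 16, 0);     // 16 copies of index 0
//   FullyDistVec<int64_t,double>  vals(fullWorld, 16, 1.5);   // 16 copies of the value 1.5
//   FullyDistSpVec<int64_t,double> s(100, inds, vals, true);  // s has one nonzero: s[0] == 24.0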
324
325
326 //! Returns a dense vector of nonzero values
327 //! for which the predicate is satisfied on values
328 template <class IT, class NT>
329 template <typename _Predicate>
330 FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::FindVals(_Predicate pred) const
331 {
332 FullyDistVec<IT,NT> found(commGrid);
333 MPI_Comm World = commGrid->GetWorld();
334 int nprocs = commGrid->GetSize();
335 int rank = commGrid->GetRank();
336
337 IT sizelocal = getlocnnz();
338 for(IT i=0; i<sizelocal; ++i)
339 {
340 if(pred(num[i]))
341 {
342 found.arr.push_back(num[i]);
343 }
344 }
345 IT * dist = new IT[nprocs];
346 IT nsize = found.arr.size();
347 dist[rank] = nsize;
348 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
349 IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
350 found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));
351
352 // Although the found vector is not reshuffled yet, its glen and commGrid are set
353 // We can call the Owner/MyLocLength/LengthUntil functions (to infer future distribution)
354
355 // rebalance/redistribute
356 int * sendcnt = new int[nprocs];
357 std::fill(sendcnt, sendcnt+nprocs, 0);
358 for(IT i=0; i<nsize; ++i)
359 {
360 IT locind;
361 int owner = found.Owner(i+lengthuntil, locind);
362 ++sendcnt[owner];
363 }
364 int * recvcnt = new int[nprocs];
365 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the counts
366
367 int * sdispls = new int[nprocs];
368 int * rdispls = new int[nprocs];
369 sdispls[0] = 0;
370 rdispls[0] = 0;
371 for(int i=0; i<nprocs-1; ++i)
372 {
373 sdispls[i+1] = sdispls[i] + sendcnt[i];
374 rdispls[i+1] = rdispls[i] + recvcnt[i];
375 }
376 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
377 std::vector<NT> recvbuf(totrecv);
378
379 // data is already in the right order in found.arr
380 MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<NT>(), recvbuf.data(), recvcnt, rdispls, MPIType<NT>(), World);
381 found.arr.swap(recvbuf);
382 delete [] dist;
383 DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
384
385 return found;
386 }
387
388
389
390 //! Returns a dense vector of nonzero global indices
391 //! for which the predicate is satisfied on values
392 template <class IT, class NT>
393 template <typename _Predicate>
394 FullyDistVec<IT,IT> FullyDistSpVec<IT,NT>::FindInds(_Predicate pred) const
395 {
396 FullyDistVec<IT,IT> found(commGrid);
397 MPI_Comm World = commGrid->GetWorld();
398 int nprocs = commGrid->GetSize();
399 int rank = commGrid->GetRank();
400
401 IT sizelocal = getlocnnz();
402 IT sizesofar = LengthUntil();
403 for(IT i=0; i<sizelocal; ++i)
404 {
405 if(pred(num[i]))
406 {
407 found.arr.push_back(ind[i]+sizesofar);
408 }
409 }
410 IT * dist = new IT[nprocs];
411 IT nsize = found.arr.size();
412 dist[rank] = nsize;
413 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
414 IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
415 found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));
416
417 // Although the found vector is not reshuffled yet, its glen and commGrid are set
418 // We can call the Owner/MyLocLength/LengthUntil functions (to infer future distribution)
419
420 // rebalance/redistribute
421 int * sendcnt = new int[nprocs];
422 std::fill(sendcnt, sendcnt+nprocs, 0);
423 for(IT i=0; i<nsize; ++i)
424 {
425 IT locind;
426 int owner = found.Owner(i+lengthuntil, locind);
427 ++sendcnt[owner];
428 }
429 int * recvcnt = new int[nprocs];
430 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the counts
431
432 int * sdispls = new int[nprocs];
433 int * rdispls = new int[nprocs];
434 sdispls[0] = 0;
435 rdispls[0] = 0;
436 for(int i=0; i<nprocs-1; ++i)
437 {
438 sdispls[i+1] = sdispls[i] + sendcnt[i];
439 rdispls[i+1] = rdispls[i] + recvcnt[i];
440 }
441 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
442 std::vector<IT> recvbuf(totrecv);
443
444 // data is already in the right order in found.arr
445 MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
446 found.arr.swap(recvbuf);
447 delete [] dist;
448 DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
449
450 return found;
451 }
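// Usage sketch for FindVals/FindInds (illustrative only; assumes a populated
// FullyDistSpVec<int64_t,double> named v). Both return load-balanced dense vectors.
//
//   FullyDistVec<int64_t,double>  bigvals = v.FindVals([](double x){ return x > 10.0; });
//   FullyDistVec<int64_t,int64_t> biginds = v.FindInds([](double x){ return x > 10.0; });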
452
453
454
455 template <class IT, class NT>
456 void FullyDistSpVec<IT,NT>::stealFrom(FullyDistSpVec<IT,NT> & victim)
457 {
458 FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (victim); // to update glen and commGrid
459 ind.swap(victim.ind);
460 num.swap(victim.num);
461 }
462
463 template <class IT, class NT>
464 NT FullyDistSpVec<IT,NT>::operator[](IT indx)
465 {
466 NT val;
467 IT locind;
468 int owner = Owner(indx, locind);
469 int found = 0;
470 if(commGrid->GetRank() == owner)
471 {
472 typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind); // ind is a sorted vector
473 if(it != ind.end() && locind == (*it)) // found
474 {
475 val = num[it-ind.begin()];
476 found = 1;
477 }
478 else
479 {
480 val = NT(); // not found: return a default-constructed NT
481 found = 0;
482 }
483 }
484 MPI_Bcast(&found, 1, MPI_INT, owner, commGrid->GetWorld());
485 MPI_Bcast(&val, 1, MPIType<NT>(), owner, commGrid->GetWorld());
486 wasFound = found;
487 return val;
488 }
489
490 template <class IT, class NT>
491 NT FullyDistSpVec<IT,NT>::GetLocalElement(IT indx)
492 {
493 NT val = NT();
494 IT locind;
495 int owner = Owner(indx, locind);
496 int found = 0;
497 typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind); // ind is a sorted vector
498 if(commGrid->GetRank() == owner) {
499 if(it != ind.end() && locind == (*it)) // found
500 {
501 val = num[it-ind.begin()];
502 found = 1;
503 }
504 }
505 wasFound = found;
506 return val;
507 }
508
509 //! Indexing is performed 0-based
510 template <class IT, class NT>
511 void FullyDistSpVec<IT,NT>::SetElement (IT indx, NT numx)
512 {
513 if(glen == 0)
514 SpParHelper::Print("WARNING: SetElement() called on a vector with zero length\n");
515
516 IT locind;
517 int owner = Owner(indx, locind);
518 if(commGrid->GetRank() == owner)
519 {
520 typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
521 if(iter == ind.end()) // beyond limits, insert from back
522 {
523 ind.push_back(locind);
524 num.push_back(numx);
525 }
526 else if (locind < *iter) // not found, insert in the middle
527 {
528 // the order of insertions is crucial
529 // if we first insert to ind, then ind.begin() is invalidated !
530 num.insert(num.begin() + (iter-ind.begin()), numx);
531 ind.insert(iter, locind);
532 }
533 else // found
534 {
535 *(num.begin() + (iter-ind.begin())) = numx;
536 }
537 }
538 }
539
540 template <class IT, class NT>
541 void FullyDistSpVec<IT,NT>::DelElement (IT indx)
542 {
543 IT locind;
544 int owner = Owner(indx, locind);
545 if(commGrid->GetRank() == owner)
546 {
547 typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
548 if(iter != ind.end() && !(locind < *iter))
549 {
550 num.erase(num.begin() + (iter-ind.begin()));
551 ind.erase(iter);
552 }
553 }
554 }
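// Usage sketch for element access (illustrative only; "fullWorld" is an assumed CommGrid).
// operator[] performs broadcasts, so every process must call it with the same index;
// SetElement/DelElement only modify state on the owning process.
//
//   FullyDistSpVec<int64_t,double> v(fullWorld, 100);
//   v.SetElement(7, 3.5);        // inserts (or overwrites) index 7
//   double x = v[7];             // x == 3.5 on every process
//   v.DelElement(7);             // removes the nonzero again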
555
556 /**
557 * The distribution and length are inherited from ri
558 * Its zero is inherited from *this (because ri is of type IT)
559 * Example: This is [{1,n1},{4,n4},{7,n7},{8,n8},{9,n9}] with P_00 owning {1,4} and P_11 rest
560 * Assume ri = [4,1,5,7] is distributed as two elements per processor
561 * Then the result has length 4, distributed two elements per processor, even though 5 and 7 don't exist
562 * This is because we are returning a "dense" output, so the absent elements will be padded with 0
563 **/
564 template <class IT, class NT>
565 FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::operator() (const FullyDistVec<IT,IT> & ri) const
566 {
567 MPI_Comm World = commGrid->GetWorld();
568 FullyDistVec<IT,NT> Indexed(ri.commGrid, ri.glen, NT()); // NT() is the initial value
569 int nprocs;
570 MPI_Comm_size(World, &nprocs);
571 std::unordered_map<IT, IT> revr_map; // inverted index that maps indices of *this to indices of output
572 std::vector< std::vector<IT> > data_req(nprocs);
573 IT locnnz = ri.LocArrSize();
574
575 // ABAB: Input sanity check
576 int local = 1;
577 int whole = 1;
578 for(IT i=0; i < locnnz; ++i)
579 {
580 if(ri.arr[i] >= glen || ri.arr[i] < 0)
581 {
582 local = 0;
583 }
584 }
585 MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, World);
586 if(whole == 0)
587 {
588 throw outofrangeexception();
589 }
590
591 for(IT i=0; i < locnnz; ++i)
592 {
593 IT locind;
594 int owner = Owner(ri.arr[i], locind); // numerical values in ri are 0-based
595 data_req[owner].push_back(locind);
596 revr_map.insert(typename std::unordered_map<IT, IT>::value_type(locind, i));
597 }
598 IT * sendbuf = new IT[locnnz];
599 int * sendcnt = new int[nprocs];
600 int * sdispls = new int[nprocs];
601 for(int i=0; i<nprocs; ++i)
602 sendcnt[i] = data_req[i].size();
603
604 int * rdispls = new int[nprocs];
605 int * recvcnt = new int[nprocs];
606 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
607
608 sdispls[0] = 0;
609 rdispls[0] = 0;
610 for(int i=0; i<nprocs-1; ++i)
611 {
612 sdispls[i+1] = sdispls[i] + sendcnt[i];
613 rdispls[i+1] = rdispls[i] + recvcnt[i];
614 }
615 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs,0);
616 IT * recvbuf = new IT[totrecv];
617
618 for(int i=0; i<nprocs; ++i)
619 {
620 std::copy(data_req[i].begin(), data_req[i].end(), sendbuf+sdispls[i]);
621 std::vector<IT>().swap(data_req[i]);
622 }
623 MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<IT>(), recvbuf, recvcnt, rdispls, MPIType<IT>(), World); // request data
624
625 // We will return the requested data,
626 // our return can be at most as big as the request
627 // and smaller if we are missing some elements
628 IT * indsback = new IT[totrecv];
629 NT * databack = new NT[totrecv];
630
631 int * ddispls = new int[nprocs];
632 std::copy(rdispls, rdispls+nprocs, ddispls);
633 for(int i=0; i<nprocs; ++i)
634 {
635 // this is not the most efficient method because it scans the ind vector nprocs times
636 IT * it = std::set_intersection(recvbuf+rdispls[i], recvbuf+rdispls[i]+recvcnt[i], ind.begin(), ind.end(), indsback+rdispls[i]);
637 recvcnt[i] = (it - (indsback+rdispls[i])); // update with size of the intersection
638
639 IT vi = 0;
640 for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
641 {
642 // indsback is a subset of ind
643 while(indsback[j] > ind[vi])
644 ++vi;
645 databack[j] = num[vi++];
646 }
647 }
648
649 DeleteAll(recvbuf, ddispls);
650 NT * databuf = new NT[ri.LocArrSize()];
651
652 MPI_Alltoall(recvcnt, 1, MPI_INT, sendcnt, 1, MPI_INT, World); // share the response counts, overriding request counts
653 MPI_Alltoallv(indsback, recvcnt, rdispls, MPIType<IT>(), sendbuf, sendcnt, sdispls, MPIType<IT>(), World); // send indices
654 MPI_Alltoallv(databack, recvcnt, rdispls, MPIType<NT>(), databuf, sendcnt, sdispls, MPIType<NT>(), World); // send data
655 DeleteAll(rdispls, recvcnt, indsback, databack);
656
657 // Now create the output from databuf (holds numerical values) and sendbuf (holds indices)
658 // arr is already resized during its construction
659 for(int i=0; i<nprocs; ++i)
660 {
661 // data will come globally sorted from processors
662 // i.e. ind owned by proc_i is always smaller than
663 // ind owned by proc_j for i < j
664 for(int j=sdispls[i]; j< sdispls[i]+sendcnt[i]; ++j)
665 {
666 typename std::unordered_map<IT,IT>::iterator it = revr_map.find(sendbuf[j]);
667 Indexed.arr[it->second] = databuf[j];
668 // cout << it->second << "(" << sendbuf[j] << "):" << databuf[j] << endl;
669 }
670 }
671 DeleteAll(sdispls, sendcnt, sendbuf, databuf);
672 return Indexed;
673 }
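// Usage sketch mirroring the doc comment above (illustrative only; assumes a populated
// FullyDistSpVec<int64_t,double> named v and an assumed CommGrid "fullWorld"). Filling ri via the
// dense vector's SetElement is an assumption of this sketch; absent indices come back as NT(), i.e. 0.
//
//   FullyDistVec<int64_t,int64_t> ri(fullWorld, 4, 0);
//   ri.SetElement(0, 4); ri.SetElement(1, 1); ri.SetElement(2, 5); ri.SetElement(3, 7);
//   FullyDistVec<int64_t,double> extracted = v(ri);   // dense output of length 4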
674
675 template <class IT, class NT>
676 void FullyDistSpVec<IT,NT>::iota(IT globalsize, NT first)
677 {
678 glen = globalsize;
679 IT length = MyLocLength();
680 ind.resize(length);
681 num.resize(length);
682 SpHelper::iota(ind.begin(), ind.end(), 0); // offset'd within processors
683 SpHelper::iota(num.begin(), num.end(), LengthUntil() + first); // global across processors
684 }
685
686
687 //! iota over existing nonzero entries
688 template <class IT, class NT>
689 void FullyDistSpVec<IT,NT>::nziota(NT first)
690 {
691 std::iota(num.begin(), num.end(), NnzUntil() + first); // global across processors
692 }
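// Usage sketch for iota/nziota (illustrative only; "fullWorld" is an assumed CommGrid):
//
//   FullyDistSpVec<int64_t,int64_t> w(fullWorld, 0);
//   w.iota(10, 0);    // w becomes length 10 with w[i] == i for every i (all entries present)
//   w.nziota(100);    // renumbers the existing nonzeros as 100, 101, ... in global nonzero order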
693
694 //! Returns the number of nonzeros until this processor
695 template <class IT, class NT>
696 IT FullyDistSpVec<IT,NT>::NnzUntil() const
697 {
698 IT mynnz = ind.size();
699 IT prevnnz = 0;
700 MPI_Scan(&mynnz, &prevnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
701 return (prevnnz - mynnz);
702 }
703
704
705
706 /* old version
707 // - sorts the entries with respect to nonzero values
708 // - ignores structural zeros
709 // - keeps the sparsity structure intact
710 // - returns a permutation representing the mapping from old to new locations
711 template <class IT, class NT>
712 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
713 {
714 MPI_Comm World = commGrid->GetWorld();
715 FullyDistSpVec<IT,IT> temp(commGrid);
716 if(getnnz()==0) return temp;
717 IT nnz = getlocnnz();
718 pair<NT,IT> * vecpair = new pair<NT,IT>[nnz];
719
720
721
722 int nprocs, rank;
723 MPI_Comm_size(World, &nprocs);
724 MPI_Comm_rank(World, &rank);
725
726 IT * dist = new IT[nprocs];
727 dist[rank] = nnz;
728 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
729 IT until = LengthUntil();
730 for(IT i=0; i< nnz; ++i)
731 {
732 vecpair[i].first = num[i]; // we'll sort wrt numerical values
733 vecpair[i].second = ind[i] + until;
734
735 }
736 SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
737
738 vector< IT > nind(nnz);
739 vector< IT > nnum(nnz);
740
741 for(IT i=0; i< nnz; ++i)
742 {
743 num[i] = vecpair[i].first; // sorted range (change the object itself)
744 nind[i] = ind[i]; // make sure the sparsity distribution is the same
745 nnum[i] = vecpair[i].second; // inverse permutation stored as numerical values
746
747 }
748 delete [] vecpair;
749 delete [] dist;
750
751 temp.glen = glen;
752 temp.ind = nind;
753 temp.num = nnum;
754 return temp;
755 }
756 */
757
758
759 /*
760 TODO: This function is just a hack at this moment.
761 The indices of the return vector is not correct.
762 FIX this
763 */
764 // - sorts the entries with respect to nonzero values
765 // - ignores structural zeros
766 // - keeps the sparsity structure intact
767 // - returns a permutation representing the mapping from old to new locations
768 template <class IT, class NT>
769 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
770 {
771 MPI_Comm World = commGrid->GetWorld();
772 FullyDistSpVec<IT,IT> temp(commGrid);
773 if(getnnz()==0) return temp;
774 IT nnz = getlocnnz();
775 std::pair<NT,IT> * vecpair = new std::pair<NT,IT>[nnz];
776
777
778
779 int nprocs, rank;
780 MPI_Comm_size(World, &nprocs);
781 MPI_Comm_rank(World, &rank);
782
783 IT * dist = new IT[nprocs];
784 dist[rank] = nnz;
785 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
786 IT until = LengthUntil();
787 #ifdef THREADED
788 #pragma omp parallel for
789 #endif
790 for(IT i=0; i< nnz; ++i)
791 {
792 vecpair[i].first = num[i]; // we'll sort wrt numerical values
793 vecpair[i].second = ind[i] + until;
794
795 }
796 std::vector<std::pair<NT,IT>> sorted = SpParHelper::KeyValuePSort(vecpair, nnz, dist, World);
797
798 nnz = sorted.size();
799 temp.num.resize(nnz);
800 temp.ind.resize(nnz);
801
802 #ifdef THREADED
803 #pragma omp parallel for
804 #endif
805 for(IT i=0; i< nnz; ++i)
806 {
807 //num[i] = sorted[i].first; // sorted range (change the object itself)
808 //nind[i] = ind[i]; // make sure the sparsity distribution is the same
809 temp.num[i] = sorted[i].second; // inverse permutation stored as numerical values
810 temp.ind[i] = i; // we are not using this information at this moment
811 }
812
813 delete [] vecpair;
814 delete [] dist;
815
816 temp.glen = glen;
817 return temp;
818 }
819
820
821
822 /*
823 // - sorts the entries with respect to nonzero values
824 // - ignores structural zeros
825 // - keeps the sparsity structure intact
826 // - returns a permutation representing the mapping from old to new locations
827 template <class IT, class NT>
828 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
829 {
830 MPI_Comm World = commGrid->GetWorld();
831 FullyDistSpVec<IT,IT> temp(commGrid);
832 if(getnnz()==0) return temp;
833 IT nnz = getlocnnz();
834 //pair<NT,IT> * vecpair = new pair<NT,IT>[nnz];
835 vector<IndexHolder<NT>> in(nnz);
836 //vector<IndexHolder<NT>> out;
837
838
839
840 int nprocs, rank;
841 MPI_Comm_size(World, &nprocs);
842 MPI_Comm_rank(World, &rank);
843
844 //IT * dist = new IT[nprocs];
845 //dist[rank] = nnz;
846 //MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
847 IT until = LengthUntil();
848 for(IT i=0; i< nnz; ++i)
849 {
850 //vecpair[i].first = num[i]; // we'll sort wrt numerical values
851 //vecpair[i].second = ind[i] + until;
852
853 in[i] = IndexHolder<NT>(num[i], static_cast<unsigned long>(ind[i] + until));
854 }
855 //SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
856
857 //MPI_Barrier(World);
858 //cout << "before sorting " << in.size() << endl;
859 par::sampleSort(in, World);
860 //MPI_Barrier(World);
861 //cout << "after sorting " << in.size() << endl;
862 //MPI_Barrier(World);
863
864 //vector< IT > nind(out.size());
865 //vector< IT > nnum(out.size());
866
867 temp.ind.resize(in.size());
868 temp.num.resize(in.size());
869 for(IT i=0; i< in.size(); ++i)
870 {
871 //num[i] = vecpair[i].first; // sorted range (change the object itself)
872 //nind[i] = ind[i]; // make sure the sparsity distribution is the same
873 //nnum[i] = vecpair[i].second; // inverse permutation stored as numerical values
874
875 //num[i] = out[i].value; // sorted range (change the object itself)
876 //nind[i] = ind[i]; // make sure the sparsity distribution is the same
877 //nnum[i] = static_cast<IT>(out[i].index); // inverse permutation stored as numerical values
878 temp.num[i] = static_cast<IT>(in[i].index); // inverse permutation stored as numerical values
879 //cout << temp.num[i] << " " ;
880 }
881
882 temp.glen = glen;
883 return temp;
884 }
885 */
886
887
888 template <class IT, class NT>
889 template <typename _BinaryOperation >
890 FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::UniqAll2All(_BinaryOperation __binary_op, MPI_Op mympiop)
891 {
892 MPI_Comm World = commGrid->GetWorld();
893 int nprocs = commGrid->GetSize();
894
895 std::vector< std::vector< NT > > datsent(nprocs);
896 std::vector< std::vector< IT > > indsent(nprocs);
897
898 IT locind;
899 size_t locvec = num.size(); // nnz in local vector
900 IT lenuntil = LengthUntil(); // to convert to global index
901 for(size_t i=0; i< locvec; ++i)
902 {
903 uint64_t myhash; // output of MurmurHash3_x64_64 is 64-bits regardless of the input length
904 MurmurHash3_x64_64((const void*) &(num[i]),sizeof(NT), 0, &myhash);
905 double range = static_cast<double>(myhash) * static_cast<double>(glen);
906 NT mapped = range / static_cast<double>(std::numeric_limits<uint64_t>::max()); // mapped is in range [0,n)
907 int owner = Owner(mapped, locind);
908
909 datsent[owner].push_back(num[i]); // all identical entries will be hashed to the same value -> same processor
910 indsent[owner].push_back(ind[i]+lenuntil);
911 }
912 int * sendcnt = new int[nprocs];
913 int * sdispls = new int[nprocs];
914 for(int i=0; i<nprocs; ++i)
915 sendcnt[i] = (int) datsent[i].size();
916
917 int * rdispls = new int[nprocs];
918 int * recvcnt = new int[nprocs];
919 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
920 sdispls[0] = 0;
921 rdispls[0] = 0;
922 for(int i=0; i<nprocs-1; ++i)
923 {
924 sdispls[i+1] = sdispls[i] + sendcnt[i];
925 rdispls[i+1] = rdispls[i] + recvcnt[i];
926 }
927 NT * datbuf = new NT[locvec];
928 for(int i=0; i<nprocs; ++i)
929 {
930 std::copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
931 std::vector<NT>().swap(datsent[i]);
932 }
933 IT * indbuf = new IT[locvec];
934 for(int i=0; i<nprocs; ++i)
935 {
936 std::copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
937 std::vector<IT>().swap(indsent[i]);
938 }
939 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
940 NT * recvdatbuf = new NT[totrecv];
941 MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
942 delete [] datbuf;
943
944 IT * recvindbuf = new IT[totrecv];
945 MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
946 delete [] indbuf;
947
948 std::vector< std::pair<NT,IT> > tosort; // in fact, tomerge would be a better name but it is unlikely to be faster
949
950 for(int i=0; i<nprocs; ++i)
951 {
952 for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
953 {
954 tosort.push_back(std::make_pair(recvdatbuf[j], recvindbuf[j]));
955 }
956 }
957 DeleteAll(recvindbuf, recvdatbuf);
958 std::sort(tosort.begin(), tosort.end());
959 //std::unique returns an iterator to the element that follows the last element not removed.
960 typename std::vector< std::pair<NT,IT> >::iterator last;
961 last = std::unique (tosort.begin(), tosort.end(), equal_first<NT,IT>());
962
963 std::vector< std::vector< NT > > datback(nprocs);
964 std::vector< std::vector< IT > > indback(nprocs);
965
966 for(typename std::vector< std::pair<NT,IT> >::iterator itr = tosort.begin(); itr != last; ++itr)
967 {
968 IT locind;
969 int owner = Owner(itr->second, locind);
970
971 datback[owner].push_back(itr->first);
972 indback[owner].push_back(locind);
973 }
974 for(int i=0; i<nprocs; ++i) sendcnt[i] = (int) datback[i].size();
975 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
976 for(int i=0; i<nprocs-1; ++i)
977 {
978 sdispls[i+1] = sdispls[i] + sendcnt[i];
979 rdispls[i+1] = rdispls[i] + recvcnt[i];
980 }
981 datbuf = new NT[tosort.size()];
982 for(int i=0; i<nprocs; ++i)
983 {
984 std::copy(datback[i].begin(), datback[i].end(), datbuf+sdispls[i]);
985 std::vector<NT>().swap(datback[i]);
986 }
987 indbuf = new IT[tosort.size()];
988 for(int i=0; i<nprocs; ++i)
989 {
990 std::copy(indback[i].begin(), indback[i].end(), indbuf+sdispls[i]);
991 std::vector<IT>().swap(indback[i]);
992 }
993 totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0)); // update value
994
995 recvdatbuf = new NT[totrecv];
996 MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
997 delete [] datbuf;
998
999 recvindbuf = new IT[totrecv];
1000 MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1001 delete [] indbuf;
1002
1003 FullyDistSpVec<IT,NT> Indexed(commGrid, glen); // length(Indexed) = glen = length(*this)
1004
1005 std::vector< std::pair<IT,NT> > back2sort;
1006 for(int i=0; i<nprocs; ++i)
1007 {
1008 for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
1009 {
1010 back2sort.push_back(std::make_pair(recvindbuf[j], recvdatbuf[j]));
1011 }
1012 }
1013 std::sort(back2sort.begin(), back2sort.end());
1014 for(typename std::vector< std::pair<IT,NT> >::iterator itr = back2sort.begin(); itr != back2sort.end(); ++itr)
1015 {
1016 Indexed.ind.push_back(itr->first);
1017 Indexed.num.push_back(itr->second);
1018 }
1019
1020 DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1021 DeleteAll(recvindbuf, recvdatbuf);
1022 return Indexed;
1023 }
1024
1025
1026 // ABAB: \todo Concept control so it only gets called in integers
1027 template <class IT, class NT>
1028 template <typename _BinaryOperation >
1029 FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::Uniq(_BinaryOperation __binary_op, MPI_Op mympiop)
1030 {
1031 return UniqAll2All(__binary_op, mympiop);
1032 }
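// Usage sketch for Uniq (illustrative only; assumes a populated FullyDistSpVec<int64_t,int64_t>
// named w). The binary operator and MPI_Op arguments are part of the interface but, as visible in
// UniqAll2All above, the all-to-all implementation does not consult them; a minimum-style pair
// (minimum<> as in CombBLAS' Operations.h, like the maximum<> used earlier in this file) is a
// reasonable conventional choice.
//
//   FullyDistSpVec<int64_t,int64_t> u = w.Uniq(minimum<int64_t>(), MPI_MIN);
//   // u keeps one representative (the one with the smallest index) for every distinct nonzero value of w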
1033
1034 template <class IT, class NT>
1035 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator+=(const FullyDistSpVec<IT,NT> & rhs)
1036 {
1037 if(this != &rhs)
1038 {
1039 if(glen != rhs.glen)
1040 {
1041 std::cerr << "Vector dimensions don't match for addition\n";
1042 return *this;
1043 }
1044 IT lsize = getlocnnz();
1045 IT rsize = rhs.getlocnnz();
1046
1047 std::vector< IT > nind;
1048 std::vector< NT > nnum;
1049 nind.reserve(lsize+rsize);
1050 nnum.reserve(lsize+rsize);
1051
1052 IT i=0, j=0;
1053 while(i < lsize && j < rsize)
1054 {
1055 // assignment won't change the size of vector, push_back is necessary
1056 if(ind[i] > rhs.ind[j])
1057 {
1058 nind.push_back( rhs.ind[j] );
1059 nnum.push_back( rhs.num[j++] );
1060 }
1061 else if(ind[i] < rhs.ind[j])
1062 {
1063 nind.push_back( ind[i] );
1064 nnum.push_back( num[i++] );
1065 }
1066 else
1067 {
1068 nind.push_back( ind[i] );
1069 nnum.push_back( num[i++] + rhs.num[j++] );
1070 }
1071 }
1072 while( i < lsize) // rhs was depleted first
1073 {
1074 nind.push_back( ind[i] );
1075 nnum.push_back( num[i++] );
1076 }
1077 while( j < rsize) // *this was depleted first
1078 {
1079 nind.push_back( rhs.ind[j] );
1080 nnum.push_back( rhs.num[j++] );
1081 }
1082 ind.swap(nind); // ind will contain the elements of nind with capacity shrunk-to-fit size
1083 num.swap(nnum);
1084 }
1085 else
1086 {
1087 typename std::vector<NT>::iterator it;
1088 for(it = num.begin(); it != num.end(); ++it)
1089 (*it) *= 2;
1090 }
1091 return *this;
1092 };
1093 template <class IT, class NT>
1094 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator-=(const FullyDistSpVec<IT,NT> & rhs)
1095 {
1096 if(this != &rhs)
1097 {
1098 if(glen != rhs.glen)
1099 {
1100 std::cerr << "Vector dimensions don't match for subtraction\n";
1101 return *this;
1102 }
1103 IT lsize = getlocnnz();
1104 IT rsize = rhs.getlocnnz();
1105 std::vector< IT > nind;
1106 std::vector< NT > nnum;
1107 nind.reserve(lsize+rsize);
1108 nnum.reserve(lsize+rsize);
1109
1110 IT i=0, j=0;
1111 while(i < lsize && j < rsize)
1112 {
1113 // assignment won't change the size of vector, push_back is necessary
1114 if(ind[i] > rhs.ind[j])
1115 {
1116 nind.push_back( rhs.ind[j] );
1117 nnum.push_back( -static_cast<NT>(rhs.num[j++]) );
1118 }
1119 else if(ind[i] < rhs.ind[j])
1120 {
1121 nind.push_back( ind[i] );
1122 nnum.push_back( num[i++] );
1123 }
1124 else
1125 {
1126 nind.push_back( ind[i] );
1127 nnum.push_back( num[i++] - rhs.num[j++] ); // ignore numerical cancellations
1128 }
1129 }
1130 while( i < lsize) // rhs was depleted first
1131 {
1132 nind.push_back( ind[i] );
1133 nnum.push_back( num[i++] );
1134 }
1135 while( j < rsize) // *this was depleted first
1136 {
1137 nind.push_back( rhs.ind[j] );
1138 nnum.push_back( NT() - (rhs.num[j++]) );
1139 }
1140 ind.swap(nind); // ind will contain the elements of nind with capacity shrunk-to-fit size
1141 num.swap(nnum);
1142 }
1143 else
1144 {
1145 ind.clear();
1146 num.clear();
1147 }
1148 return *this;
1149 };
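// Usage sketch for the element-wise operators (illustrative only; "fullWorld" is an assumed CommGrid):
//
//   FullyDistSpVec<int64_t,double> a(fullWorld, 100), b(fullWorld, 100);
//   a.SetElement(3, 1.0);
//   b.SetElement(3, 2.0);  b.SetElement(5, 4.0);
//   a += b;   // a == {3: 3.0, 5: 4.0}; note that a += a simply doubles every value
//   a -= b;   // a == {3: 1.0, 5: 0.0}; cancellations keep the index as an explicit zero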
1150
1151
1152 template <class IT, class NT>
1153 template <typename _BinaryOperation>
1154 void FullyDistSpVec<IT,NT>::SparseCommon(std::vector< std::vector < std::pair<IT,NT> > > & data, _BinaryOperation BinOp)
1155 {
1156 int nprocs = commGrid->GetSize();
1157 int * sendcnt = new int[nprocs];
1158 int * recvcnt = new int[nprocs];
1159 for(int i=0; i<nprocs; ++i)
1160 sendcnt[i] = data[i].size(); // sizes are all the same
1161
1162 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
1163 int * sdispls = new int[nprocs]();
1164 int * rdispls = new int[nprocs]();
1165 std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
1166 std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
1167 IT totrecv = rdispls[nprocs-1]+recvcnt[nprocs-1];
1168 IT totsend = sdispls[nprocs-1]+sendcnt[nprocs-1];
1169
1170
1171 std::pair<IT,NT> * senddata = new std::pair<IT,NT>[totsend]; // re-used for both rows and columns
1172 for(int i=0; i<nprocs; ++i)
1173 {
1174 std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
1175 std::vector< std::pair<IT,NT> >().swap(data[i]); // clear memory
1176 }
1177 MPI_Datatype MPI_pair;
1178 MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair); // matches the std::pair buffers being communicated
1179 MPI_Type_commit(&MPI_pair);
1180
1181 std::pair<IT,NT> * recvdata = new std::pair<IT,NT>[totrecv];
1182 MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_pair, recvdata, recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
1183
1184 DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
1185 MPI_Type_free(&MPI_pair);
1186
1187 if(!std::is_sorted(recvdata, recvdata+totrecv))
1188 std::sort(recvdata, recvdata+totrecv);
1189
1190 ind.push_back(recvdata[0].first);
1191 num.push_back(recvdata[0].second);
1192 for(IT i=1; i< totrecv; ++i)
1193 {
1194 if(ind.back() == recvdata[i].first)
1195 {
1196 num.back() = BinOp(num.back(), recvdata[i].second);
1197 }
1198 else
1199 {
1200 ind.push_back(recvdata[i].first);
1201 num.push_back(recvdata[i].second);
1202 }
1203 }
1204 delete [] recvdata;
1205 }
1206
1207 template <class IT, class NT>
1208 template <typename _BinaryOperation>
1209 void FullyDistSpVec<IT,NT>::ParallelRead (const std::string & filename, bool onebased, _BinaryOperation BinOp)
1210 {
1211 int64_t gnnz; // global nonzeros (glen is already declared as part of this class's private data)
1212 int64_t linesread = 0;
1213
1214 FILE *f;
1215 int myrank = commGrid->GetRank();
1216 int nprocs = commGrid->GetSize();
1217 if(myrank == 0)
1218 {
1219 if((f = fopen(filename.c_str(),"r"))==NULL)
1220 {
1221 std::cout << "File failed to open\n";
1222 MPI_Abort(commGrid->commWorld, NOFILE);
1223 }
1224 else
1225 {
1226 fscanf(f,"%lld %lld\n", &glen, &gnnz);
1227 }
1228 std::cout << "Total number of nonzeros expected across all processors is " << gnnz << std::endl;
1229
1230 }
1231 MPI_Bcast(&glen, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
1232 MPI_Bcast(&gnnz, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
1233
1234
1235 struct stat st; // get file size
1236 if (stat(filename.c_str(), &st) == -1)
1237 {
1238 MPI_Abort(commGrid->commWorld, NOFILE);
1239 }
1240 int64_t file_size = st.st_size;
1241 MPI_Offset fpos, end_fpos;
1242 if(myrank == 0) // the offset needs to be for this rank
1243 {
1244 std::cout << "File is " << file_size << " bytes" << std::endl;
1245 fpos = ftell(f);
1246 fclose(f);
1247 }
1248 else
1249 {
1250 fpos = myrank * file_size / nprocs;
1251 }
1252 if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
1253 else end_fpos = file_size;
1254 MPI_Barrier(commGrid->commWorld);
1255
1256 MPI_File mpi_fh;
1257 MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
1258
1259 std::vector< std::vector < std::pair<IT,NT> > > data(nprocs); // data to send
1260
1261 std::vector<std::string> lines;
1262
1263 SpParHelper::Print("Fetching first piece\n");
1264
1265 MPI_Barrier(commGrid->commWorld);
1266 bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
1267 int64_t entriesread = lines.size();
1268 SpParHelper::Print("Fetched first piece\n");
1269
1270 MPI_Barrier(commGrid->commWorld);
1271
1272 IT ii;
1273 NT vv;
1274 for (auto itr=lines.begin(); itr != lines.end(); ++itr)
1275 {
1276 std::stringstream ss(*itr);
1277 ss >> ii >> vv;
1278 if(onebased) ii--;
1279 IT locind;
1280 int owner = Owner(ii, locind); // recipient (owner) processor
1281 data[owner].push_back(std::make_pair(locind,vv));
1282 }
1283
1284 while(!finished)
1285 {
1286 finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
1287 entriesread += lines.size();
1288 for (auto itr=lines.begin(); itr != lines.end(); ++itr)
1289 {
1290 std::stringstream ss(*itr);
1291 ss >> ii >> vv;
1292 if(onebased) ii--;
1293 IT locind;
1294 int owner = Owner(ii, locind); // recipient (owner) processor
1295 data[owner].push_back(std::make_pair(locind,vv));
1296 }
1297 }
1298 int64_t allentriesread;
1299 MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
1300 #ifdef COMBBLAS_DEBUG
1301 if(myrank == 0)
1302 std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
1303 #endif
1304
1305 SparseCommon(data, BinOp);
1306 }
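// Usage sketch for ParallelRead (illustrative only; the file name and contents are assumptions).
// The expected text format is a header line "length nnz" followed by one "index value" pair per line:
//
//   8 3
//   1 0.5
//   4 2.25
//   7 -1.0
//
//   FullyDistSpVec<int64_t,double> r(fullWorld, 0);          // glen is overwritten from the header
//   r.ParallelRead("vec.txt", true, std::plus<double>());    // onebased=true; duplicate indices are summed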
1307
1308 template <class IT, class NT>
1309 template <class HANDLER>
1310 void FullyDistSpVec<IT,NT>::ParallelWrite(const std::string & filename, bool onebased, HANDLER handler, bool includeindices, bool includeheader)
1311 {
1312 int myrank = commGrid->GetRank();
1313 int nprocs = commGrid->GetSize();
1314 IT totalLength = TotalLength();
1315 IT totalNNZ = getnnz();
1316
1317 std::stringstream ss;
1318 if(includeheader && myrank == 0)
1319 {
1320 ss << totalLength << '\t' << totalNNZ << '\n'; // rank-0 has the header
1321 }
1322 IT entries = getlocnnz();
1323 IT sizeuntil = 0;
1324 MPI_Exscan( &entries, &sizeuntil, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld() );
1325 if(myrank == 0) sizeuntil = 0; // because MPI_Exscan says the recvbuf in process 0 is undefined
1326
1327 if(includeindices)
1328 {
1329 if(onebased) sizeuntil += 1; // increment by 1
1330
1331 for(IT i=0; i< entries; ++i)
1332 {
1333 ss << ind[i]+sizeuntil << '\t';
1334 handler.save(ss, num[i], ind[i]+sizeuntil);
1335 ss << '\n';
1336 }
1337 }
1338 else // the base doesn't matter if we don't include indices
1339 {
1340 IT dummy = 0; // dummy because we don't want indices to be printed
1341 for(IT i=0; i< entries; ++i)
1342 {
1343 handler.save(ss, num[i], dummy);
1344 ss << '\n';
1345 }
1346 }
1347
1348 std::string text = ss.str();
1349
1350 int64_t * bytes = new int64_t[nprocs];
1351 bytes[myrank] = text.size();
1352 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<int64_t>(), bytes, 1, MPIType<int64_t>(), commGrid->GetWorld());
1353 int64_t bytesuntil = std::accumulate(bytes, bytes+myrank, static_cast<int64_t>(0));
1354 int64_t bytestotal = std::accumulate(bytes, bytes+nprocs, static_cast<int64_t>(0));
1355
1356 if(myrank == 0) // only the leader writes the original file with no content
1357 {
1358 std::ofstream ofs(filename.c_str(), std::ios::binary | std::ios::out);
1359 #ifdef COMBBLAS_DEBUG
1360 std::cout << "Creating file with " << bytestotal << " bytes" << std::endl;
1361 #endif
1362 ofs.seekp(bytestotal - 1);
1363 ofs.write("", 1); // this will likely create a sparse file so the actual disks won't spin yet
1364 ofs.close();
1365 }
1366 MPI_Barrier(commGrid->GetWorld());
1367
1368 struct stat st; // get file size
1369 if (stat(filename.c_str(), &st) == -1)
1370 {
1371 MPI_Abort(commGrid->GetWorld(), NOFILE);
1372 }
1373 if(myrank == nprocs-1) // let some other processor do the testing
1374 {
1375 #ifdef COMBBLAS_DEBUG
1376 std::cout << "File is actually " << st.st_size << " bytes seen from process " << myrank << std::endl;
1377 #endif
1378 }
1379
1380 FILE *ffinal;
1381 if ((ffinal = fopen(filename.c_str(), "rb+")) == NULL) // then everyone fills it
1382 {
1383 printf("COMBBLAS: Vector output file %s failed to open at process %d\n", filename.c_str(), myrank);
1384 MPI_Abort(commGrid->GetWorld(), NOFILE);
1385 }
1386 fseek (ffinal , bytesuntil , SEEK_SET );
1387 fwrite(text.c_str(),1, bytes[myrank] ,ffinal);
1388 fflush(ffinal);
1389 fclose(ffinal);
1390 delete [] bytes;
1391 }
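// Usage sketch for ParallelWrite (illustrative only; assumes a populated FullyDistSpVec<int64_t,double>
// named r and an assumed output file name). The handler below is our own minimal type satisfying the
// save() interface used above; it is not a library-provided class.
//
//   struct PlainSaveHandler
//   {
//       void save(std::ostream & os, const double & v, int64_t /*index*/) { os << v; }
//   };
//   r.ParallelWrite("vec_out.txt", true, PlainSaveHandler(), true, true); // 1-based, with indices and header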
1392
1393 //! Called on an existing object
1394 //! ABAB: Obsolete, will be deleted once moved to Github (and becomes independent of KDT)
1395 template <class IT, class NT>
1396 template <class HANDLER>
1397 std::ifstream& FullyDistSpVec<IT,NT>::ReadDistribute (std::ifstream& infile, int master, HANDLER handler)
1398 {
1399 IT total_nnz;
1400 MPI_Comm World = commGrid->GetWorld();
1401 int neighs = commGrid->GetSize(); // number of neighbors (including oneself)
1402 int buffperneigh = MEMORYINBYTES / (neighs * (sizeof(IT) + sizeof(NT)));
1403
1404 int * displs = new int[neighs];
1405 for (int i=0; i<neighs; ++i)
1406 displs[i] = i*buffperneigh;
1407
1408 int * curptrs = NULL;
1409 int recvcount = 0;
1410 IT * inds = NULL;
1411 NT * vals = NULL;
1412 int rank = commGrid->GetRank();
1413 if(rank == master) // 1 processor only
1414 {
1415 inds = new IT [ buffperneigh * neighs ];
1416 vals = new NT [ buffperneigh * neighs ];
1417 curptrs = new int[neighs];
1418 std::fill_n(curptrs, neighs, 0); // fill with zero
1419 if (infile.is_open())
1420 {
1421 infile.clear();
1422 infile.seekg(0);
1423 IT numrows, numcols;
1424 bool indIsRow = true;
1425 infile >> numrows >> numcols >> total_nnz;
1426 if (numcols == 1)
1427 { // column vector, read vector indices from the row index
1428 indIsRow = true;
1429 glen = numrows;
1430 }
1431 else
1432 { // row vector, read vector indices from the column index
1433 indIsRow = false;
1434 glen = numcols;
1435 }
1436 MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1437
1438 IT tempind;
1439 IT temprow, tempcol;
1440 IT cnz = 0;
1441 while ( (!infile.eof()) && cnz < total_nnz)
1442 {
1443 infile >> temprow >> tempcol;
1444 if (indIsRow)
1445 tempind = temprow;
1446 else
1447 tempind = tempcol;
1448 tempind--;
1449 IT locind;
1450 int rec = Owner(tempind, locind); // recipient (owner) processor (ABAB: But if the length is not set yet, this should be wrong)
1451 inds[ rec * buffperneigh + curptrs[rec] ] = locind;
1452 vals[ rec * buffperneigh + curptrs[rec] ] = handler.read(infile, tempind);
1453 ++ (curptrs[rec]);
1454
1455 if(curptrs[rec] == buffperneigh || (cnz == (total_nnz-1)) ) // one buffer is full, or file is done !
1456 {
1457 // first, send the receive counts ...
1458 MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1459
1460 // generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT=bool)
1461 IT * tempinds = new IT[recvcount];
1462 NT * tempvals = new NT[recvcount];
1463
1464 // then, send all buffers to their recipients ...
1465 MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount, MPIType<IT>(), master, World);
1466 MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), master, World);
1467
1468 // now push what is ours to tuples
1469 for(IT i=0; i< recvcount; ++i)
1470 {
1471 ind.push_back( tempinds[i] ); // already offset'd by the sender
1472 num.push_back( tempvals[i] );
1473 }
1474
1475 // reset current pointers so that we can reuse {inds,vals} buffers
1476 std::fill_n(curptrs, neighs, 0);
1477 DeleteAll(tempinds, tempvals);
1478 }
1479 ++ cnz;
1480 }
1481 assert (cnz == total_nnz);
1482
1483 // Signal the end of file to other processors along the diagonal
1484 std::fill_n(curptrs, neighs, std::numeric_limits<int>::max());
1485 MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1486 }
1487 else // input file does not exist !
1488 {
1489 glen = 0;
1490 MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1491 }
1492 DeleteAll(inds,vals, curptrs);
1493 }
1494 else // all other processors
1495 {
1496 MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1497 while(glen > 0) // otherwise, the input file does not exist
1498 {
1499 // first receive the receive counts ...
1500 MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1501
1502 if( recvcount == std::numeric_limits<int>::max())
1503 break;
1504
1505 // create space for incoming data ...
1506 IT * tempinds = new IT[recvcount];
1507 NT * tempvals = new NT[recvcount];
1508
1509 // receive actual data ... (first 4 arguments are ignored in the receiver side)
1510 MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount, MPIType<IT>(), master, World);
1511 MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), master, World);
1512
1513 // now push what is ours to tuples
1514 for(IT i=0; i< recvcount; ++i)
1515 {
1516 ind.push_back( tempinds[i] );
1517 num.push_back( tempvals[i] );
1518 }
1519 DeleteAll(tempinds, tempvals);
1520 }
1521 }
1522 delete [] displs;
1523 MPI_Barrier(World);
1524 return infile;
1525 }
1526
1527 template <class IT, class NT>
1528 template <class HANDLER>
1529 void FullyDistSpVec<IT,NT>::SaveGathered(std::ofstream& outfile, int master, HANDLER handler, bool printProcSplits)
1530 {
1531 int rank, nprocs;
1532 MPI_Comm World = commGrid->GetWorld();
1533 MPI_Comm_rank(World, &rank);
1534 MPI_Comm_size(World, &nprocs);
1535 MPI_File thefile;
1536
1537 char _fn[] = "temp_fullydistspvec"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
1538 int mpi_err = MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
1539 if(mpi_err != MPI_SUCCESS)
1540 {
1541 char mpi_err_str[MPI_MAX_ERROR_STRING];
1542 int mpi_err_strlen;
1543 MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
1544 printf("MPI_File_open failed (%s)\n", mpi_err_str);
1545 MPI_Abort(World, 1);
1546 }
1547
1548 IT * dist = new IT[nprocs];
1549 dist[rank] = getlocnnz();
1550 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
1551 IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
1552 IT totalLength = TotalLength();
1553 IT totalNNZ = getnnz();
1554
1555 struct mystruct
1556 {
1557 IT ind;
1558 NT num;
1559 };
1560
1561 MPI_Datatype datatype;
1562 MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
1563 MPI_Type_commit(&datatype);
1564 int dsize;
1565 MPI_Type_size(datatype, &dsize);
1566
1567 // The disp displacement argument specifies the position
1568 // (absolute offset in bytes from the beginning of the file)
1569 char native[] = "native"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
1570 MPI_File_set_view(thefile, static_cast<MPI_Offset>(sizeuntil) * dsize, datatype, datatype, native, MPI_INFO_NULL);
1571
1572 int count = ind.size();
1573 mystruct * packed = new mystruct[count];
1574 for(int i=0; i<count; ++i)
1575 {
1576 packed[i].ind = ind[i] + sizeuntil;
1577 packed[i].num = num[i];
1578 }
1579
1580 mpi_err = MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
1581 if(mpi_err != MPI_SUCCESS)
1582 {
1583 char mpi_err_str[MPI_MAX_ERROR_STRING];
1584 int mpi_err_strlen;
1585 MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
1586 printf("MPI_File_write failed (%s)\n", mpi_err_str);
1587 MPI_Abort(World, 1);
1588 }
1589
1590 MPI_Barrier(World);
1591 MPI_File_close(&thefile);
1592 delete [] packed;
1593
1594 // Now let processor-0 read the file and print
1595 if(rank == 0)
1596 {
1597 FILE * f = fopen("temp_fullydistspvec", "r");
1598 if(!f)
1599 {
1600 std::cerr << "Problem reading binary input file\n";
1601 return;
1602 }
1603 IT maxd = *std::max_element(dist, dist+nprocs);
1604 mystruct * data = new mystruct[maxd];
1605
1606 std::streamsize oldPrecision = outfile.precision();
1607 outfile.precision(21);
1608 outfile << totalLength << "\t1\t" << totalNNZ << std::endl;
1609 for(int i=0; i<nprocs; ++i)
1610 {
1611 // read n_per_proc integers and print them
1612 if (fread(data, dsize, dist[i], f) < static_cast<size_t>(dist[i]))
1613 {
1614 std::cout << "fread 660 failed! attempting to continue..." << std::endl;
1615 }
1616
1617 if (printProcSplits)
1618 outfile << "Elements stored on proc " << i << ":" << std::endl;
1619
1620 for (int j = 0; j < dist[i]; j++)
1621 {
1622 outfile << data[j].ind+1 << "\t1\t";
1623 handler.save(outfile, data[j].num, data[j].ind);
1624 outfile << std::endl;
1625 }
1626 }
1627 outfile.precision(oldPrecision);
1628 fclose(f);
1629 remove("temp_fullydistspvec");
1630 delete [] data;
1631 delete [] dist;
1632 }
1633 MPI_Barrier(World);
1634 }
1635
1636
1637 template <class IT, class NT>
1638 template <typename _Predicate>
1639 IT FullyDistSpVec<IT,NT>::Count(_Predicate pred) const
1640 {
1641 IT local = std::count_if( num.begin(), num.end(), pred );
1642 IT whole = 0;
1643 MPI_Allreduce( &local, &whole, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
1644 return whole;
1645 }
1646
1647
1648 template <class IT, class NT>
1649 template <typename _BinaryOperation>
1650 NT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, NT init) const
1651 {
1652 // std::accumulate returns init for empty sequences
1653 // the semantics are init + num[0] + ... + num[n-1]
1654 NT localsum = std::accumulate( num.begin(), num.end(), init, __binary_op);
1655
1656 NT totalsum = init;
1657 MPI_Allreduce( &localsum, &totalsum, 1, MPIType<NT>(), MPIOp<_BinaryOperation, NT>::op(), commGrid->GetWorld());
1658 return totalsum;
1659 }
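// Usage sketch (hypothetical; NT=double): global sum and maximum over the nonzeros of v
//   double total = v.Reduce(std::plus<double>(), 0.0);
//   double vmax  = v.Reduce(maximum<double>(), std::numeric_limits<double>::lowest());
// init participates in every process' local accumulation, so it should be the identity
// (or a safe lower/upper bound) of __binary_op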
1660
1661 template <class IT, class NT>
1662 template <typename OUT, typename _BinaryOperation, typename _UnaryOperation>
1663 OUT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, OUT default_val, _UnaryOperation __unary_op) const
1664 {
1665 // for an empty local sequence the contribution is simply default_val
1666 OUT localsum = default_val;
1667
1668 if (num.size() > 0)
1669 {
1670 typename std::vector< NT >::const_iterator iter = num.begin();
1671 //localsum = __unary_op(*iter);
1672 //iter++;
1673 while (iter != num.end())
1674 {
1675 localsum = __binary_op(localsum, __unary_op(*iter));
1676 iter++;
1677 }
1678 }
1679
1680 OUT totalsum = default_val;
1681 MPI_Allreduce( &localsum, &totalsum, 1, MPIType<OUT>(), MPIOp<_BinaryOperation, OUT>::op(), commGrid->GetWorld());
1682 return totalsum;
1683 }
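// Usage sketch (hypothetical; NT=double): a global nonzero count obtained by mapping every
// value to 1 and adding the results:
//   int64_t nnztotal = v.Reduce(std::plus<int64_t>(), (int64_t)0, [](double){ return (int64_t)1; });
// __unary_op converts each NT value to OUT before __binary_op combines the converted values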
1684
1685 template <class IT, class NT>
PrintInfo(std::string vectorname) const1686 void FullyDistSpVec<IT,NT>::PrintInfo(std::string vectorname) const
1687 {
1688 IT nnz = getnnz();
1689 if (commGrid->GetRank() == 0)
1690 std::cout << "As a whole, " << vectorname << " has: " << nnz << " nonzeros and length " << glen << std::endl;
1691 }
1692
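// DebugPrint: every process writes its local (index, value) pairs to a shared temporary file
// ("temp_fullydistspvec") through MPI-IO, then rank 0 reads the file back and prints each
// process' entries to stdout; intended for debugging small vectors, since all output is
// funneled through a single process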
1693 template <class IT, class NT>
DebugPrint()1694 void FullyDistSpVec<IT,NT>::DebugPrint()
1695 {
1696 int rank, nprocs;
1697 MPI_Comm World = commGrid->GetWorld();
1698 MPI_Comm_rank(World, &rank);
1699 MPI_Comm_size(World, &nprocs);
1700 MPI_File thefile;
1701
1702 char tfilename[32] = "temp_fullydistspvec";
1703 MPI_File_open(World, tfilename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
1704
1705 IT * dist = new IT[nprocs];
1706 dist[rank] = getlocnnz();
1707 MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
1708 IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
1709
1710 struct mystruct
1711 {
1712 IT ind;
1713 NT num;
1714 };
1715
1716 MPI_Datatype datatype;
1717 MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
1718 MPI_Type_commit(&datatype);
1719 int dsize;
1720 MPI_Type_size(datatype, &dsize);
1721
1722 // The disp displacement argument specifies the position
1723 // (absolute offset in bytes from the beginning of the file)
1724 char datarep[32] = "native"; // data representation string for MPI_File_set_view
1725 MPI_File_set_view(thefile, static_cast<MPI_Offset>(sizeuntil) * dsize, datatype, datatype, datarep, MPI_INFO_NULL);
1726
1727 int count = ind.size();
1728 mystruct * packed = new mystruct[count];
1729 for(int i=0; i<count; ++i)
1730 {
1731 packed[i].ind = ind[i];
1732 packed[i].num = num[i];
1733 }
1734 MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
1735 MPI_Barrier(World);
1736 MPI_File_close(&thefile);
1737 delete [] packed;
1738
1739 // Now let processor-0 read the file and print
1740 if(rank == 0)
1741 {
1742 FILE * f = fopen("temp_fullydistspvec", "r");
1743 if(!f)
1744 {
1745 std::cerr << "Problem reading binary input file\n"; // do not return early: the other ranks still have to reach the final MPI_Barrier
1746 }
1747 else
{
1748 IT maxd = *std::max_element(dist, dist+nprocs);
1749 mystruct * data = new mystruct[maxd];
1750
1751 for(int i=0; i<nprocs; ++i)
1752 {
1753 // read n_per_proc integers and print them
1754 if (fread(data, dsize, dist[i],f) < static_cast<size_t>(dist[i]))
1755 {
1756 std::cout << "fread 802 failed! attempting to continue..." << std::endl;
1757 }
1758
1759 std::cout << "Elements stored on proc " << i << ": {";
1760 for (int j = 0; j < dist[i]; j++)
1761 {
1762 std::cout << "(" << data[j].ind << "," << data[j].num << "), ";
1763 }
1764 std::cout << "}" << std::endl;
1765 }
1766 fclose(f);
1767 remove("temp_fullydistspvec");
1768 delete [] data;
}
1769 delete [] dist;
1770 }
1771 MPI_Barrier(World);
1772 }
1773
1774 template <class IT, class NT>
Reset()1775 void FullyDistSpVec<IT,NT>::Reset()
1776 {
1777 ind.resize(0);
1778 num.resize(0);
1779 }
1780
1781 // Bulk-sets the local nonzero indices from the given array, which must be sorted; note that the numerical values are set to those same indices as well
1782 template <class IT, class NT>
BulkSet(IT inds[],int count)1783 void FullyDistSpVec<IT,NT>::BulkSet(IT inds[], int count) {
1784 ind.resize(count);
1785 num.resize(count);
1786 std::copy(inds, inds+count, ind.data());
1787 std::copy(inds, inds+count, num.data());
1788 }
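// Usage sketch (hypothetical local indices, assumed sorted; IT=int64_t): after
//   int64_t myinds[] = {0, 2, 5};
//   v.BulkSet(myinds, 3);
// both v.ind and v.num hold {0, 2, 5} locally; callers that need different numerical values
// are expected to overwrite num afterwards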
1789
1790
1791
1792 /*
1793 ** Create a new sparse vector vout by swapping the indices and values of a sparse vector vin.
1794 ** The length of vout is globallen, which must be greater than the maximum entry of vin.
1795 ** nnz(vin) = nnz(vout)
1796 ** for every nonzero entry vin[k]: vout[vin[k]] = k
1797 */
1798 /*
1799 template <class IT, class NT>
1800 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
1801 {
1802 FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
1803 IT max_entry = Reduce(maximum<IT>(), (IT) 0 ) ;
1804 if(max_entry >= globallen)
1805 {
1806 cout << "Sparse vector has entries (" << max_entry << ") larger than requested global vector length " << globallen << endl;
1807 return Inverted;
1808 }
1809
1810
1811 int nprocs = commGrid->GetSize();
1812 vector< vector< NT > > datsent(nprocs);
1813 vector< vector< IT > > indsent(nprocs);
1814
1815 IT ploclen = getlocnnz();
1816 for(IT k=0; k < ploclen; ++k)
1817 {
1818 IT locind;
1819 int owner = Inverted.Owner(num[k], locind); // numerical values in rhs are 0-based indices
1820 IT gind = ind[k] + LengthUntil();
1821 datsent[owner].push_back(gind);
1822 indsent[owner].push_back(locind); // so that we don't need any correction at the recipient
1823 }
1824 int * sendcnt = new int[nprocs];
1825 int * sdispls = new int[nprocs];
1826 for(int i=0; i<nprocs; ++i)
1827 sendcnt[i] = (int) datsent[i].size();
1828
1829 int * rdispls = new int[nprocs];
1830 int * recvcnt = new int[nprocs];
1831 MPI_Comm World = commGrid->GetWorld();
1832 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
1833 sdispls[0] = 0;
1834 rdispls[0] = 0;
1835 for(int i=0; i<nprocs-1; ++i)
1836 {
1837 sdispls[i+1] = sdispls[i] + sendcnt[i];
1838 rdispls[i+1] = rdispls[i] + recvcnt[i];
1839 }
1840 NT * datbuf = new NT[ploclen];
1841 for(int i=0; i<nprocs; ++i)
1842 {
1843 copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
1844 vector<NT>().swap(datsent[i]);
1845 }
1846 IT * indbuf = new IT[ploclen];
1847 for(int i=0; i<nprocs; ++i)
1848 {
1849 copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
1850 vector<IT>().swap(indsent[i]);
1851 }
1852 IT totrecv = accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
1853 NT * recvdatbuf = new NT[totrecv];
1854 MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
1855 delete [] datbuf;
1856
1857 IT * recvindbuf = new IT[totrecv];
1858 MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1859 delete [] indbuf;
1860
1861
1862 vector< pair<IT,NT> > tosort; // in fact, tomerge would be a better name but it is unlikely to be faster
1863
1864 for(int i=0; i<nprocs; ++i)
1865 {
1866 for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
1867 {
1868 tosort.push_back(make_pair(recvindbuf[j], recvdatbuf[j]));
1869 }
1870 }
1871 DeleteAll(recvindbuf, recvdatbuf);
1872 DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1873 std::sort(tosort.begin(), tosort.end());
1874
1875 IT lastIndex=-1;
1876 for(typename vector<pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
1877 {
1878 if(lastIndex!=itr->first) // avoid duplicate indices
1879 {
1880 Inverted.ind.push_back(itr->first);
1881 Inverted.num.push_back(itr->second);
1882 }
1883 lastIndex = itr->first;
1884
1885 }
1886 return Inverted;
1887
1888 }
1889 */
1890
1891
1892
1893 /*
1894 ** Create a new sparse vector vout by swapping the indices and values of a sparse vector vin.
1895 ** The length of vout is globallen, which must be greater than the maximum entry of vin.
1896 ** nnz(vin) = nnz(vout)
1897 ** for every nonzero entry vin[k]: vout[vin[k]] = k
1898 */
1899
1900 template <class IT, class NT>
Invert(IT globallen)1901 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
1902 {
1903 FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
1904 IT max_entry = Reduce(maximum<IT>(), (IT) 0 ) ;
1905 if(max_entry >= globallen)
1906 {
1907 std::cout << "Sparse vector has entries (" << max_entry << ") larger than requested global vector length " << globallen << std::endl;
1908 return Inverted;
1909 }
1910
1911 MPI_Comm World = commGrid->GetWorld();
1912 int nprocs = commGrid->GetSize();
1913 int * rdispls = new int[nprocs+1];
1914 int * recvcnt = new int[nprocs];
1915 int * sendcnt = new int[nprocs](); // initialize to 0
1916 int * sdispls = new int[nprocs+1];
1917
1918
1919 IT ploclen = getlocnnz();
1920 #ifdef _OPENMP
1921 #pragma omp parallel for
1922 #endif
1923 for(IT k=0; k < ploclen; ++k)
1924 {
1925 IT locind;
1926 int owner = Inverted.Owner(num[k], locind);
1927 #ifdef _OPENMP
1928 __sync_fetch_and_add(&sendcnt[owner], 1);
1929 #else
1930 sendcnt[owner]++;
1931 #endif
1932 }
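// the loop above is the counting pass: it only tallies how many entries go to each owner so
// that send displacements can be computed before the packing pass below; the atomic
// fetch-and-add keeps sendcnt consistent when OpenMP threads update it concurrently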
1933
1934
1935 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
1936
1937 sdispls[0] = 0;
1938 rdispls[0] = 0;
1939 for(int i=0; i<nprocs; ++i)
1940 {
1941 sdispls[i+1] = sdispls[i] + sendcnt[i];
1942 rdispls[i+1] = rdispls[i] + recvcnt[i];
1943 }
1944
1945
1946
1947 NT * datbuf = new NT[ploclen];
1948 IT * indbuf = new IT[ploclen];
1949 int *count = new int[nprocs](); //current position
1950 #ifdef _OPENMP
1951 #pragma omp parallel for
1952 #endif
1953 for(IT i=0; i < ploclen; ++i)
1954 {
1955 IT locind;
1956 int owner = Inverted.Owner(num[i], locind);
1957 int id;
1958 #ifdef _OPENMP
1959 id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
1960 #else
1961 id = sdispls[owner] + count[owner];
1962 count[owner]++;
1963 #endif
1964 datbuf[id] = ind[i] + LengthUntil();
1965 indbuf[id] = locind;
1966 }
1967 delete [] count;
1968
1969
1970 IT totrecv = rdispls[nprocs];
1971 NT * recvdatbuf = new NT[totrecv];
1972 MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
1973 delete [] datbuf;
1974
1975 IT * recvindbuf = new IT[totrecv];
1976 MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1977 delete [] indbuf;
1978
1979
1980 std::vector< std::pair<IT,NT> > tosort;
1981 tosort.resize(totrecv);
1982 #ifdef _OPENMP
1983 #pragma omp parallel for
1984 #endif
1985 for(IT i=0; i<totrecv; ++i)
1986 {
1987 tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
1988 }
1989 DeleteAll(recvindbuf, recvdatbuf);
1990 DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1991
1992 #if defined(GNU_PARALLEL) && defined(_OPENMP)
1993 __gnu_parallel::sort(tosort.begin(), tosort.end());
1994 #else
1995 std::sort(tosort.begin(), tosort.end());
1996 #endif
1997
1998 Inverted.ind.reserve(totrecv);
1999 Inverted.num.reserve(totrecv);
2000 IT lastIndex=-1;
2001
2002 // not threaded because Inverted.ind is kept sorted
2003 for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
2004 {
2005 if(lastIndex!=itr->first) // avoid duplicate indices
2006 {
2007 Inverted.ind.push_back(itr->first);
2008 Inverted.num.push_back(itr->second);
2009 }
2010 lastIndex = itr->first;
2011
2012 }
2013
2014 return Inverted;
2015
2016 }
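// Usage sketch (hypothetical; assumes IT = NT = int64_t and that every value of v is a valid
// 0-based index smaller than the requested length): if v stores value p at global index k,
//   FullyDistSpVec<int64_t,int64_t> w = v.Invert(v.TotalLength());
// produces w with w[p] = k for every nonzero of v; when several entries map to the same
// target index, only one of them is kept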
2017
2018
2019
2020 /*
2021 ** Generalized Invert: the binary operations define the index and the value of each entry of the inverted vector.
2022 ** _BinaryOperationDuplicate: function used to reduce duplicate entries.
2023 */
2024
2025 template <class IT, class NT>
2026 template <typename _BinaryOperationIdx, typename _BinaryOperationVal, typename _BinaryOperationDuplicate>
Invert(IT globallen,_BinaryOperationIdx __binopIdx,_BinaryOperationVal __binopVal,_BinaryOperationDuplicate __binopDuplicate)2027 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal, _BinaryOperationDuplicate __binopDuplicate)
2028
2029 {
2030
2031 FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
2032
2033
2034 // identify the max index in the composed vector
2035 IT localmax = (IT) 0;
2036 for(size_t k=0; k < num.size(); ++k)
2037 {
2038 localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
2039 }
2040 IT globalmax = (IT) 0;
2041 MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
2042
2043 if(globalmax >= globallen)
2044 {
2045 std::cout << "Sparse vector has entries (" << globalmax << ") larger than requested global vector length " << globallen << std::endl;
2046 return Inverted;
2047 }
2048
2049
2050
2051 MPI_Comm World = commGrid->GetWorld();
2052 int nprocs = commGrid->GetSize();
2053 int * rdispls = new int[nprocs+1];
2054 int * recvcnt = new int[nprocs];
2055 int * sendcnt = new int[nprocs](); // initialize to 0
2056 int * sdispls = new int[nprocs+1];
2057
2058
2059 IT ploclen = getlocnnz();
2060 #ifdef _OPENMP
2061 #pragma omp parallel for
2062 #endif
2063 for(IT k=0; k < ploclen; ++k)
2064 {
2065 IT locind;
2066 IT globind = __binopIdx(num[k], ind[k] + LengthUntil()); // get global index of the inverted vector
2067 int owner = Inverted.Owner(globind, locind);
2068
2069 #ifdef _OPENMP
2070 __sync_fetch_and_add(&sendcnt[owner], 1);
2071 #else
2072 sendcnt[owner]++;
2073 #endif
2074 }
2075
2076
2077 MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
2078
2079 sdispls[0] = 0;
2080 rdispls[0] = 0;
2081 for(int i=0; i<nprocs; ++i)
2082 {
2083 sdispls[i+1] = sdispls[i] + sendcnt[i];
2084 rdispls[i+1] = rdispls[i] + recvcnt[i];
2085 }
2086
2087
2088 NT * datbuf = new NT[ploclen];
2089 IT * indbuf = new IT[ploclen];
2090 int *count = new int[nprocs](); //current position
2091 #ifdef _OPENMP
2092 #pragma omp parallel for
2093 #endif
2094 for(IT i=0; i < ploclen; ++i)
2095 {
2096 IT locind;
2097 IT globind = __binopIdx(num[i], ind[i] + LengthUntil()); // get global index of the inverted vector
2098 int owner = Inverted.Owner(globind, locind);
2099 int id;
2100 #ifdef _OPENMP
2101 id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
2102 #else
2103 id = sdispls[owner] + count[owner];
2104 count[owner]++;
2105 #endif
2106 datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
2107 indbuf[id] = locind;
2108 }
2109 delete [] count;
2110
2111
2112 IT totrecv = rdispls[nprocs];
2113 NT * recvdatbuf = new NT[totrecv];
2114 MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
2115 delete [] datbuf;
2116
2117 IT * recvindbuf = new IT[totrecv];
2118 MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
2119 delete [] indbuf;
2120
2121
2122 std::vector< std::pair<IT,NT> > tosort;
2123 tosort.resize(totrecv);
2124 #ifdef _OPENMP
2125 #pragma omp parallel for
2126 #endif
2127 for(IT i=0; i<totrecv; ++i)
2128 {
2129 tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
2130 }
2131 DeleteAll(recvindbuf, recvdatbuf);
2132 DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
2133
2134 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2135 __gnu_parallel::sort(tosort.begin(), tosort.end());
2136 #else
2137 std::sort(tosort.begin(), tosort.end());
2138 #endif
2139
2140 Inverted.ind.reserve(totrecv);
2141 Inverted.num.reserve(totrecv);
2142
2143
2144 // not threaded because Inverted.ind is kept sorted
2145 for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); )
2146 {
2147 IT idx = itr->first; // named idx so it does not shadow the member vector 'ind'
2148 NT val = itr->second;
2149 ++itr;
2150
2151 while(itr != tosort.end() && itr->first == idx)
2152 {
2153 val = __binopDuplicate(val, itr->second);
2154 ++itr;
2155 }
2156
2157
2158 Inverted.ind.push_back(idx);
2159 Inverted.num.push_back(val);
2160
2161 }
2162
2163 return Inverted;
2164
2165 }
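// Usage sketch (hypothetical lambdas; IT = NT = int64_t): invert on the values, keep the old
// global index as the new value, and resolve collisions by taking the minimum:
//   auto idxop = [](int64_t val, int64_t gidx){ return val;  };   // new global index
//   auto valop = [](int64_t val, int64_t gidx){ return gidx; };   // new value
//   auto dupop = [](int64_t a,   int64_t b   ){ return std::min(a,b); };
//   FullyDistSpVec<int64_t,int64_t> w = v.Invert(v.TotalLength(), idxop, valop, dupop);
// each operation is called as op(num[k], ind[k] + LengthUntil()), i.e. with the value and the
// global index of the k-th local nonzero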
2166
2167
2168 // Invert using RMA
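// same overall plan as the Alltoallv-based Invert above, but the counts, the displacements and
// the (index, value) data are delivered with one-sided MPI_Put calls into per-process windows;
// note that this two-operation variant keeps only one entry per duplicate index instead of
// taking a duplicate-reduction operation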
2169
2170 template <class IT, class NT>
2171 template <typename _BinaryOperationIdx, typename _BinaryOperationVal>
InvertRMA(IT globallen,_BinaryOperationIdx __binopIdx,_BinaryOperationVal __binopVal)2172 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::InvertRMA (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal)
2173
2174 {
2175
2176 FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
2177 int myrank;
2178 MPI_Comm_rank(commGrid->GetWorld(), &myrank);
2179
2180 // identify the max index in the composed vector
2181 IT localmax = (IT) 0;
2182 for(size_t k=0; k < num.size(); ++k)
2183 {
2184 localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
2185 }
2186 IT globalmax = (IT) 0;
2187 MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
2188
2189 if(globalmax >= globallen)
2190 {
2191 std::cout << "Sparse vector has entries (" << globalmax << ") larger than requested global vector length " << globallen << std::endl;
2192 return Inverted;
2193 }
2194
2195
2196
2197 MPI_Comm World = commGrid->GetWorld();
2198 int nprocs = commGrid->GetSize();
2199 int * rdispls = new int[nprocs+1];
2200 int * recvcnt = new int[nprocs](); // initialize to 0
2201 int * sendcnt = new int[nprocs](); // initialize to 0
2202 int * sdispls = new int[nprocs+1];
2203
2204
2205 IT ploclen = getlocnnz();
2206 #ifdef _OPENMP
2207 #pragma omp parallel for
2208 #endif
2209 for(IT k=0; k < ploclen; ++k)
2210 {
2211 IT locind;
2212 IT globind = __binopIdx(num[k], ind[k] + LengthUntil()); // get global index of the inverted vector
2213 int owner = Inverted.Owner(globind, locind);
2214
2215 #ifdef _OPENMP
2216 __sync_fetch_and_add(&sendcnt[owner], 1);
2217 #else
2218 sendcnt[owner]++;
2219 #endif
2220 }
2221
2222
2223 MPI_Win win2;
2224 MPI_Win_create(recvcnt, nprocs * sizeof(int), sizeof(int), MPI_INFO_NULL, World, &win2); // window over the int array recvcnt; extent and disp_unit are sizeof(int)
2225 for(int i=0; i<nprocs; ++i)
2226 {
2227 if(sendcnt[i]>0)
2228 {
2229 MPI_Win_lock(MPI_LOCK_SHARED,i,MPI_MODE_NOCHECK,win2);
2230 MPI_Put(&sendcnt[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win2);
2231 MPI_Win_unlock(i, win2);
2232 }
2233 }
2234 MPI_Win_free(&win2);
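// once the window is freed (a collective call), recvcnt[p] on each process holds the number of
// entries process p will push to it -- the same information the Alltoallv-based Invert obtains
// with MPI_Alltoall on the send counts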
2235
2236
2237
2238 sdispls[0] = 0;
2239 rdispls[0] = 0;
2240 for(int i=0; i<nprocs; ++i)
2241 {
2242 sdispls[i+1] = sdispls[i] + sendcnt[i];
2243 rdispls[i+1] = rdispls[i] + recvcnt[i];
2244 }
2245
2246 int * rmadispls = new int[nprocs+1];
2247 MPI_Win win3;
2248 MPI_Win_create(rmadispls, nprocs * sizeof(int), sizeof(int), MPI_INFO_NULL, World, &win3);
2249 for(int i=0; i<nprocs; ++i)
2250 {
2251 if(recvcnt[i]>0)
2252 {
2253 MPI_Win_lock(MPI_LOCK_SHARED,i,MPI_MODE_NOCHECK,win3);
2254 MPI_Put(&rdispls[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win3);
2255 MPI_Win_unlock(i, win3);
2256 }
2257 }
2258 MPI_Win_free(&win3);
2259
2260
2261 NT * datbuf = new NT[ploclen];
2262 IT * indbuf = new IT[ploclen];
2263 int *count = new int[nprocs](); //current position
2264 #ifdef _OPENMP
2265 #pragma omp parallel for
2266 #endif
2267 for(IT i=0; i < ploclen; ++i)
2268 {
2269 IT locind;
2270 IT globind = __binopIdx(num[i], ind[i] + LengthUntil()); // get global index of the inverted vector
2271 int owner = Inverted.Owner(globind, locind);
2272 int id;
2273 #ifdef _OPENMP
2274 id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
2275 #else
2276 id = sdispls[owner] + count[owner];
2277 count[owner]++;
2278 #endif
2279 datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
2280 indbuf[id] = locind;
2281 }
2282 delete [] count;
2283
2284
2285 IT totrecv = rdispls[nprocs];
2286 NT * recvdatbuf = new NT[totrecv];
2287 IT * recvindbuf = new IT[totrecv];
2288 MPI_Win win, win1;
2289 MPI_Win_create(recvdatbuf, totrecv * sizeof(NT), sizeof(NT), MPI_INFO_NULL, commGrid->GetWorld(), &win);
2290 MPI_Win_create(recvindbuf, totrecv * sizeof(IT), sizeof(IT), MPI_INFO_NULL, commGrid->GetWorld(), &win1);
2291 //MPI_Win_fence(0, win);
2292 //MPI_Win_fence(0, win1);
2293 for(int i=0; i<nprocs; ++i)
2294 {
2295 if(sendcnt[i]>0)
2296 {
2297 MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win);
2298 MPI_Put(&datbuf[sdispls[i]], sendcnt[i], MPIType<NT>(), i, rmadispls[i], sendcnt[i], MPIType<NT>(), win);
2299 MPI_Win_unlock(i, win);
2300
2301 MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win1);
2302 MPI_Put(&indbuf[sdispls[i]], sendcnt[i], MPIType<IT>(), i, rmadispls[i], sendcnt[i], MPIType<IT>(), win1);
2303 MPI_Win_unlock(i, win1);
2304 }
2305 }
2306 //MPI_Win_fence(0, win);
2307 //MPI_Win_fence(0, win1);
2308 MPI_Win_free(&win);
2309 MPI_Win_free(&win1);
2310
2311 delete [] datbuf;
2312 delete [] indbuf;
2313 delete [] rmadispls;
2314
2315
2316 std::vector< std::pair<IT,NT> > tosort;
2317 tosort.resize(totrecv);
2318 #ifdef _OPENMP
2319 #pragma omp parallel for
2320 #endif
2321 for(IT i=0; i<totrecv; ++i)
2322 {
2323 tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
2324 }
2325 DeleteAll(recvindbuf, recvdatbuf);
2326 DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
2327
2328 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2329 __gnu_parallel::sort(tosort.begin(), tosort.end());
2330 #else
2331 std::sort(tosort.begin(), tosort.end());
2332 #endif
2333
2334 Inverted.ind.reserve(totrecv);
2335 Inverted.num.reserve(totrecv);
2336 IT lastIndex=-1;
2337
2338 // not threaded because Inverted.ind is kept sorted
2339 for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
2340 {
2341 if(lastIndex!=itr->first) // avoid duplicate indices
2342 {
2343 Inverted.ind.push_back(itr->first);
2344 Inverted.num.push_back(itr->second);
2345 }
2346 lastIndex = itr->first;
2347
2348 }
2349
2350 return Inverted;
2351
2352 }
2353
2354
2355
2356 template <typename IT, typename NT>
2357 template <typename NT1, typename _UnaryOperation>
Select(const FullyDistVec<IT,NT1> & denseVec,_UnaryOperation __unop)2358 void FullyDistSpVec<IT,NT>::Select (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop)
2359 {
2360 if(*commGrid == *(denseVec.commGrid))
2361 {
2362 if(TotalLength() != denseVec.TotalLength())
2363 {
2364 std::ostringstream outs;
2365 outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for Select\n";
2366 SpParHelper::Print(outs.str());
2367 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2368 }
2369 else
2370 {
2371
2372 IT spsize = getlocnnz();
2373 IT k = 0;
2374 // iterate over the sparse vector
2375 for(IT i=0; i< spsize; ++i)
2376 {
2377 if(__unop(denseVec.arr[ind[i]]))
2378 {
2379 ind[k] = ind[i];
2380 num[k++] = num[i];
2381 }
2382 }
2383 ind.resize(k);
2384 num.resize(k);
2385 }
2386 }
2387 else
2388 {
2389 std::ostringstream outs;
2390 outs << "Grids are not comparable for Select" << std::endl;
2391 SpParHelper::Print(outs.str());
2392 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2393 }
2394 }
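// Usage sketch (hypothetical; NT1=double): keep only the nonzeros of v whose position is still
// unvisited according to a dense vector named parents:
//   v.Select(parents, [](double p){ return p == -1.0; });
// the predicate sees denseVec[ind[i]] (the dense value at each sparse position) and the sparse
// entry is kept only where it returns true; the filtering itself is purely local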
2395
2396
2397 // \todo: Shouldn't this wrap EWiseApply for code maintenance instead?
2398 template <typename IT, typename NT>
2399 template <typename NT1>
Setminus(const FullyDistSpVec<IT,NT1> & other)2400 void FullyDistSpVec<IT,NT>::Setminus (const FullyDistSpVec<IT,NT1> & other)
2401 {
2402 if(*commGrid == *(other.commGrid))
2403 {
2404 if(TotalLength() != other.TotalLength())
2405 {
2406 std::ostringstream outs;
2407 outs << "Vector dimensions don't match (" << TotalLength() << " vs " << other.TotalLength() << ") for Select\n";
2408 SpParHelper::Print(outs.str());
2409 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2410 }
2411 else
2412 {
2413
2414 IT mysize = getlocnnz();
2415 IT othersize = other.getlocnnz();
2416 IT k = 0, i=0, j=0;
2417 // iterate over the sparse vector
2418 for(; i< mysize && j < othersize;)
2419 {
2420 if(other.ind[j] == ind[i]) //skip
2421 {
2422 i++; j++;
2423 }
2424 else if(other.ind[j] > ind[i])
2425 {
2426 ind[k] = ind[i];
2427 num[k++] = num[i++];
2428 }
2429 else j++;
2430 }
2431 while(i< mysize)
2432 {
2433 ind[k] = ind[i];
2434 num[k++] = num[i++];
2435 }
2436
2437 ind.resize(k);
2438 num.resize(k);
2439 }
2440 }
2441 else
2442 {
2443 std::ostringstream outs;
2444 outs << "Grids are not comparable for Select" << std::endl;
2445 SpParHelper::Print(outs.str());
2446 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2447 }
2448 }
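// Usage sketch (hypothetical): remove from frontier every index that also appears in visited:
//   frontier.Setminus(visited);
// only the indices matter (values in 'other' are ignored), and both vectors must share the
// same communication grid and global length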
2449
2450
2451
2452 template <typename IT, typename NT>
2453 template <typename NT1, typename _UnaryOperation, typename _BinaryOperation>
SelectApply(const FullyDistVec<IT,NT1> & denseVec,_UnaryOperation __unop,_BinaryOperation __binop)2454 void FullyDistSpVec<IT,NT>::SelectApply (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop, _BinaryOperation __binop)
2455 {
2456 if(*commGrid == *(denseVec.commGrid))
2457 {
2458 if(TotalLength() != denseVec.TotalLength())
2459 {
2460 std::ostringstream outs;
2461 outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for Select\n";
2462 SpParHelper::Print(outs.str());
2463 MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2464 }
2465 else
2466 {
2467
2468 IT spsize = getlocnnz();
2469 IT k = 0;
2470 // iterate over the sparse vector
2471 for(IT i=0; i< spsize; ++i)
2472 {
2473 if(__unop(denseVec.arr[ind[i]]))
2474 {
2475 ind[k] = ind[i];
2476 num[k++] = __binop(num[i], denseVec.arr[ind[i]]);
2477 }
2478 }
2479 ind.resize(k);
2480 num.resize(k);
2481 }
2482 }
2483 else
2484 {
2485 std::ostringstream outs;
2486 outs << "Grids are not comparable for Select" << std::endl;
2487 SpParHelper::Print(outs.str());
2488 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2489 }
2490 }
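// Usage sketch (hypothetical; NT=NT1=double): keep entries where the dense value is nonzero
// and scale each kept sparse value by it:
//   v.SelectApply(weights, [](double w){ return w != 0.0; },
//                 [](double x, double w){ return x * w; });
// __binop receives (num[i], denseVec.arr[ind[i]]) for every entry that passes __unop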
2491
2492
2493 // apply a unary function to each nonzero and return a new vector
2494 // could also be written as a constructor
2495 /*
2496 template <typename IT, typename NT>
2497 template <typename NT1, typename _UnaryOperation>
2498 FullyDistSpVec<IT,NT1> FullyDistSpVec<IT,NT>::Apply(_UnaryOperation __unop)
2499 {
2500 FullyDistSpVec<IT,NT1> composed(commGrid, TotalLength());
2501 IT spsize = getlocnnz();
2502 for(IT i=0; i< spsize; ++i)
2503 {
2504 composed.ind.push_back(ind[i]);
2505 composed.num.push_back( __unop(num[i]));
2506 }
2507 return composed;
2508 }
2509 */
2510
2511
2512
2513 /* experimental version
2514 */
2515 template <class IT, class NT>
2516 template <typename _UnaryOperation>
FilterByVal(FullyDistSpVec<IT,IT> Selector,_UnaryOperation __unop,bool filterByIndex)2517 void FullyDistSpVec<IT,NT>::FilterByVal (FullyDistSpVec<IT,IT> Selector, _UnaryOperation __unop, bool filterByIndex)
2518 {
2519 if(*commGrid != *(Selector.commGrid))
2520 {
2521 std::ostringstream outs;
2522 outs << "Grids are not comparable for Filter" << std::endl;
2523 SpParHelper::Print(outs.str());
2524 MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2525 }
2526 int nprocs = commGrid->GetSize();
2527 MPI_Comm World = commGrid->GetWorld();
2528
2529
2530 int * rdispls = new int[nprocs];
2531 int sendcnt = Selector.ind.size();
2532 int * recvcnt = new int[nprocs];
2533 MPI_Allgather(&sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
2534
2535 rdispls[0] = 0;
2536 for(int i=0; i<nprocs-1; ++i)
2537 {
2538 rdispls[i+1] = rdispls[i] + recvcnt[i];
2539 }
2540
2541
2542 IT * sendbuf = new IT[sendcnt];
2543 #ifdef _OPENMP
2544 #pragma omp parallel for
2545 #endif
2546 for(int k=0; k<sendcnt; k++)
2547 {
2548 if(filterByIndex)
2549 sendbuf[k] = Selector.ind[k] + Selector.LengthUntil();
2550 else
2551 sendbuf[k] = Selector.num[k];
2552 }
2553
2554 IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2555
2556 std::vector<IT> recvbuf;
2557 recvbuf.resize(totrecv);
2558
2559 MPI_Allgatherv(sendbuf, sendcnt, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
2560 delete [] sendbuf;
2561 DeleteAll(rdispls,recvcnt);
2562
2563 if(!filterByIndex) // need to sort
2564 {
2565 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2566 __gnu_parallel::sort(recvbuf.begin(), recvbuf.end());
2567 #else
2568 std::sort(recvbuf.begin(), recvbuf.end());
2569 #endif
2570 }
2571
2572 // now perform filter (recvbuf is sorted) // TODO: OpenMP parallel and keep things sorted
2573 IT k=0;
2574
2575 for(IT i=0; i<num.size(); i++)
2576 {
2577 IT val = __unop(num[i]);
2578 if(!std::binary_search(recvbuf.begin(), recvbuf.end(), val))
2579 {
2580 ind[k] = ind[i];
2581 num[k++] = num[i];
2582 }
2583 }
2584 ind.resize(k);
2585 num.resize(k);
2586
2587
2588 }
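// Usage sketch (hypothetical; IT = NT = int64_t): drop from v every entry whose value also
// occurs as a value of the selector vector sel:
//   v.FilterByVal(sel, [](int64_t x){ return x; }, false);
// with filterByIndex=true the selector's global indices (ind + LengthUntil) are matched
// instead of its values; the selector's entries are allgathered to every process, so it
// should be kept small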
2589
2590 }
2591