/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.6 -------------------------------------------------*/
/* date: 6/15/2017 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc --------------------------*/
/****************************************************************/
/*
Copyright (c) 2010-2017, The Regents of the University of California

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/


#ifndef _PAR_FRIENDS_H_
#define _PAR_FRIENDS_H_

#include "mpi.h"
#include <iostream>
#include <cstdarg>
#include "SpParMat.h"
#include "SpParHelper.h"
#include "MPIType.h"
#include "Friends.h"
#include "OptBuf.h"
#include "mtSpGEMM.h"
#include "MultiwayMerge.h"


namespace combblas {

template <class IT, class NT, class DER>
class SpParMat;

/*************************************************************************************************/
/**************************** FRIEND FUNCTIONS FOR PARALLEL CLASSES ******************************/
/*************************************************************************************************/


/**
 ** Concatenate all the FullyDistVec<IT,NT> objects into a single one
 **/
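// A minimal usage sketch (illustrative only; "grid" stands for a hypothetical,
// pre-existing std::shared_ptr<CommGrid>, everything else is the API below):
//
//   std::vector< FullyDistVec<int64_t,double> > pieces;
//   pieces.push_back(FullyDistVec<int64_t,double>(grid, 100, 1.0));  // 100 ones
//   pieces.push_back(FullyDistVec<int64_t,double>(grid, 50, 2.0));   // 50 twos
//   FullyDistVec<int64_t,double> whole = Concatenate(pieces);        // length 150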
template <typename IT, typename NT>
FullyDistVec<IT,NT> Concatenate ( std::vector< FullyDistVec<IT,NT> > & vecs)
{
    if(vecs.size() < 1)
    {
        SpParHelper::Print("Warning: Nothing to concatenate, returning empty\n");
        return FullyDistVec<IT,NT>();
    }
    else if (vecs.size() < 2)
    {
        return vecs[0];     // a single vector is its own concatenation
    }
    else
    {
        typename std::vector< FullyDistVec<IT,NT> >::iterator it = vecs.begin();
        std::shared_ptr<CommGrid> commGridPtr = it->getcommgrid();
        MPI_Comm World = commGridPtr->GetWorld();

        IT nglen = it->TotalLength();       // new global length
        IT cumloclen = it->MyLocLength();   // existing cumulative local lengths
        ++it;
        for(; it != vecs.end(); ++it)
        {
            if(*(commGridPtr) != *(it->getcommgrid()))
            {
                SpParHelper::Print("Grids are not comparable for Concatenate\n");
                MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
            }
            nglen += it->TotalLength();
            cumloclen += it->MyLocLength();
        }
        FullyDistVec<IT,NT> ConCat (commGridPtr, nglen, NT());
        int nprocs = commGridPtr->GetSize();

        std::vector< std::vector< NT > > data(nprocs);
        std::vector< std::vector< IT > > inds(nprocs);
        IT gloffset = 0;
        for(it = vecs.begin(); it != vecs.end(); ++it)
        {
            IT loclen = it->LocArrSize();
            for(IT i=0; i < loclen; ++i)
            {
                IT locind;
                IT loffset = it->LengthUntil();
                int owner = ConCat.Owner(gloffset+loffset+i, locind);
                data[owner].push_back(it->arr[i]);
                inds[owner].push_back(locind);
            }
            gloffset += it->TotalLength();
        }

        int * sendcnt = new int[nprocs];
        int * sdispls = new int[nprocs];
        for(int i=0; i<nprocs; ++i)
            sendcnt[i] = (int) data[i].size();

        int * rdispls = new int[nprocs];
        int * recvcnt = new int[nprocs];
        MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
        sdispls[0] = 0;
        rdispls[0] = 0;
        for(int i=0; i<nprocs-1; ++i)
        {
            sdispls[i+1] = sdispls[i] + sendcnt[i];
            rdispls[i+1] = rdispls[i] + recvcnt[i];
        }
        IT totrecv = std::accumulate(recvcnt, recvcnt+nprocs, static_cast<IT>(0));
        NT * senddatabuf = new NT[cumloclen];
        for(int i=0; i<nprocs; ++i)
        {
            std::copy(data[i].begin(), data[i].end(), senddatabuf+sdispls[i]);
            std::vector<NT>().swap(data[i]);    // delete data vectors
        }
        NT * recvdatabuf = new NT[totrecv];
        MPI_Alltoallv(senddatabuf, sendcnt, sdispls, MPIType<NT>(), recvdatabuf, recvcnt, rdispls, MPIType<NT>(), World);   // send data
        delete [] senddatabuf;

        IT * sendindsbuf = new IT[cumloclen];
        for(int i=0; i<nprocs; ++i)
        {
            std::copy(inds[i].begin(), inds[i].end(), sendindsbuf+sdispls[i]);
            std::vector<IT>().swap(inds[i]);    // delete inds vectors
        }
        IT * recvindsbuf = new IT[totrecv];
        MPI_Alltoallv(sendindsbuf, sendcnt, sdispls, MPIType<IT>(), recvindsbuf, recvcnt, rdispls, MPIType<IT>(), World);   // send new inds
        DeleteAll(sendindsbuf, sendcnt, sdispls);

        for(int i=0; i<nprocs; ++i)
        {
            for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)
            {
                ConCat.arr[recvindsbuf[j]] = recvdatabuf[j];
            }
        }
        DeleteAll(recvindsbuf, recvdatabuf, recvcnt, rdispls);
        return ConCat;
    }
}

template <typename MATRIXA, typename MATRIXB>
bool CheckSpGEMMCompliance(const MATRIXA & A, const MATRIXB & B)
{
    if(A.getncol() != B.getnrow())
    {
        std::ostringstream outs;
        outs << "Cannot multiply, dimensions do not match" << std::endl;
        outs << A.getncol() << " != " << B.getnrow() << std::endl;
        SpParHelper::Print(outs.str());
        MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        return false;
    }
    if((void*) &A == (void*) &B)
    {
        std::ostringstream outs;
        outs << "Cannot multiply, inputs alias (make a temporary copy of one of them first)" << std::endl;
        SpParHelper::Print(outs.str());
        MPI_Abort(MPI_COMM_WORLD, MATRIXALIAS);
        return false;
    }
    return true;
}


// Combined logic for prune, recovery, and select
template <typename IT, typename NT, typename DER>
void MCLPruneRecoverySelect(SpParMat<IT,NT,DER> & A, NT hardThreshold, IT selectNum, IT recoverNum, NT recoverPct, int kselectVersion)
{
#ifdef TIMING
    double t0, t1;
#endif
    // Prune and create a new pruned matrix
    SpParMat<IT,NT,DER> PrunedA = A.Prune(std::bind2nd(std::less_equal<NT>(), hardThreshold), false);
    // column-wise statistics of the pruned matrix
    FullyDistVec<IT,NT> colSums = PrunedA.Reduce(Column, std::plus<NT>(), 0.0);
    FullyDistVec<IT,NT> nnzPerColumn = PrunedA.Reduce(Column, std::plus<NT>(), 0.0, [](NT val){return 1.0;});
    FullyDistVec<IT,NT> pruneCols(A.getcommgrid(), A.getncol(), hardThreshold);
    PrunedA.FreeMemory();


    // Check if we need recovery
    // columns with nnz < recoverNum (r)
    FullyDistSpVec<IT,NT> recoverCols(nnzPerColumn, std::bind2nd(std::less<NT>(), recoverNum));
    recoverCols = recoverPct;
    // columns with nnz < r AND sum < recoverPct (pct)
    recoverCols = EWiseApply<NT>(recoverCols, colSums,
                                 [](NT spval, NT dval){return spval;},
                                 [](NT spval, NT dval){return dval < spval;},
                                 false, NT());

    IT nrecover = recoverCols.getnnz();
    if(nrecover > 0)
    {
#ifdef TIMING
        t0 = MPI_Wtime();
#endif
        A.Kselect(recoverCols, recoverNum, kselectVersion);
#ifdef TIMING
        t1 = MPI_Wtime();
        mcl_kselecttime += (t1-t0);
#endif

        pruneCols.Set(recoverCols);

#ifdef COMBBLAS_DEBUG
        std::ostringstream outs;
        outs << "Number of columns needing recovery: " << nrecover << std::endl;
        SpParHelper::Print(outs.str());
#endif
    }


    if(selectNum > 0)
    {
        // remaining columns will be up for selection
        FullyDistSpVec<IT,NT> selectCols = EWiseApply<NT>(recoverCols, colSums,
                                                          [](NT spval, NT dval){return spval;},
                                                          [](NT spval, NT dval){return spval==-1;},
                                                          true, static_cast<NT>(-1));

        selectCols = selectNum;
        selectCols = EWiseApply<NT>(selectCols, nnzPerColumn,
                                    [](NT spval, NT dval){return spval;},
                                    [](NT spval, NT dval){return dval > spval;},
                                    false, NT());
        IT nselect = selectCols.getnnz();

        if(nselect > 0)
        {
#ifdef TIMING
            t0 = MPI_Wtime();
#endif
            A.Kselect(selectCols, selectNum, kselectVersion);   // PrunedA would also work
#ifdef TIMING
            t1 = MPI_Wtime();
            mcl_kselecttime += (t1-t0);
#endif

            pruneCols.Set(selectCols);
#ifdef COMBBLAS_DEBUG
            std::ostringstream outs;
            outs << "Number of columns needing selection: " << nselect << std::endl;
            SpParHelper::Print(outs.str());
#endif
#ifdef TIMING
            t0 = MPI_Wtime();
#endif
            SpParMat<IT,NT,DER> selectedA = A.PruneColumn(pruneCols, std::less<NT>(), false);
#ifdef TIMING
            t1 = MPI_Wtime();
            mcl_prunecolumntime += (t1-t0);
#endif
            if(recoverNum > 0)  // recovery can be attempted after selection
            {
                FullyDistVec<IT,NT> nnzPerColumn1 = selectedA.Reduce(Column, std::plus<NT>(), 0.0, [](NT val){return 1.0;});
                FullyDistVec<IT,NT> colSums1 = selectedA.Reduce(Column, std::plus<NT>(), 0.0);
                selectedA.FreeMemory();

                // selected columns with nnz < recoverNum (r)
                selectCols = recoverNum;
                selectCols = EWiseApply<NT>(selectCols, nnzPerColumn1,
                                            [](NT spval, NT dval){return spval;},
                                            [](NT spval, NT dval){return dval < spval;},
                                            false, NT());

                // selected columns with sum < recoverPct (pct)
                selectCols = recoverPct;
                selectCols = EWiseApply<NT>(selectCols, colSums1,
                                            [](NT spval, NT dval){return spval;},
                                            [](NT spval, NT dval){return dval < spval;},
                                            false, NT());

                IT n_recovery_after_select = selectCols.getnnz();
                if(n_recovery_after_select > 0)
                {
                    // mclExpandVector2 does it on the original vector
                    // mclExpandVector1 does it on the pruned vector
#ifdef TIMING
                    t0 = MPI_Wtime();
#endif
                    A.Kselect(selectCols, recoverNum, kselectVersion);  // Kselect on PrunedA might give a different result
#ifdef TIMING
                    t1 = MPI_Wtime();
                    mcl_kselecttime += (t1-t0);
#endif
                    pruneCols.Set(selectCols);

#ifdef COMBBLAS_DEBUG
                    std::ostringstream outs1;
                    outs1 << "Number of columns needing recovery after selection: " << n_recovery_after_select << std::endl;
                    SpParHelper::Print(outs1.str());
#endif
                }
            }
        }
    }


    // final prune
#ifdef TIMING
    t0 = MPI_Wtime();
#endif
    A.PruneColumn(pruneCols, std::less<NT>(), true);
#ifdef TIMING
    t1 = MPI_Wtime();
    mcl_prunecolumntime += (t1-t0);
#endif
    // Add loops for empty columns
    if(recoverNum <= 0) // if recoverNum > 0, recovery would have added nonzeros to empty columns
    {
        FullyDistVec<IT,NT> nnzPerColumnA = A.Reduce(Column, std::plus<NT>(), 0.0, [](NT val){return 1.0;});
        FullyDistSpVec<IT,NT> emptyColumns(nnzPerColumnA, std::bind2nd(std::equal_to<NT>(), 0.0));
        emptyColumns = 1.00;
        // Ariful: We need a selective AddLoops function with a sparse vector
        //A.AddLoops(emptyColumns);
    }
}




/**
 * Broadcasts A multiple times (#phases) in order to save storage in the output
 * Only uses 1/phases of C memory if the threshold/max limits are proper
 */
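// A hypothetical call sketch (the numeric values are illustrative MCL-style
// settings, not defaults taken from this file; PlusTimesSRing and SpDCCols
// are the usual CombBLAS semiring/storage choices):
//
//   typedef PlusTimesSRing<double, double> PTFF;
//   typedef SpDCCols<int64_t, double> DCCols;
//   SpParMat<int64_t, double, DCCols> C =
//       MemEfficientSpGEMM<PTFF, double, DCCols>(A, A,
//           4,       // phases: B is split column-wise into 4 pieces
//           0.0001,  // hardThreshold: entries <= this are pruned
//           1100,    // selectNum: keep at most this many entries per column
//           1400,    // recoverNum: recover columns left with fewer entries
//           0.9,     // recoverPct
//           1,       // kselectVersion
//           8);      // perProcessMemory in GB; enables phase re-estimation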
template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
SpParMat<IU,NUO,UDERO> MemEfficientSpGEMM (SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B,
                                           int phases, NUO hardThreshold, IU selectNum, IU recoverNum, NUO recoverPct, int kselectVersion, int64_t perProcessMemory)
{
    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if(A.getncol() != B.getnrow())
    {
        std::ostringstream outs;
        outs << "Cannot multiply, dimensions do not match" << std::endl;
        outs << A.getncol() << " != " << B.getnrow() << std::endl;
        SpParHelper::Print(outs.str());
        MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        return SpParMat< IU,NUO,UDERO >();
    }
    if(phases < 1 || phases >= A.getncol())
    {
        SpParHelper::Print("MemEfficientSpGEMM: The value of phases is too small or too large. Resetting to 1.\n");
        phases = 1;
    }

    int stages, dummy;  // last two parameters of ProductGrid are ignored for Synch multiplication
    std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);


    if(perProcessMemory > 0)    // estimate the number of phases permitted by memory
    {
        int p;
        MPI_Comm World = GridC->GetWorld();
        MPI_Comm_size(World, &p);

        int64_t perNNZMem_in = sizeof(IU)*2 + sizeof(NU1);
        int64_t perNNZMem_out = sizeof(IU)*2 + sizeof(NUO);

        // max nnz(A) in a process
        int64_t lannz = A.getlocalnnz();
        int64_t gannz;
        MPI_Allreduce(&lannz, &gannz, 1, MPIType<int64_t>(), MPI_MAX, World);
        int64_t inputMem = gannz * perNNZMem_in * 4;    // for four copies (two for SUMMA)

        // max nnz(A^2) stored by SUMMA in a process
        int64_t asquareNNZ = EstPerProcessNnzSUMMA(A, B);
        int64_t asquareMem = asquareNNZ * perNNZMem_out * 2;    // an extra copy in multiway merge and in the selection/recovery step


        // estimate kselect memory
        int64_t d = ceil( (asquareNNZ * sqrt(p)) / B.getlocalcols() );  // average nnz per column in A^2 (an overestimate, because asquareNNZ is estimated from unmerged matrices)
        // this is equivalent to (asquareNNZ * p) / B.getncol()
        int64_t k = std::min(int64_t(std::max(selectNum, recoverNum)), d );
        int64_t kselectmem = B.getlocalcols() * k * 8 * 3;

        // estimate output memory
        int64_t outputNNZ = (B.getlocalcols() * k) / sqrt(p);
        int64_t outputMem = outputNNZ * perNNZMem_in * 2;

        // inputMem + outputMem + asquareMem/phases + kselectmem/phases < memory
        int64_t remainingMem = perProcessMemory*1000000000 - inputMem - outputMem;
        if(remainingMem > 0)
        {
            phases = 1 + (asquareMem+kselectmem) / remainingMem;
        }
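        // Note: solving the budget inequality above for phases gives
        // phases >= (asquareMem + kselectmem) / remainingMem; the integer
        // division plus one rounds that lower bound up, so each phase's share
        // of the intermediate product and kselect scratch fits in memory.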



        if(myrank == 0)
        {
            if(remainingMem < 0)
            {
                std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n Warning: the input and output memory requirement is greater than the per-process available memory. Keeping phases at the value supplied at the command line. The program may run out of memory and crash! \n !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl;
            }
#ifdef SHOW_MEMORY_USAGE
            int64_t maxMemory = kselectmem/phases + inputMem + outputMem + asquareMem/phases;
            if(maxMemory > 1000000000)
                std::cout << "phases: " << phases << ": per process memory: " << perProcessMemory << " GB asquareMem: " << asquareMem/1000000000.00 << " GB" << " inputMem: " << inputMem/1000000000.00 << " GB" << " outputMem: " << outputMem/1000000000.00 << " GB" << " kselectmem: " << kselectmem/1000000000.00 << " GB" << std::endl;
            else
                std::cout << "phases: " << phases << ": per process memory: " << perProcessMemory << " GB asquareMem: " << asquareMem/1000000.00 << " MB" << " inputMem: " << inputMem/1000000.00 << " MB" << " outputMem: " << outputMem/1000000.00 << " MB" << " kselectmem: " << kselectmem/1000000.00 << " MB" << std::endl;
#endif
        }
    }

    IU C_m = A.spSeq->getnrow();
    IU C_n = B.spSeq->getncol();

    std::vector< UDERB > PiecesOfB;
    UDERB CopyB = *(B.spSeq);   // we allow aliased matrices as input because of this local copy

    CopyB.ColSplit(phases, PiecesOfB);  // CopyB's memory is destroyed at this point
    MPI_Barrier(GridC->GetWorld());


    IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
    IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);


    SpParHelper::GetSetSizes( *(A.spSeq), ARecvSizes, (A.commGrid)->GetRowWorld());

    // Remotely fetched matrices are stored as pointers
    UDERA * ARecv;
    UDERB * BRecv;

    std::vector< UDERO > toconcatenate;

    int Aself = (A.commGrid)->GetRankInProcRow();
    int Bself = (B.commGrid)->GetRankInProcCol();

    for(int p = 0; p < phases; ++p)
    {
        SpParHelper::GetSetSizes( PiecesOfB[p], BRecvSizes, (B.commGrid)->GetColWorld());
        std::vector< SpTuples<IU,NUO> *> tomerge;
        for(int i = 0; i < stages; ++i)
        {
            std::vector<IU> ess;
            if(i == Aself)  ARecv = A.spSeq;    // shallow-copy
            else
            {
                ess.resize(UDERA::esscount);
                for(int j=0; j< UDERA::esscount; ++j)
                    ess[j] = ARecvSizes[j][i];  // essentials of the ith matrix in this row
                ARecv = new UDERA();    // first, create the object
            }

#ifdef TIMING
            double t0 = MPI_Wtime();
#endif
            SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i); // then, receive its elements
#ifdef TIMING
            double t1 = MPI_Wtime();
            mcl_Abcasttime += (t1-t0);
#endif
            ess.clear();

            if(i == Bself)  BRecv = &(PiecesOfB[p]);    // shallow-copy
            else
            {
                ess.resize(UDERB::esscount);
                for(int j=0; j< UDERB::esscount; ++j)
                    ess[j] = BRecvSizes[j][i];
                BRecv = new UDERB();
            }
#ifdef TIMING
            double t2 = MPI_Wtime();
#endif
            SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i); // then, receive its elements
#ifdef TIMING
            double t3 = MPI_Wtime();
            mcl_Bbcasttime += (t3-t2);
#endif


#ifdef TIMING
            double t4 = MPI_Wtime();
#endif
            SpTuples<IU,NUO> * C_cont = LocalSpGEMM<SR, NUO>(*ARecv, *BRecv, i != Aself, i != Bself);

#ifdef TIMING
            double t5 = MPI_Wtime();
            mcl_localspgemmtime += (t5-t4);
#endif

            if(!C_cont->isZero())
                tomerge.push_back(C_cont);
            else
                delete C_cont;

        }   // all stages executed

#ifdef SHOW_MEMORY_USAGE
        int64_t gcnnz_unmerged, lcnnz_unmerged = 0;
        for(size_t i = 0; i < tomerge.size(); ++i)
        {
            lcnnz_unmerged += tomerge[i]->getnnz();
        }
        MPI_Allreduce(&lcnnz_unmerged, &gcnnz_unmerged, 1, MPIType<int64_t>(), MPI_MAX, MPI_COMM_WORLD);
        int64_t summa_memory = gcnnz_unmerged*20;   //(gannz*2 + phase_nnz + gcnnz_unmerged + gannz + gannz/phases) * 20; // last two for broadcasts

        if(myrank == 0)
        {
            if(summa_memory > 1000000000)
                std::cout << p+1 << ". unmerged: " << summa_memory/1000000000.00 << "GB ";
            else
                std::cout << p+1 << ". unmerged: " << summa_memory/1000000.00 << " MB ";
        }
#endif

#ifdef TIMING
        double t6 = MPI_Wtime();
#endif
        //UDERO OnePieceOfC(MergeAll<SR>(tomerge, C_m, PiecesOfB[p].getncol(),true), false);
        // TODO: MultiwayMerge can directly return UDERO in order to avoid the extra copy
        SpTuples<IU,NUO> * OnePieceOfC_tuples = MultiwayMerge<SR>(tomerge, C_m, PiecesOfB[p].getncol(), true);

#ifdef SHOW_MEMORY_USAGE
        int64_t gcnnz_merged, lcnnz_merged;
        lcnnz_merged = OnePieceOfC_tuples->getnnz();
        MPI_Allreduce(&lcnnz_merged, &gcnnz_merged, 1, MPIType<int64_t>(), MPI_MAX, MPI_COMM_WORLD);

        // TODO: we can remove gcnnz_merged memory here because we don't need to concatenate anymore
        int64_t merge_memory = gcnnz_merged*2*20;   //(gannz*2 + phase_nnz + gcnnz_unmerged + gcnnz_merged*2) * 20;

        if(myrank == 0)
        {
            if(merge_memory > 1000000000)
                std::cout << " merged: " << merge_memory/1000000000.00 << "GB ";
            else
                std::cout << " merged: " << merge_memory/1000000.00 << " MB ";
        }
#endif


#ifdef TIMING
        double t7 = MPI_Wtime();
        mcl_multiwaymergetime += (t7-t6);
#endif
        UDERO * OnePieceOfC = new UDERO(*OnePieceOfC_tuples, false);
        delete OnePieceOfC_tuples;

        SpParMat<IU,NUO,UDERO> OnePieceOfC_mat(OnePieceOfC, GridC);
        MCLPruneRecoverySelect(OnePieceOfC_mat, hardThreshold, selectNum, recoverNum, recoverPct, kselectVersion);

#ifdef SHOW_MEMORY_USAGE
        int64_t gcnnz_pruned, lcnnz_pruned;
        lcnnz_pruned = OnePieceOfC_mat.getlocalnnz();
        MPI_Allreduce(&lcnnz_pruned, &gcnnz_pruned, 1, MPIType<int64_t>(), MPI_MAX, MPI_COMM_WORLD);


        // TODO: we can remove gcnnz_merged memory here because we don't need to concatenate anymore
        int64_t prune_memory = gcnnz_pruned*2*20;   //(gannz*2 + phase_nnz + gcnnz_pruned*2) * 20 + kselectmem; // 3 extra copies of OnePieceOfC_mat, we can make it one extra copy!
        //phase_nnz += gcnnz_pruned;

        if(myrank == 0)
        {
            if(prune_memory > 1000000000)
                std::cout << "Prune: " << prune_memory/1000000000.00 << "GB " << std::endl;
            else
                std::cout << "Prune: " << prune_memory/1000000.00 << " MB " << std::endl;
        }
#endif

        // ABAB: Change this to accept pointers to objects
        toconcatenate.push_back(OnePieceOfC_mat.seq());
    }


    UDERO * C = new UDERO(0, C_m, C_n, 0);
    C->ColConcatenate(toconcatenate);   // ABAB: Change this to accept a vector of pointers to pointers to DER objects


    SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
    SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
    return SpParMat<IU,NUO,UDERO> (C, GridC);
}



/**
 * Parallel C = A*B routine that uses a double buffered broadcasting scheme
 * @pre { Input matrices, A and B, should not alias }
 * Most memory efficient version available. Total stages: 2*sqrt(p)
 * Memory requirement during first sqrt(p) stages: <= (3/2)*(nnz(A)+nnz(B))+(1/2)*nnz(C)
 * Memory requirement during second sqrt(p) stages: <= nnz(A)+nnz(B)+nnz(C)
 * Final memory requirement: nnz(C) if clearA and clearB are true
 **/
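// A minimal call sketch (the semiring and storage types are illustrative):
//
//   typedef PlusTimesSRing<double, double> PTFF;
//   typedef SpDCCols<int64_t, double> DCCols;
//   SpParMat<int64_t, double, DCCols> C =
//       Mult_AnXBn_DoubleBuff<PTFF, double, DCCols>(A, B);
//   // passing clearA/clearB = true lets the routine free each input
//   // once its half-splits are no longer needed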
template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
SpParMat<IU,NUO,UDERO> Mult_AnXBn_DoubleBuff
(SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B, bool clearA = false, bool clearB = false )
{
    if(!CheckSpGEMMCompliance(A, B) )
    {
        return SpParMat< IU,NUO,UDERO >();
    }

    int stages, dummy;  // last two parameters of ProductGrid are ignored for Synch multiplication
    std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);
    IU C_m = A.spSeq->getnrow();
    IU C_n = B.spSeq->getncol();

    UDERA * A1seq = new UDERA();
    UDERA * A2seq = new UDERA();
    UDERB * B1seq = new UDERB();
    UDERB * B2seq = new UDERB();
    (A.spSeq)->Split( *A1seq, *A2seq);
    const_cast< UDERB* >(B.spSeq)->Transpose();
    (B.spSeq)->Split( *B1seq, *B2seq);
    MPI_Barrier(GridC->GetWorld());

    IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
    IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);

    SpParHelper::GetSetSizes( *A1seq, ARecvSizes, (A.commGrid)->GetRowWorld());
    SpParHelper::GetSetSizes( *B1seq, BRecvSizes, (B.commGrid)->GetColWorld());

    // Remotely fetched matrices are stored as pointers
    UDERA * ARecv;
    UDERB * BRecv;
    std::vector< SpTuples<IU,NUO> *> tomerge;

    int Aself = (A.commGrid)->GetRankInProcRow();
    int Bself = (B.commGrid)->GetRankInProcCol();

    for(int i = 0; i < stages; ++i)
    {
        std::vector<IU> ess;
        if(i == Aself)
        {
            ARecv = A1seq;  // shallow-copy
        }
        else
        {
            ess.resize(UDERA::esscount);
            for(int j=0; j< UDERA::esscount; ++j)
            {
                ess[j] = ARecvSizes[j][i];  // essentials of the ith matrix in this row
            }
            ARecv = new UDERA();    // first, create the object
        }
        SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i); // then, receive its elements
        ess.clear();
        if(i == Bself)
        {
            BRecv = B1seq;  // shallow-copy
        }
        else
        {
            ess.resize(UDERB::esscount);
            for(int j=0; j< UDERB::esscount; ++j)
            {
                ess[j] = BRecvSizes[j][i];
            }
            BRecv = new UDERB();
        }
        SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i); // then, receive its elements


        SpTuples<IU,NUO> * C_cont = MultiplyReturnTuples<SR, NUO>
                                    (*ARecv, *BRecv,    // parameters themselves
                                     false, true,       // transpose information (B is transposed)
                                     i != Aself,        // 'delete A' condition
                                     i != Bself);       // 'delete B' condition

        if(!C_cont->isZero())
            tomerge.push_back(C_cont);
        else
            delete C_cont;
    }
    if(clearA) delete A1seq;
    if(clearB) delete B1seq;

    // Set the new dimensions
    SpParHelper::GetSetSizes( *A2seq, ARecvSizes, (A.commGrid)->GetRowWorld());
    SpParHelper::GetSetSizes( *B2seq, BRecvSizes, (B.commGrid)->GetColWorld());

    // Start the second round
    for(int i = 0; i < stages; ++i)
    {
        std::vector<IU> ess;
        if(i == Aself)
        {
            ARecv = A2seq;  // shallow-copy
        }
        else
        {
            ess.resize(UDERA::esscount);
            for(int j=0; j< UDERA::esscount; ++j)
            {
                ess[j] = ARecvSizes[j][i];  // essentials of the ith matrix in this row
            }
            ARecv = new UDERA();    // first, create the object
        }

        SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i); // then, receive its elements
        ess.clear();

        if(i == Bself)
        {
            BRecv = B2seq;  // shallow-copy
        }
        else
        {
            ess.resize(UDERB::esscount);
            for(int j=0; j< UDERB::esscount; ++j)
            {
                ess[j] = BRecvSizes[j][i];
            }
            BRecv = new UDERB();
        }
        SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i); // then, receive its elements

        SpTuples<IU,NUO> * C_cont = MultiplyReturnTuples<SR, NUO>
                                    (*ARecv, *BRecv,    // parameters themselves
                                     false, true,       // transpose information (B is transposed)
                                     i != Aself,        // 'delete A' condition
                                     i != Bself);       // 'delete B' condition

        if(!C_cont->isZero())
            tomerge.push_back(C_cont);
        else
            delete C_cont;
    }
    SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
    SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
    if(clearA)
    {
        delete A2seq;
        delete A.spSeq;
        A.spSeq = NULL;
    }
    else
    {
        (A.spSeq)->Merge(*A1seq, *A2seq);
        delete A1seq;
        delete A2seq;
    }
    if(clearB)
    {
        delete B2seq;
        delete B.spSeq;
        B.spSeq = NULL;
    }
    else
    {
        (B.spSeq)->Merge(*B1seq, *B2seq);
        delete B1seq;
        delete B2seq;
        const_cast< UDERB* >(B.spSeq)->Transpose(); // transpose back to original
    }

    UDERO * C = new UDERO(MergeAll<SR>(tomerge, C_m, C_n, true), false);
    return SpParMat<IU,NUO,UDERO> (C, GridC);   // return the result object
}


/**
 * Parallel C = A*B routine that uses only MPI-1 features
 * Relies on simple blocking broadcast
 * @pre { Input matrices, A and B, should not alias }
 **/
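// A minimal call sketch (illustrative types; clearA/clearB = true lets the
// routine free the corresponding input once all stages have used it):
//
//   typedef PlusTimesSRing<double, double> PTFF;
//   typedef SpDCCols<int64_t, double> DCCols;
//   SpParMat<int64_t, double, DCCols> C =
//       Mult_AnXBn_Synch<PTFF, double, DCCols>(A, B, false, false);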
template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
SpParMat<IU, NUO, UDERO> Mult_AnXBn_Synch
(SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B, bool clearA = false, bool clearB = false )
{
    if(!CheckSpGEMMCompliance(A, B) )
    {
        return SpParMat< IU,NUO,UDERO >();
    }
    int stages, dummy;  // last two parameters of ProductGrid are ignored for Synch multiplication
    std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);
    IU C_m = A.spSeq->getnrow();
    IU C_n = B.spSeq->getncol();

    //const_cast< UDERB* >(B.spSeq)->Transpose(); // do not transpose for column-by-column multiplication
    MPI_Barrier(GridC->GetWorld());

    IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
    IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);

    SpParHelper::GetSetSizes( *(A.spSeq), ARecvSizes, (A.commGrid)->GetRowWorld());
    SpParHelper::GetSetSizes( *(B.spSeq), BRecvSizes, (B.commGrid)->GetColWorld());

    // Remotely fetched matrices are stored as pointers
    UDERA * ARecv;
    UDERB * BRecv;
    std::vector< SpTuples<IU,NUO> *> tomerge;

    int Aself = (A.commGrid)->GetRankInProcRow();
    int Bself = (B.commGrid)->GetRankInProcCol();

    for(int i = 0; i < stages; ++i)
    {
        std::vector<IU> ess;
        if(i == Aself)
        {
            ARecv = A.spSeq;    // shallow-copy
        }
        else
        {
            ess.resize(UDERA::esscount);
            for(int j=0; j< UDERA::esscount; ++j)
            {
                ess[j] = ARecvSizes[j][i];  // essentials of the ith matrix in this row
            }
            ARecv = new UDERA();    // first, create the object
        }

        SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i); // then, receive its elements
        ess.clear();

        if(i == Bself)
        {
            BRecv = B.spSeq;    // shallow-copy
        }
        else
        {
            ess.resize(UDERB::esscount);
            for(int j=0; j< UDERB::esscount; ++j)
            {
                ess[j] = BRecvSizes[j][i];
            }
            BRecv = new UDERB();
        }

        SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i); // then, receive its elements

        /*
        // before activating this, transpose B first
        SpTuples<IU,NUO> * C_cont = MultiplyReturnTuples<SR, NUO>
                                    (*ARecv, *BRecv,    // parameters themselves
                                     false, true,       // transpose information (B is transposed)
                                     i != Aself,        // 'delete A' condition
                                     i != Bself);       // 'delete B' condition
        */

        SpTuples<IU,NUO> * C_cont = LocalSpGEMM<SR, NUO>
                                    (*ARecv, *BRecv,    // parameters themselves
                                     i != Aself,        // 'delete A' condition
                                     i != Bself);       // 'delete B' condition


        if(!C_cont->isZero())
            tomerge.push_back(C_cont);
        else
            delete C_cont;

#ifdef COMBBLAS_DEBUG
        std::ostringstream outs;
        outs << i << "th SUMMA iteration" << std::endl;
        SpParHelper::Print(outs.str());
#endif
    }
    if(clearA && A.spSeq != NULL)
    {
        delete A.spSeq;
        A.spSeq = NULL;
    }
    if(clearB && B.spSeq != NULL)
    {
        delete B.spSeq;
        B.spSeq = NULL;
    }

    SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
    SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);

    //UDERO * C = new UDERO(MergeAll<SR>(tomerge, C_m, C_n,true), false);
    // First get the result in SpTuples, then convert to UDER
    // the last parameter to MultiwayMerge deletes the tomerge arrays

    SpTuples<IU,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n, true);
    UDERO * C = new UDERO(*C_tuples, false);
    delete C_tuples;

    //if(!clearB)
    //  const_cast< UDERB* >(B.spSeq)->Transpose(); // transpose back to original

    return SpParMat<IU,NUO,UDERO> (C, GridC);   // return the result object
}



/**
 * Estimate the maximum nnz needed to store in a process from all stages of SUMMA before reduction
 * @pre { Input matrices, A and B, should not alias }
 **/
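// Implementation note: the routine below replays the SUMMA broadcast schedule
// without doing any multiplication; at each stage it calls estimateNNZ() on the
// (ARecv, BRecv) pair, sums the per-column output estimates, and finally takes
// a MAX-allreduce so that every process learns the worst-case per-process total.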
template <typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
int64_t EstPerProcessNnzSUMMA(SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B)
{
    int64_t nnzC_SUMMA = 0;

    if(A.getncol() != B.getnrow())
    {
        std::ostringstream outs;
        outs << "Cannot multiply, dimensions do not match" << std::endl;
        outs << A.getncol() << " != " << B.getnrow() << std::endl;
        SpParHelper::Print(outs.str());
        MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        return nnzC_SUMMA;
    }

    int stages, dummy;  // last two parameters of ProductGrid are ignored for Synch multiplication
    std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);

    MPI_Barrier(GridC->GetWorld());

    IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
    IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);
    SpParHelper::GetSetSizes( *(A.spSeq), ARecvSizes, (A.commGrid)->GetRowWorld());
    SpParHelper::GetSetSizes( *(B.spSeq), BRecvSizes, (B.commGrid)->GetColWorld());

    // Remotely fetched matrices are stored as pointers
    UDERA * ARecv;
    UDERB * BRecv;

    int Aself = (A.commGrid)->GetRankInProcRow();
    int Bself = (B.commGrid)->GetRankInProcCol();


    for(int i = 0; i < stages; ++i)
    {
        std::vector<IU> ess;
        if(i == Aself)
        {
            ARecv = A.spSeq;    // shallow-copy
        }
        else
        {
            ess.resize(UDERA::esscount);
            for(int j=0; j< UDERA::esscount; ++j)
            {
                ess[j] = ARecvSizes[j][i];  // essentials of the ith matrix in this row
            }
            ARecv = new UDERA();    // first, create the object
        }

        SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i); // then, receive its elements
        ess.clear();

        if(i == Bself)
        {
            BRecv = B.spSeq;    // shallow-copy
        }
        else
        {
            ess.resize(UDERB::esscount);
            for(int j=0; j< UDERB::esscount; ++j)
            {
                ess[j] = BRecvSizes[j][i];
            }
            BRecv = new UDERB();
        }

        SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i); // then, receive its elements


        IU * colnnzC = estimateNNZ(*ARecv, *BRecv);


        IU nzc = BRecv->GetDCSC()->nzc;
        IU nnzC_stage = 0;
#ifdef THREADED
#pragma omp parallel for reduction (+:nnzC_stage)
#endif
        for (IU k=0; k<nzc; k++)
        {
            nnzC_stage = nnzC_stage + colnnzC[k];
        }
        nnzC_SUMMA += nnzC_stage;

        // delete received data
        if(i != Aself)
            delete ARecv;
        if(i != Bself)
            delete BRecv;
    }

    SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
    SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);

    int64_t nnzC_SUMMA_max = 0;
    MPI_Allreduce(&nnzC_SUMMA, &nnzC_SUMMA_max, 1, MPIType<int64_t>(), MPI_MAX, GridC->GetWorld());

    return nnzC_SUMMA_max;
}




template <typename MATRIX, typename VECTOR>
void CheckSpMVCompliance(const MATRIX & A, const VECTOR & x)
{
    if(A.getncol() != x.TotalLength())
    {
        std::ostringstream outs;
        outs << "Cannot multiply, dimensions do not match" << std::endl;
        outs << A.getncol() << " != " << x.TotalLength() << std::endl;
        SpParHelper::Print(outs.str());
        MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
    }
    if(! ( *(A.getcommgrid()) == *(x.getcommgrid())) )
    {
        std::cout << "Grids are not comparable for SpMV" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
    }
}


template <typename SR, typename IU, typename NUM, typename UDER>
FullyDistSpVec<IU,typename promote_trait<NUM,IU>::T_promote> SpMV
(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IU> & x, bool indexisvalue, OptBuf<int32_t, typename promote_trait<NUM,IU>::T_promote > & optbuf);

template <typename SR, typename IU, typename NUM, typename UDER>
FullyDistSpVec<IU,typename promote_trait<NUM,IU>::T_promote> SpMV
(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IU> & x, bool indexisvalue)
{
    typedef typename promote_trait<NUM,IU>::T_promote T_promote;
    OptBuf<int32_t, T_promote > optbuf = OptBuf<int32_t, T_promote >();
    return SpMV<SR>(A, x, indexisvalue, optbuf);
}

/**
 * Step 1 of the sparse SpMV algorithm
 * @param[in,out] trxlocnz, lenuntil, trxinds, trxnums { set or allocated }
 * @param[in] indexisvalue
 **/
template<typename IU, typename NV>
void TransposeVector(MPI_Comm & World, const FullyDistSpVec<IU,NV> & x, int32_t & trxlocnz, IU & lenuntil, int32_t * & trxinds, NV * & trxnums, bool indexisvalue)
{
    int32_t xlocnz = (int32_t) x.getlocnnz();
    int32_t roffst = (int32_t) x.RowLenUntil();     // since trxinds is int32_t
    int32_t roffset;
    IU luntil = x.LengthUntil();
    int diagneigh = x.commGrid->GetComplementRank();

    MPI_Status status;
    MPI_Sendrecv(&roffst, 1, MPIType<int32_t>(), diagneigh, TROST, &roffset, 1, MPIType<int32_t>(), diagneigh, TROST, World, &status);
    MPI_Sendrecv(&xlocnz, 1, MPIType<int32_t>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<int32_t>(), diagneigh, TRNNZ, World, &status);
    MPI_Sendrecv(&luntil, 1, MPIType<IU>(), diagneigh, TRLUT, &lenuntil, 1, MPIType<IU>(), diagneigh, TRLUT, World, &status);

    // ABAB: Important observation is that local indices (given by x.ind) are 32-bit addressable
    // Copy them to 32-bit integers and transfer those to save 50% of off-node bandwidth
    trxinds = new int32_t[trxlocnz];
    int32_t * temp_xind = new int32_t[xlocnz];
#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i< xlocnz; ++i)
        temp_xind[i] = (int32_t) x.ind[i];
    MPI_Sendrecv(temp_xind, xlocnz, MPIType<int32_t>(), diagneigh, TRI, trxinds, trxlocnz, MPIType<int32_t>(), diagneigh, TRI, World, &status);
    delete [] temp_xind;
    if(!indexisvalue)
    {
        trxnums = new NV[trxlocnz];
        MPI_Sendrecv(const_cast<NV*>(SpHelper::p2a(x.num)), xlocnz, MPIType<NV>(), diagneigh, TRX, trxnums, trxlocnz, MPIType<NV>(), diagneigh, TRX, World, &status);
    }

    std::transform(trxinds, trxinds+trxlocnz, trxinds, std::bind2nd(std::plus<int32_t>(), roffset));    // fullydist indexing (p pieces) -> matrix indexing (sqrt(p) pieces)
}


/**
 * Step 2 of the sparse SpMV algorithm
 * @param[in,out] trxinds, trxnums { deallocated }
 * @param[in,out] indacc, numacc { allocated }
 * @param[in,out] accnz { set }
 * @param[in] trxlocnz, lenuntil, indexisvalue
 **/
template<typename IU, typename NV>
void AllGatherVector(MPI_Comm & ColWorld, int trxlocnz, IU lenuntil, int32_t * & trxinds, NV * & trxnums,
                     int32_t * & indacc, NV * & numacc, int & accnz, bool indexisvalue)
{
    int colneighs, colrank;
    MPI_Comm_size(ColWorld, &colneighs);
    MPI_Comm_rank(ColWorld, &colrank);
    int * colnz = new int[colneighs];
    colnz[colrank] = trxlocnz;
    MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
    int * dpls = new int[colneighs]();  // displacements (zero initialized pid)
    std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
    accnz = std::accumulate(colnz, colnz+colneighs, 0);
    indacc = new int32_t[accnz];
    numacc = new NV[accnz];

    // ABAB: Future issues here, colnz is of type int (MPI limitation)
    // What if the aggregate vector size along the processor row/column is not 32-bit addressable?
    // This will happen when n/sqrt(p) > 2^31
    // Currently we can solve a small problem (scale 32) with 4096 processors
    // For a medium problem (scale 35), we'll need 32K processors, which gives sqrt(p) ~ 180
    // 2^35 / 180 ~ 2^29 / 3, which is not an issue!

#ifdef TIMING
    double t0 = MPI_Wtime();
#endif
    MPI_Allgatherv(trxinds, trxlocnz, MPIType<int32_t>(), indacc, colnz, dpls, MPIType<int32_t>(), ColWorld);

    delete [] trxinds;
    if(indexisvalue)
    {
        IU lenuntilcol;
        if(colrank == 0)  lenuntilcol = lenuntil;
        MPI_Bcast(&lenuntilcol, 1, MPIType<IU>(), 0, ColWorld);
        for(int i=0; i< accnz; ++i)     // fill numerical values from indices
        {
            numacc[i] = indacc[i] + lenuntilcol;
        }
    }
    else
    {
        MPI_Allgatherv(trxnums, trxlocnz, MPIType<NV>(), numacc, colnz, dpls, MPIType<NV>(), ColWorld);
        delete [] trxnums;
    }
#ifdef TIMING
    double t1 = MPI_Wtime();
    cblas_allgathertime += (t1-t0);
#endif
    DeleteAll(colnz, dpls);
}



/**
 * Step 3 of the sparse SpMV algorithm, with the semiring
 * @param[in,out] optbuf {scratch space for all-to-all (fold) communication}
 * @param[in,out] indacc, numacc {index and values of the input vector, deleted upon exit}
 * @param[in,out] sendindbuf, sendnumbuf {index and values of the output vector, created}
 **/
template<typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
void LocalSpMV(const SpParMat<IU,NUM,UDER> & A, int rowneighs, OptBuf<int32_t, OVT > & optbuf, int32_t * & indacc, IVT * & numacc,
               int32_t * & sendindbuf, OVT * & sendnumbuf, int * & sdispls, int * sendcnt, int accnz, bool indexisvalue, PreAllocatedSPA<OVT> & SPA)
{
    if(optbuf.totmax > 0)   // graph500 optimization enabled
    {
        if(A.spSeq->getnsplit() > 0)
        {
            // optbuf.{inds/nums/dspls} and sendcnt are all pre-allocated and only filled by dcsc_gespmv_threaded
            generic_gespmv_threaded_setbuffers<SR> (*(A.spSeq), indacc, numacc, accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs);
        }
        else
        {
            generic_gespmv<SR> (*(A.spSeq), indacc, numacc, accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs, indexisvalue);
        }
        DeleteAll(indacc, numacc);
    }
    else
    {
        if(A.spSeq->getnsplit() > 0)
        {
            // sendindbuf/sendnumbuf/sdispls are all allocated and filled by dcsc_gespmv_threaded
            int totalsent = generic_gespmv_threaded<SR> (*(A.spSeq), indacc, numacc, accnz, sendindbuf, sendnumbuf, sdispls, rowneighs, SPA);

            DeleteAll(indacc, numacc);
            for(int i=0; i<rowneighs-1; ++i)
                sendcnt[i] = sdispls[i+1] - sdispls[i];
            sendcnt[rowneighs-1] = totalsent - sdispls[rowneighs-1];
        }
        else
        {
            // default SpMSpV
            std::vector< int32_t > indy;
            std::vector< OVT > numy;
            generic_gespmv<SR>(*(A.spSeq), indacc, numacc, accnz, indy, numy, SPA);

            DeleteAll(indacc, numacc);

            int32_t bufsize = indy.size();  // as compact as possible
            sendindbuf = new int32_t[bufsize];
            sendnumbuf = new OVT[bufsize];
            int32_t perproc = A.getlocalrows() / rowneighs;

            int k = 0;  // index to buffer
            for(int i=0; i<rowneighs; ++i)
            {
                int32_t end_this = (i==rowneighs-1) ? A.getlocalrows() : (i+1)*perproc;
                while(k < bufsize && indy[k] < end_this)
                {
                    sendindbuf[k] = indy[k] - i*perproc;
                    sendnumbuf[k] = numy[k];
                    ++sendcnt[i];
                    ++k;
                }
            }
            sdispls = new int[rowneighs]();
            std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
        }
    }
}



// non threaded
template <typename SR, typename IU, typename OVT>
void MergeContributions(int * listSizes, std::vector<int32_t *> & indsvec, std::vector<OVT *> & numsvec, std::vector<IU> & mergedind, std::vector<OVT> & mergednum)
{
    int nlists = indsvec.size();
    // this condition is checked in the caller SpMV function.
    // I am still putting it here for completeness
    if(nlists == 1)
    {
        // simply copy data
        int veclen = listSizes[0];
        mergedind.resize(veclen);
        mergednum.resize(veclen);
        for(int i=0; i<veclen; i++)
        {
            mergedind[i] = indsvec[0][i];
            mergednum[i] = numsvec[0][i];
        }
        return;
    }
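    // k-way merge via a min-heap keyed on the smallest unconsumed index of each
    // list: repeatedly pop the global minimum, combine duplicate indices with
    // SR::add, and refill the heap from the list the minimum came from.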
    int32_t hsize = 0;
    int32_t inf = std::numeric_limits<int32_t>::min();
    int32_t sup = std::numeric_limits<int32_t>::max();
    KNHeap< int32_t, int32_t > sHeap(sup, inf);
    int * processed = new int[nlists]();
    for(int i=0; i<nlists; ++i)
    {
        if(listSizes[i] > 0)
        {
            // key, list_id
            sHeap.insert(indsvec[i][0], i);
            ++hsize;
        }
    }
    int32_t key, locv;
    if(hsize > 0)
    {
        sHeap.deleteMin(&key, &locv);
        mergedind.push_back( static_cast<IU>(key));
        mergednum.push_back(numsvec[locv][0]);  // nothing is processed yet

        if( (++(processed[locv])) < listSizes[locv] )
            sHeap.insert(indsvec[locv][processed[locv]], locv);
        else
            --hsize;
    }
    while(hsize > 0)
    {
        sHeap.deleteMin(&key, &locv);
        if(mergedind.back() == static_cast<IU>(key))
        {
            mergednum.back() = SR::add(mergednum.back(), numsvec[locv][processed[locv]]);
            // ABAB: Benchmark actually allows us to be non-deterministic in terms of parent selection
            // We can just skip this addition operator (if it's a max/min select)
        }
        else
        {
            mergedind.push_back(static_cast<IU>(key));
            mergednum.push_back(numsvec[locv][processed[locv]]);
        }

        if( (++(processed[locv])) < listSizes[locv] )
            sHeap.insert(indsvec[locv][processed[locv]], locv);
        else
            --hsize;
    }
    DeleteAll(processed);
}



template <typename SR, typename IU, typename OVT>
void MergeContributions_threaded(int * & listSizes, std::vector<int32_t *> & indsvec, std::vector<OVT *> & numsvec, std::vector<IU> & mergedind, std::vector<OVT> & mergednum, IU maxindex)
{
    int nlists = indsvec.size();
    // this condition is checked in the caller SpMV function.
    // I am still putting it here for completeness
    if(nlists == 1)
    {
        // simply copy data
        int veclen = listSizes[0];
        mergedind.resize(veclen);
        mergednum.resize(veclen);

#ifdef THREADED
#pragma omp parallel for
#endif
        for(int i=0; i<veclen; i++)
        {
            mergedind[i] = indsvec[0][i];
            mergednum[i] = numsvec[0][i];
        }
        return;
    }

    int nthreads = 1;
#ifdef THREADED
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
    }
#endif
    int nsplits = 4*nthreads;   // oversplit for load balance
    nsplits = std::min(nsplits, (int)maxindex);
    std::vector< std::vector<int32_t> > splitters(nlists);
    for(int k=0; k< nlists; k++)
    {
        splitters[k].resize(nsplits+1);
        splitters[k][0] = static_cast<int32_t>(0);
#pragma omp parallel for
        for(int i=1; i< nsplits; i++)
        {
            IU cur_idx = i * (maxindex/nsplits);
            auto it = std::lower_bound (indsvec[k], indsvec[k] + listSizes[k], cur_idx);
            splitters[k][i] = (int32_t) (it - indsvec[k]);
        }
        splitters[k][nsplits] = listSizes[k];
    }

    // ------ perform merge in parallel ------
    std::vector<std::vector<IU>> indsBuf(nsplits);
    std::vector<std::vector<OVT>> numsBuf(nsplits);
    //TODO: allocate these vectors here before calling MergeContributions
#pragma omp parallel for schedule(dynamic)
    for(int i=0; i< nsplits; i++)
    {
        std::vector<int32_t *> tIndsVec(nlists);
        std::vector<OVT *> tNumsVec(nlists);
        std::vector<int> tLengths(nlists);
        for(int j=0; j< nlists; ++j)
        {
            tIndsVec[j] = indsvec[j] + splitters[j][i];
            tNumsVec[j] = numsvec[j] + splitters[j][i];
            tLengths[j] = splitters[j][i+1] - splitters[j][i];
        }
        MergeContributions<SR>(tLengths.data(), tIndsVec, tNumsVec, indsBuf[i], numsBuf[i]);
    }

    // ------ concatenate merged tuples processed by threads ------
    std::vector<IU> tdisp(nsplits+1);
    tdisp[0] = 0;
    for(int i=0; i<nsplits; ++i)
    {
        tdisp[i+1] = tdisp[i] + indsBuf[i].size();
    }

    mergedind.resize(tdisp[nsplits]);
    mergednum.resize(tdisp[nsplits]);


#pragma omp parallel for schedule(dynamic)
    for(int i=0; i< nsplits; i++)
    {
        std::copy(indsBuf[i].data(), indsBuf[i].data() + indsBuf[i].size(), mergedind.data() + tdisp[i]);
        std::copy(numsBuf[i].data(), numsBuf[i].data() + numsBuf[i].size(), mergednum.data() + tdisp[i]);
    }
}


/**
 * This version is the most flexible sparse matrix X sparse vector [Used in KDT]
 * It accepts different types for the matrix (NUM), the input vector (IVT) and the output vector (OVT)
 * without relying on automatic type promotion
 * Input (x) and output (y) vectors can be ALIASED because y is not written until the algorithm is done with x.
 */
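// A hypothetical call sketch (semiring and scalar types are illustrative; the
// empty OptBuf and PreAllocatedSPA simply disable the pre-allocated fast paths):
//
//   typedef PlusTimesSRing<double, double> PTFF;
//   FullyDistSpVec<int64_t, double> y(x.getcommgrid(), A.getnrow());
//   OptBuf<int32_t, double> optbuf;   // empty: no graph500 optimization
//   PreAllocatedSPA<double> SPA;      // empty: buffers allocated on the fly
//   SpMV<PTFF>(A, x, y, false, optbuf, SPA);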
template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y,
           bool indexisvalue, OptBuf<int32_t, OVT > & optbuf, PreAllocatedSPA<OVT> & SPA)
{
    CheckSpMVCompliance(A, x);
    optbuf.MarkEmpty();
    y.glen = A.getnrow();   // in case it is not set already

    MPI_Comm World = x.commGrid->GetWorld();
    MPI_Comm ColWorld = x.commGrid->GetColWorld();
    MPI_Comm RowWorld = x.commGrid->GetRowWorld();

    int accnz;
    int32_t trxlocnz;
    IU lenuntil;
    int32_t *trxinds, *indacc;
    IVT *trxnums, *numacc;

#ifdef TIMING
    double t0 = MPI_Wtime();
#endif

    TransposeVector(World, x, trxlocnz, lenuntil, trxinds, trxnums, indexisvalue);

#ifdef TIMING
    double t1 = MPI_Wtime();
    cblas_transvectime += (t1-t0);
#endif

    if(x.commGrid->GetGridRows() > 1)
    {
        AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, indacc, numacc, accnz, indexisvalue);   // trxinds/trxnums deallocated, indacc/numacc allocated, accnz set
    }
    else
    {
        accnz = trxlocnz;
        indacc = trxinds;   // aliasing ptr
        numacc = trxnums;   // aliasing ptr
    }

    int rowneighs;
    MPI_Comm_size(RowWorld, &rowneighs);
    int * sendcnt = new int[rowneighs]();
    int32_t * sendindbuf;
    OVT * sendnumbuf;
    int * sdispls;

#ifdef TIMING
    double t2 = MPI_Wtime();
#endif

    LocalSpMV<SR>(A, rowneighs, optbuf, indacc, numacc, sendindbuf, sendnumbuf, sdispls, sendcnt, accnz, indexisvalue, SPA);    // indacc/numacc deallocated, sendindbuf/sendnumbuf/sdispls allocated

#ifdef TIMING
    double t3 = MPI_Wtime();
    cblas_localspmvtime += (t3-t2);
#endif


    if(x.commGrid->GetGridCols() == 1)
    {
        y.ind.resize(sendcnt[0]);
        y.num.resize(sendcnt[0]);


        if(optbuf.totmax > 0)   // graph500 optimization enabled
        {
#ifdef THREADED
#pragma omp parallel for
#endif
            for(int i=0; i<sendcnt[0]; i++)
            {
                y.ind[i] = optbuf.inds[i];
                y.num[i] = optbuf.nums[i];
            }
        }
        else
        {
#ifdef THREADED
#pragma omp parallel for
#endif
            for(int i=0; i<sendcnt[0]; i++)
            {
                y.ind[i] = sendindbuf[i];
                y.num[i] = sendnumbuf[i];
            }
            DeleteAll(sendindbuf, sendnumbuf, sdispls);
        }
        delete [] sendcnt;
        return;
    }
    int * rdispls = new int[rowneighs];
    int * recvcnt = new int[rowneighs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld);   // share the request counts

    // receive displacements are exact whereas send displacements have slack
    rdispls[0] = 0;
    for(int i=0; i<rowneighs-1; ++i)
    {
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }

    int totrecv = std::accumulate(recvcnt, recvcnt+rowneighs, 0);
    int32_t * recvindbuf = new int32_t[totrecv];
    OVT * recvnumbuf = new OVT[totrecv];

#ifdef TIMING
    double t4 = MPI_Wtime();
#endif
    if(optbuf.totmax > 0)   // graph500 optimization enabled
    {
        MPI_Alltoallv(optbuf.inds, sendcnt, optbuf.dspls, MPIType<int32_t>(), recvindbuf, recvcnt, rdispls, MPIType<int32_t>(), RowWorld);
        MPI_Alltoallv(optbuf.nums, sendcnt, optbuf.dspls, MPIType<OVT>(), recvnumbuf, recvcnt, rdispls, MPIType<OVT>(), RowWorld);
        delete [] sendcnt;
    }
    else
    {
        MPI_Alltoallv(sendindbuf, sendcnt, sdispls, MPIType<int32_t>(), recvindbuf, recvcnt, rdispls, MPIType<int32_t>(), RowWorld);
        MPI_Alltoallv(sendnumbuf, sendcnt, sdispls, MPIType<OVT>(), recvnumbuf, recvcnt, rdispls, MPIType<OVT>(), RowWorld);
        DeleteAll(sendindbuf, sendnumbuf, sendcnt, sdispls);
    }
#ifdef TIMING
    double t5 = MPI_Wtime();
    cblas_alltoalltime += (t5-t4);
#endif

#ifdef TIMING
    double t6 = MPI_Wtime();
#endif
    //MergeContributions<SR>(y, recvcnt, rdispls, recvindbuf, recvnumbuf, rowneighs);
    // free memory of y, in case it was aliased
    std::vector<IU>().swap(y.ind);
    std::vector<OVT>().swap(y.num);

    std::vector<int32_t *> indsvec(rowneighs);
    std::vector<OVT *> numsvec(rowneighs);

#ifdef THREADED
#pragma omp parallel for
#endif
    for(int i=0; i<rowneighs; i++)
    {
        indsvec[i] = recvindbuf+rdispls[i];
        numsvec[i] = recvnumbuf+rdispls[i];
    }
#ifdef THREADED
    MergeContributions_threaded<SR>(recvcnt, indsvec, numsvec, y.ind, y.num, y.MyLocLength());
#else
    MergeContributions<SR>(recvcnt, indsvec, numsvec, y.ind, y.num);
#endif

    DeleteAll(recvcnt, rdispls, recvindbuf, recvnumbuf);
#ifdef TIMING
    double t7 = MPI_Wtime();
    cblas_mergeconttime += (t7-t6);
#endif
}


template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y, bool indexisvalue, PreAllocatedSPA<OVT> & SPA)
{
    OptBuf< int32_t, OVT > optbuf = OptBuf< int32_t, OVT >();
    SpMV<SR>(A, x, y, indexisvalue, optbuf, SPA);
}

template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y, bool indexisvalue)
{
    OptBuf< int32_t, OVT > optbuf = OptBuf< int32_t, OVT >();
    PreAllocatedSPA<OVT> SPA;
    SpMV<SR>(A, x, y, indexisvalue, optbuf, SPA);
}

template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y, bool indexisvalue, OptBuf<int32_t, OVT > & optbuf)
{
    PreAllocatedSPA<OVT> SPA;
    SpMV<SR>(A, x, y, indexisvalue, optbuf, SPA);
}


/**
 * Automatic type promotion is ONLY done here; all the callee functions (in Friends.h and below) are initialized with the promoted type
 * If indexisvalue = true, then we do not need to transfer values for x (happens for BFS iterations with boolean matrices and integer rhs vectors)
 **/
template <typename SR, typename IU, typename NUM, typename UDER>
FullyDistSpVec<IU,typename promote_trait<NUM,IU>::T_promote> SpMV
(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IU> & x, bool indexisvalue, OptBuf<int32_t, typename promote_trait<NUM,IU>::T_promote > & optbuf)
{
    typedef typename promote_trait<NUM,IU>::T_promote T_promote;
    FullyDistSpVec<IU, T_promote> y ( x.getcommgrid(), A.getnrow());    // identity doesn't matter for sparse vectors
    SpMV<SR>(A, x, y, indexisvalue, optbuf);
    return y;
}

/**
 * Parallel dense SpMV
 **/
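// A minimal call sketch (illustrative; the result scalar type follows
// promote_trait of the matrix and vector scalar types):
//
//   typedef PlusTimesSRing<double, double> PTFF;
//   FullyDistVec<int64_t, double> x(A.getcommgrid(), A.getncol(), 1.0);
//   FullyDistVec<int64_t, double> y = SpMV<PTFF>(A, x);   // y = A*x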
1593 template <typename SR, typename IU, typename NUM, typename NUV, typename UDER>
SpMV(const SpParMat<IU,NUM,UDER> & A,const FullyDistVec<IU,NUV> & x)1594 FullyDistVec<IU,typename promote_trait<NUM,NUV>::T_promote> SpMV
1595 (const SpParMat<IU,NUM,UDER> & A, const FullyDistVec<IU,NUV> & x )
1596 {
1597 typedef typename promote_trait<NUM,NUV>::T_promote T_promote;
1598 CheckSpMVCompliance(A, x);
1599
1600 MPI_Comm World = x.commGrid->GetWorld();
1601 MPI_Comm ColWorld = x.commGrid->GetColWorld();
1602 MPI_Comm RowWorld = x.commGrid->GetRowWorld();
1603
1604 int xsize = (int) x.LocArrSize();
1605 int trxsize = 0;
1606
1607 int diagneigh = x.commGrid->GetComplementRank();
1608 MPI_Status status;
1609 MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
1610
1611 NUV * trxnums = new NUV[trxsize];
1612 MPI_Sendrecv(const_cast<NUV*>(SpHelper::p2a(x.arr)), xsize, MPIType<NUV>(), diagneigh, TRX, trxnums, trxsize, MPIType<NUV>(), diagneigh, TRX, World, &status);
1613
1614 int colneighs, colrank;
1615 MPI_Comm_size(ColWorld, &colneighs);
1616 MPI_Comm_rank(ColWorld, &colrank);
1617 int * colsize = new int[colneighs];
1618 colsize[colrank] = trxsize;
1619 MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
1620 int * dpls = new int[colneighs](); // displacements (zero initialized pid)
1621 std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
1622 int accsize = std::accumulate(colsize, colsize+colneighs, 0);
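    // Worked example (comment only): if colsize = {3, 5, 2} across three column
    // neighbors, partial_sum over the first two entries gives dpls = {0, 3, 8}
    // and accsize = 10, so numacc below receives the column-wide concatenation
    // of every neighbor's transposed subvector.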
    NUV * numacc = new NUV[accsize];

    MPI_Allgatherv(trxnums, trxsize, MPIType<NUV>(), numacc, colsize, dpls, MPIType<NUV>(), ColWorld);
    delete [] trxnums;

    // serial SpMV with dense vector
    T_promote id = SR::id();
    IU ysize = A.getlocalrows();
    T_promote * localy = new T_promote[ysize];
    std::fill_n(localy, ysize, id);

#ifdef THREADED
    dcsc_gespmv_threaded<SR>(*(A.spSeq), numacc, localy);
#else
    dcsc_gespmv<SR>(*(A.spSeq), numacc, localy);
#endif

    DeleteAll(numacc, colsize, dpls);

    // FullyDistVec<IT,NT>(shared_ptr<CommGrid> grid, IT globallen, NT initval, NT id)
    FullyDistVec<IU, T_promote> y ( x.commGrid, A.getnrow(), id);

    int rowneighs;
    MPI_Comm_size(RowWorld, &rowneighs);

    IU begptr, endptr;
    for(int i=0; i< rowneighs; ++i)
    {
        begptr = y.RowLenUntil(i);
        if(i == rowneighs-1)
        {
            endptr = ysize;
        }
        else
        {
            endptr = y.RowLenUntil(i+1);
        }
        MPI_Reduce(localy+begptr, SpHelper::p2a(y.arr), endptr-begptr, MPIType<T_promote>(), SR::mpi_op(), i, RowWorld);
    }
    delete [] localy;
    return y;
}
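
/*
 * Usage sketch (illustrative, not library code): the dense SpMV above is the
 * kernel of power-iteration style algorithms. Assuming a plain (+,*) semiring
 * (PlusTimesSRing from Semirings.h) and ignoring damping/normalization, one
 * PageRank-like sweep could read:
 *
 *   FullyDistVec<int64_t, double> rank(A.getcommgrid(), A.getnrow(), 1.0);
 *   for (int iter = 0; iter < 20; ++iter)
 *       rank = SpMV< PlusTimesSRing<double, double> >(A, rank);
 */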


/**
 * \TODO: Old version that is no longer considered optimal
 * Kept for legacy purposes
 * To be removed when the other functions are fully tested.
 **/
template <typename SR, typename IU, typename NUM, typename NUV, typename UDER>
FullyDistSpVec<IU,typename promote_trait<NUM,NUV>::T_promote> SpMV
(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,NUV> & x)
{
    typedef typename promote_trait<NUM,NUV>::T_promote T_promote;
    CheckSpMVCompliance(A, x);

    MPI_Comm World = x.commGrid->GetWorld();
    MPI_Comm ColWorld = x.commGrid->GetColWorld();
    MPI_Comm RowWorld = x.commGrid->GetRowWorld();

    int xlocnz = (int) x.getlocnnz();
    int trxlocnz = 0;
    int roffst = x.RowLenUntil();
    int offset;

    int diagneigh = x.commGrid->GetComplementRank();
    MPI_Status status;
    MPI_Sendrecv(&xlocnz, 1, MPI_INT, diagneigh, TRX, &trxlocnz, 1, MPI_INT, diagneigh, TRX, World, &status);
    MPI_Sendrecv(&roffst, 1, MPI_INT, diagneigh, TROST, &offset, 1, MPI_INT, diagneigh, TROST, World, &status);

    IU * trxinds = new IU[trxlocnz];
    NUV * trxnums = new NUV[trxlocnz];
    MPI_Sendrecv(const_cast<IU*>(SpHelper::p2a(x.ind)), xlocnz, MPIType<IU>(), diagneigh, TRX, trxinds, trxlocnz, MPIType<IU>(), diagneigh, TRX, World, &status);
    MPI_Sendrecv(const_cast<NUV*>(SpHelper::p2a(x.num)), xlocnz, MPIType<NUV>(), diagneigh, TRX, trxnums, trxlocnz, MPIType<NUV>(), diagneigh, TRX, World, &status);
    std::transform(trxinds, trxinds+trxlocnz, trxinds, [offset](IU val) { return val + offset; });    // fullydist indexing (n pieces) -> matrix indexing (sqrt(p) pieces)

    int colneighs, colrank;
    MPI_Comm_size(ColWorld, &colneighs);
    MPI_Comm_rank(ColWorld, &colrank);
    int * colnz = new int[colneighs];
    colnz[colrank] = trxlocnz;
    MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
    int * dpls = new int[colneighs]();    // displacements (zero initialized)
    std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
    int accnz = std::accumulate(colnz, colnz+colneighs, 0);
    IU * indacc = new IU[accnz];
    NUV * numacc = new NUV[accnz];

    // ABAB: Future issues here, colnz is of type int (MPI limitation)
    // What if the aggregate vector size along the processor row/column is not 32-bit addressable?
    MPI_Allgatherv(trxinds, trxlocnz, MPIType<IU>(), indacc, colnz, dpls, MPIType<IU>(), ColWorld);
    MPI_Allgatherv(trxnums, trxlocnz, MPIType<NUV>(), numacc, colnz, dpls, MPIType<NUV>(), ColWorld);
    DeleteAll(trxinds, trxnums);

    // serial SpMV with sparse vector
    std::vector< int32_t > indy;
    std::vector< T_promote > numy;

    int32_t * tmpindacc = new int32_t[accnz];
    for(int i=0; i< accnz; ++i) tmpindacc[i] = indacc[i];
    delete [] indacc;

    dcsc_gespmv<SR>(*(A.spSeq), tmpindacc, numacc, accnz, indy, numy);    // actual multiplication

    DeleteAll(tmpindacc, numacc);
    DeleteAll(colnz, dpls);

    FullyDistSpVec<IU, T_promote> y ( x.commGrid, A.getnrow());    // identity doesn't matter for sparse vectors
    IU yintlen = y.MyRowLength();

    int rowneighs;
    MPI_Comm_size(RowWorld, &rowneighs);
    std::vector< std::vector<IU> > sendind(rowneighs);
    std::vector< std::vector<T_promote> > sendnum(rowneighs);
    typename std::vector<int32_t>::size_type outnz = indy.size();
    for(typename std::vector<IU>::size_type i=0; i< outnz; ++i)
    {
        IU locind;
        int rown = y.OwnerWithinRow(yintlen, static_cast<IU>(indy[i]), locind);
        sendind[rown].push_back(locind);
        sendnum[rown].push_back(numy[i]);
    }

    IU * sendindbuf = new IU[outnz];
    T_promote * sendnumbuf = new T_promote[outnz];
    int * sendcnt = new int[rowneighs];
    int * sdispls = new int[rowneighs];
    for(int i=0; i<rowneighs; ++i)
        sendcnt[i] = sendind[i].size();

    int * rdispls = new int[rowneighs];
    int * recvcnt = new int[rowneighs];
    MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld);    // share the request counts

    sdispls[0] = 0;
    rdispls[0] = 0;
    for(int i=0; i<rowneighs-1; ++i)
    {
        sdispls[i+1] = sdispls[i] + sendcnt[i];
        rdispls[i+1] = rdispls[i] + recvcnt[i];
    }
    int totrecv = std::accumulate(recvcnt,recvcnt+rowneighs,0);
    IU * recvindbuf = new IU[totrecv];
    T_promote * recvnumbuf = new T_promote[totrecv];

    for(int i=0; i<rowneighs; ++i)
    {
        std::copy(sendind[i].begin(), sendind[i].end(), sendindbuf+sdispls[i]);
        std::vector<IU>().swap(sendind[i]);
    }
    for(int i=0; i<rowneighs; ++i)
    {
        std::copy(sendnum[i].begin(), sendnum[i].end(), sendnumbuf+sdispls[i]);
        std::vector<T_promote>().swap(sendnum[i]);
    }
    MPI_Alltoallv(sendindbuf, sendcnt, sdispls, MPIType<IU>(), recvindbuf, recvcnt, rdispls, MPIType<IU>(), RowWorld);
    MPI_Alltoallv(sendnumbuf, sendcnt, sdispls, MPIType<T_promote>(), recvnumbuf, recvcnt, rdispls, MPIType<T_promote>(), RowWorld);

    DeleteAll(sendindbuf, sendnumbuf);
    DeleteAll(sendcnt, recvcnt, sdispls, rdispls);

    // define a SPA-like data structure
    IU ysize = y.MyLocLength();
    T_promote * localy = new T_promote[ysize];
    bool * isthere = new bool[ysize];
    std::vector<IU> nzinds;    // nonzero indices
    std::fill_n(isthere, ysize, false);

    for(int i=0; i< totrecv; ++i)
    {
        if(!isthere[recvindbuf[i]])
        {
            localy[recvindbuf[i]] = recvnumbuf[i];    // initial assignment
            nzinds.push_back(recvindbuf[i]);
            isthere[recvindbuf[i]] = true;
        }
        else
        {
            localy[recvindbuf[i]] = SR::add(localy[recvindbuf[i]], recvnumbuf[i]);
        }
    }
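    // Worked example (comment only): receiving (ind,val) pairs (4,1), (2,3),
    // (4,2) sets localy[4]=1 and localy[2]=3 on first sight, then folds the
    // duplicate via SR::add so localy[4]=3 under a (+,*) semiring; nzinds
    // collects {4,2} and is sorted below before y is packed.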
    DeleteAll(isthere, recvindbuf, recvnumbuf);
    std::sort(nzinds.begin(), nzinds.end());
    int nnzy = nzinds.size();
    y.ind.resize(nnzy);
    y.num.resize(nnzy);
    for(int i=0; i< nnzy; ++i)
    {
        y.ind[i] = nzinds[i];
        y.num[i] = localy[nzinds[i]];
    }
    delete [] localy;
    return y;
}


template <typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
SpParMat<IU,typename promote_trait<NU1,NU2>::T_promote,typename promote_trait<UDERA,UDERB>::T_promote> EWiseMult
(const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B , bool exclude)
{
    typedef typename promote_trait<NU1,NU2>::T_promote N_promote;
    typedef typename promote_trait<UDERA,UDERB>::T_promote DER_promote;

    if(*(A.commGrid) == *(B.commGrid))
    {
        DER_promote * result = new DER_promote( EWiseMult(*(A.spSeq),*(B.spSeq),exclude) );
        return SpParMat<IU, N_promote, DER_promote> (result, A.commGrid);
    }
    else
    {
        std::cout << "Grids are not comparable for elementwise multiplication" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return SpParMat< IU,N_promote,DER_promote >();
    }
}
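
/*
 * Usage sketch (illustrative, not library code): with exclude = false this is
 * the entrywise (Hadamard) product over the intersection of the two nonzero
 * structures; with exclude = true it keeps the entries of A whose positions
 * are absent from B, i.e. a structural set difference (mirroring the vector
 * version documented further below):
 *
 *   auto C = EWiseMult(A, B, false);  // C(i,j) = A(i,j) * B(i,j) where both exist
 *   auto D = EWiseMult(A, B, true);   // keep A(i,j) only where B has no entry
 */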

template <typename RETT, typename RETDER, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB, typename _BinaryOperation>
SpParMat<IU,RETT,RETDER> EWiseApply
(const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, bool notB, const NU2& defaultBVal)
{
    if(*(A.commGrid) == *(B.commGrid))
    {
        RETDER * result = new RETDER( EWiseApply<RETT>(*(A.spSeq),*(B.spSeq), __binary_op, notB, defaultBVal) );
        return SpParMat<IU, RETT, RETDER> (result, A.commGrid);
    }
    else
    {
        std::cout << "Grids are not comparable for elementwise apply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return SpParMat< IU,RETT,RETDER >();
    }
}

template <typename RETT, typename RETDER, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB, typename _BinaryOperation, typename _BinaryPredicate>
SpParMat<IU,RETT,RETDER> EWiseApply
(const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, _BinaryPredicate do_op, bool allowANulls, bool allowBNulls, const NU1& ANullVal, const NU2& BNullVal, const bool allowIntersect, const bool useExtendedBinOp)
{
    if(*(A.commGrid) == *(B.commGrid))
    {
        RETDER * result = new RETDER( EWiseApply<RETT>(*(A.spSeq),*(B.spSeq), __binary_op, do_op, allowANulls, allowBNulls, ANullVal, BNullVal, allowIntersect) );
        return SpParMat<IU, RETT, RETDER> (result, A.commGrid);
    }
    else
    {
        std::cout << "Grids are not comparable for elementwise apply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return SpParMat< IU,RETT,RETDER >();
    }
}

// plain adapter
template <typename RETT, typename RETDER, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB, typename _BinaryOperation, typename _BinaryPredicate>
SpParMat<IU,RETT,RETDER>
EWiseApply (const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, _BinaryPredicate do_op, bool allowANulls, bool allowBNulls, const NU1& ANullVal, const NU2& BNullVal, const bool allowIntersect = true)
{
    return EWiseApply<RETT, RETDER>(A, B,
                                    EWiseExtToPlainAdapter<RETT, NU1, NU2, _BinaryOperation>(__binary_op),
                                    EWiseExtToPlainAdapter<bool, NU1, NU2, _BinaryPredicate>(do_op),
                                    allowANulls, allowBNulls, ANullVal, BNullVal, allowIntersect, true);
}
// end adapter
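
/*
 * Sketch of the adapter contract (illustrative): user code supplies plain
 * (a, b) callbacks, while the internal protocol also carries two "is null"
 * flags; EWiseExtToPlainAdapter simply discards those flags. With a
 * hypothetical always-true predicate retTrue, a plain call such as
 *
 *   auto C = EWiseApply<double, SpDCCols<int64_t, double>>(
 *                A, B, std::plus<double>(), retTrue, true, true, 0.0, 0.0);
 *
 * is forwarded to the extended overload with useExtendedBinOp = true.
 */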

/**
 * if exclude is true, then we prune all entries W[i] != zero from V
 * if exclude is false, then we perform a proper elementwise multiplication
 **/
template <typename IU, typename NU1, typename NU2>
FullyDistSpVec<IU,typename promote_trait<NU1,NU2>::T_promote> EWiseMult
(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , bool exclude, NU2 zero)
{
    typedef typename promote_trait<NU1,NU2>::T_promote T_promote;

    if(*(V.commGrid) == *(W.commGrid))
    {
        FullyDistSpVec< IU, T_promote> Product(V.commGrid);
        if(V.glen != W.glen)
        {
            std::cerr << "Vector dimensions don't match for EWiseMult\n";
            MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        }
        else
        {
            Product.glen = V.glen;
            IU size= V.getlocnnz();
            if(exclude)
            {
#if defined(_OPENMP) && defined(CBLAS_EXPERIMENTAL)    // not faster than serial
                int actual_splits = cblas_splits * 1;    // 1 is the parallel slackness
                std::vector <IU> tlosizes (actual_splits, 0);
                std::vector < std::vector<IU> > tlinds(actual_splits);
                std::vector < std::vector<T_promote> > tlnums(actual_splits);
                IU tlsize = size / actual_splits;
#pragma omp parallel for //schedule(dynamic, 1)
                for(IU t = 0; t < actual_splits; ++t)
                {
                    IU tlbegin = t*tlsize;
                    IU tlend = (t==actual_splits-1)? size : (t+1)*tlsize;
                    for(IU i=tlbegin; i<tlend; ++i)
                    {
                        if(W.arr[V.ind[i]] == zero)    // keep only those
                        {
                            tlinds[t].push_back(V.ind[i]);
                            tlnums[t].push_back(V.num[i]);
                            tlosizes[t]++;
                        }
                    }
                }
                std::vector<IU> prefix_sum(actual_splits+1,0);
                std::partial_sum(tlosizes.begin(), tlosizes.end(), prefix_sum.begin()+1);
                Product.ind.resize(prefix_sum[actual_splits]);
                Product.num.resize(prefix_sum[actual_splits]);

#pragma omp parallel for //schedule(dynamic, 1)
                for(IU t=0; t< actual_splits; ++t)
                {
                    std::copy(tlinds[t].begin(), tlinds[t].end(), Product.ind.begin()+prefix_sum[t]);
                    std::copy(tlnums[t].begin(), tlnums[t].end(), Product.num.begin()+prefix_sum[t]);
                }
#else
                for(IU i=0; i<size; ++i)
                {
                    if(W.arr[V.ind[i]] == zero)    // keep only those
                    {
                        Product.ind.push_back(V.ind[i]);
                        Product.num.push_back(V.num[i]);
                    }
                }
#endif
            }
            else
            {
                for(IU i=0; i<size; ++i)
                {
                    if(W.arr[V.ind[i]] != zero)    // keep only those
                    {
                        Product.ind.push_back(V.ind[i]);
                        Product.num.push_back(V.num[i] * W.arr[V.ind[i]]);
                    }
                }
            }
        }
        return Product;
    }
    else
    {
        std::cout << "Grids are not comparable for elementwise multiplication" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return FullyDistSpVec< IU,T_promote>();
    }
}
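
/*
 * Usage sketch (illustrative): this is the classic frontier-pruning step of a
 * top-down BFS, where `parents` holds -1 for undiscovered vertices. With
 * exclude = true, only frontier entries whose parent is still -1 survive:
 *
 *   fringe = EWiseMult(fringe, parents, true, (int64_t) -1);
 */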


/**
 * Threaded EWiseApply. Only called internally from EWiseApply.
 **/
template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
FullyDistSpVec<IU,RET> EWiseApply_threaded
(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero, const bool useExtendedBinOp)
{
    typedef RET T_promote;    //typedef typename promote_trait<NU1,NU2>::T_promote T_promote;
    if(*(V.commGrid) == *(W.commGrid))
    {
        FullyDistSpVec< IU, T_promote> Product(V.commGrid);
        if(V.TotalLength() != W.TotalLength())
        {
            std::ostringstream outs;
            outs << "Vector dimensions don't match (" << V.TotalLength() << " vs " << W.TotalLength() << ") for EWiseApply (short version)\n";
            SpParHelper::Print(outs.str());
            MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        }
        else
        {
            int nthreads=1;
#ifdef _OPENMP
#pragma omp parallel
            {
                nthreads = omp_get_num_threads();
            }
#endif

            Product.glen = V.glen;
            IU size= W.LocArrSize();
            IU spsize = V.getlocnnz();

            // temporary result vectors per thread
            std::vector<std::vector<IU>> tProductInd(nthreads);
            std::vector<std::vector<T_promote>> tProductVal(nthreads);
            IU perthread;    // chunk of tProductInd or tProductVal allocated to each thread
            if (allowVNulls)
                perthread = size/nthreads;
            else
                perthread = spsize/nthreads;

#ifdef _OPENMP
#pragma omp parallel
#endif
            {
                int curthread = 0;
#ifdef _OPENMP
                curthread = omp_get_thread_num();
#endif
                IU tStartIdx = perthread * curthread;
                IU tNextIdx = perthread * (curthread+1);

                if (allowVNulls)
                {
                    if(curthread == nthreads-1) tNextIdx = size;

                    // get sparse part for the current thread
                    auto it = std::lower_bound (V.ind.begin(), V.ind.end(), tStartIdx);
                    IU tSpIdx = (IU) std::distance(V.ind.begin(), it);

                    // iterate over the dense vector
                    for(IU tIdx=tStartIdx; tIdx < tNextIdx; ++tIdx)
                    {
                        if(tSpIdx < spsize && V.ind[tSpIdx] < tNextIdx && V.ind[tSpIdx] == tIdx)
                        {
                            if (_doOp(V.num[tSpIdx], W.arr[tIdx], false, false))
                            {
                                tProductInd[curthread].push_back(tIdx);
                                tProductVal[curthread].push_back(_binary_op(V.num[tSpIdx], W.arr[tIdx], false, false));
                            }
                            tSpIdx++;
                        }
                        else
                        {
                            if (_doOp(Vzero, W.arr[tIdx], true, false))
                            {
                                tProductInd[curthread].push_back(tIdx);
                                tProductVal[curthread].push_back(_binary_op(Vzero, W.arr[tIdx], true, false));
                            }
                        }
                    }
                }
                else // iterate over the sparse vector
                {
                    if(curthread == nthreads-1) tNextIdx = spsize;
                    for(IU tSpIdx=tStartIdx; tSpIdx < tNextIdx; ++tSpIdx)
                    {
                        if (_doOp(V.num[tSpIdx], W.arr[V.ind[tSpIdx]], false, false))
                        {
                            tProductInd[curthread].push_back(V.ind[tSpIdx]);
                            tProductVal[curthread].push_back(_binary_op(V.num[tSpIdx], W.arr[V.ind[tSpIdx]], false, false));
                        }
                    }
                }
            }

            std::vector<IU> tdisp(nthreads+1);
            tdisp[0] = 0;
            for(int i=0; i<nthreads; ++i)
            {
                tdisp[i+1] = tdisp[i] + tProductInd[i].size();
            }

            // copy results from temporary vectors
            Product.ind.resize(tdisp[nthreads]);
            Product.num.resize(tdisp[nthreads]);

#ifdef _OPENMP
#pragma omp parallel
#endif
            {
                int curthread = 0;
#ifdef _OPENMP
                curthread = omp_get_thread_num();
#endif
                std::copy(tProductInd[curthread].begin(), tProductInd[curthread].end(), Product.ind.data() + tdisp[curthread]);
                std::copy(tProductVal[curthread].begin(), tProductVal[curthread].end(), Product.num.data() + tdisp[curthread]);
            }
        }
        return Product;
    }
    else
    {
        std::cout << "Grids are not comparable for EWiseApply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return FullyDistSpVec< IU,T_promote>();
    }
}
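
/*
 * Design note (comment only): each thread owns a contiguous chunk of the dense
 * vector (when allowVNulls) or of the sparse vector (otherwise). In the dense
 * case the thread's starting position inside V is recovered with a binary
 * search: for V.ind = {1, 4, 7, 9} and a chunk starting at local index 5,
 * std::lower_bound lands on 7, so no thread touches a nonzero owned by its
 * left neighbor. Per-thread outputs are then stitched together with the
 * exclusive prefix sum tdisp over the per-thread result sizes.
 */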


/**
 * Performs an arbitrary binary operation _binary_op on the corresponding elements of two vectors with the result stored in a return vector ret.
 * The binary operation is only performed if the binary predicate _doOp returns true for those elements. Otherwise the binary operation is not
 * performed and ret does not contain an element at that position.
 * More formally the operation is defined as:
 * if (_doOp(V[i], W[i]))
 *    ret[i] = _binary_op(V[i], W[i])
 * else
 *    // ret[i] is not set
 * Hence _doOp can be used to implement a filter on either of the vectors.
 *
 * The above is only defined if both V[i] and W[i] exist (i.e. an intersection). To allow a union operation (e.g. when V[i] doesn't exist but W[i] does),
 * the allowVNulls flag is set to true and the Vzero argument is used as the missing V[i] value.
 *
 * The type of each element of ret need not be related to the types of V or W, so the return type must be explicitly specified as a template parameter:
 * FullyDistSpVec<int, double> r = EWiseApply<double>(V, W, plus, retTrue, false, 0)
 **/
template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
FullyDistSpVec<IU,RET> EWiseApply
(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero, const bool useExtendedBinOp)
{
#ifdef _OPENMP
    return EWiseApply_threaded<RET>(V, W, _binary_op, _doOp, allowVNulls, Vzero, useExtendedBinOp);
#else
    typedef RET T_promote;    //typedef typename promote_trait<NU1,NU2>::T_promote T_promote;
    if(*(V.commGrid) == *(W.commGrid))
    {
        FullyDistSpVec< IU, T_promote> Product(V.commGrid);
        //FullyDistVec< IU, NU1> DV (V);    // Ariful: I am not sure why it was there??
        if(V.TotalLength() != W.TotalLength())
        {
            std::ostringstream outs;
            outs << "Vector dimensions don't match (" << V.TotalLength() << " vs " << W.TotalLength() << ") for EWiseApply (short version)\n";
            SpParHelper::Print(outs.str());
            MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        }
        else
        {
            Product.glen = V.glen;
            IU size= W.LocArrSize();
            IU spsize = V.getlocnnz();
            IU sp_iter = 0;
            if (allowVNulls)
            {
                // iterate over the dense vector
                for(IU i=0; i<size; ++i)
                {
                    if(sp_iter < spsize && V.ind[sp_iter] == i)
                    {
                        if (_doOp(V.num[sp_iter], W.arr[i], false, false))
                        {
                            Product.ind.push_back(i);
                            Product.num.push_back(_binary_op(V.num[sp_iter], W.arr[i], false, false));
                        }
                        sp_iter++;
                    }
                    else
                    {
                        if (_doOp(Vzero, W.arr[i], true, false))
                        {
                            Product.ind.push_back(i);
                            Product.num.push_back(_binary_op(Vzero, W.arr[i], true, false));
                        }
                    }
                }
            }
            else
            {
                // iterate over the sparse vector
                for(sp_iter = 0; sp_iter < spsize; ++sp_iter)
                {
                    if (_doOp(V.num[sp_iter], W.arr[V.ind[sp_iter]], false, false))
                    {
                        Product.ind.push_back(V.ind[sp_iter]);
                        Product.num.push_back(_binary_op(V.num[sp_iter], W.arr[V.ind[sp_iter]], false, false));
                    }
                }
            }
        }
        return Product;
    }
    else
    {
        std::cout << "Grids are not comparable for EWiseApply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return FullyDistSpVec< IU,T_promote>();
    }
#endif
}
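
/*
 * Usage sketch (illustrative, with hypothetical user lambdas): filtering the
 * sparse vector by a condition on the dense one. With allowVNulls = false only
 * positions present in V are visited, so _doOp acts as a pure filter:
 *
 *   auto keepV = [](int64_t v, double w) { return v; };
 *   auto bigW  = [](int64_t v, double w) { return w > 0.5; };
 *   FullyDistSpVec<int64_t, int64_t> kept =
 *       EWiseApply<int64_t>(V, W, keepV, bigW, false, (int64_t) 0);
 */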


/**
 * Performs an arbitrary binary operation _binary_op on the corresponding elements of two vectors with the result stored in a return vector ret.
 * The binary operation is only performed if the binary predicate _doOp returns true for those elements. Otherwise the binary operation is not
 * performed and ret does not contain an element at that position.
 * More formally the operation is defined as:
 * if (_doOp(V[i], W[i]))
 *    ret[i] = _binary_op(V[i], W[i])
 * else
 *    // ret[i] is not set
 * Hence _doOp can be used to implement a filter on either of the vectors.
 *
 * The above is only defined if both V[i] and W[i] exist (i.e. an intersection). To allow a union operation (e.g. when V[i] doesn't exist but W[i] does),
 * the allowVNulls flag is set to true and the Vzero argument is used as the missing V[i] value.
 * !allowVNulls && !allowWNulls => intersection
 * !allowVNulls &&  allowWNulls => operate on all elements of V
 *  allowVNulls && !allowWNulls => operate on all elements of W
 *  allowVNulls &&  allowWNulls => union
 *
 * The type of each element of ret need not be related to the types of V or W, so the return type must be explicitly specified as a template parameter:
 * FullyDistSpVec<int, double> r = EWiseApply<double>(V, W, plus, ...)
 * For intersection, Vzero and Wzero are irrelevant
 * ABAB: \todo: Should allowIntersect be "false" for all SetDifference uses?
 **/
template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
FullyDistSpVec<IU,RET> EWiseApply
(const FullyDistSpVec<IU,NU1> & V, const FullyDistSpVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, bool allowWNulls, NU1 Vzero, NU2 Wzero, const bool allowIntersect, const bool useExtendedBinOp)
{
    typedef RET T_promote;    // typename promote_trait<NU1,NU2>::T_promote T_promote;
    if(*(V.commGrid) == *(W.commGrid))
    {
        FullyDistSpVec< IU, T_promote> Product(V.commGrid);
        if(V.glen != W.glen)
        {
            std::ostringstream outs;
            outs << "Vector dimensions don't match (" << V.glen << " vs " << W.glen << ") for EWiseApply (full version)\n";
            SpParHelper::Print(outs.str());
            MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
        }
        else
        {
            Product.glen = V.glen;
            typename std::vector< IU >::const_iterator indV = V.ind.begin();
            typename std::vector< NU1 >::const_iterator numV = V.num.begin();
            typename std::vector< IU >::const_iterator indW = W.ind.begin();
            typename std::vector< NU2 >::const_iterator numW = W.num.begin();

            while (indV < V.ind.end() && indW < W.ind.end())
            {
                if (*indV == *indW)
                {
                    // overlap
                    if (allowIntersect)
                    {
                        if (_doOp(*numV, *numW, false, false))
                        {
                            Product.ind.push_back(*indV);
                            Product.num.push_back(_binary_op(*numV, *numW, false, false));
                        }
                    }
                    indV++; numV++;
                    indW++; numW++;
                }
                else if (*indV < *indW)
                {
                    // V has value but W does not
                    if (allowWNulls)
                    {
                        if (_doOp(*numV, Wzero, false, true))
                        {
                            Product.ind.push_back(*indV);
                            Product.num.push_back(_binary_op(*numV, Wzero, false, true));
                        }
                    }
                    indV++; numV++;
                }
                else //(*indV > *indW)
                {
                    // W has value but V does not
                    if (allowVNulls)
                    {
                        if (_doOp(Vzero, *numW, true, false))
                        {
                            Product.ind.push_back(*indW);
                            Product.num.push_back(_binary_op(Vzero, *numW, true, false));
                        }
                    }
                    indW++; numW++;
                }
            }
            // clean up
            while (allowWNulls && indV < V.ind.end())
            {
                if (_doOp(*numV, Wzero, false, true))
                {
                    Product.ind.push_back(*indV);
                    Product.num.push_back(_binary_op(*numV, Wzero, false, true));
                }
                indV++; numV++;
            }
            while (allowVNulls && indW < W.ind.end())
            {
                if (_doOp(Vzero, *numW, true, false))
                {
                    Product.ind.push_back(*indW);
                    Product.num.push_back(_binary_op(Vzero, *numW, true, false));
                }
                indW++; numW++;
            }
        }
        return Product;
    }
    else
    {
        std::cout << "Grids are not comparable for EWiseApply" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
        return FullyDistSpVec< IU,T_promote>();
    }
}
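
/*
 * Worked example (comment only), with an always-true predicate and addition as
 * _binary_op: let V = {(1,10), (3,30)} and W = {(3,5), (6,60)}.
 *   intersection (!allowVNulls && !allowWNulls):  {(3,35)}
 *   union (both true, Vzero = Wzero = 0):         {(1,10), (3,35), (6,60)}
 *   allowIntersect = false additionally drops position 3 from either result.
 */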

// plain callback versions
template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
FullyDistSpVec<IU,RET> EWiseApply
(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero)
{
    return EWiseApply<RET>(V, W,
                           EWiseExtToPlainAdapter<RET, NU1, NU2, _BinaryOperation>(_binary_op),
                           EWiseExtToPlainAdapter<bool, NU1, NU2, _BinaryPredicate>(_doOp),
                           allowVNulls, Vzero, true);
}

template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
FullyDistSpVec<IU,RET> EWiseApply
(const FullyDistSpVec<IU,NU1> & V, const FullyDistSpVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, bool allowWNulls, NU1 Vzero, NU2 Wzero, const bool allowIntersect = true)
{
    return EWiseApply<RET>(V, W,
                           EWiseExtToPlainAdapter<RET, NU1, NU2, _BinaryOperation>(_binary_op),
                           EWiseExtToPlainAdapter<bool, NU1, NU2, _BinaryPredicate>(_doOp),
                           allowVNulls, allowWNulls, Vzero, Wzero, allowIntersect, true);
}

}

#endif