1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc  --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 
30 #ifndef _PAR_FRIENDS_H_
31 #define _PAR_FRIENDS_H_
32 
33 #include "mpi.h"
34 #include <iostream>
35 #include <cstdarg>
36 #include "SpParMat.h"
37 #include "SpParHelper.h"
38 #include "MPIType.h"
39 #include "Friends.h"
40 #include "OptBuf.h"
41 #include "mtSpGEMM.h"
42 #include "MultiwayMerge.h"
43 
44 
45 namespace combblas {
46 
47 template <class IT, class NT, class DER>
48 class SpParMat;
49 
50 /*************************************************************************************************/
51 /**************************** FRIEND FUNCTIONS FOR PARALLEL CLASSES ******************************/
52 /*************************************************************************************************/
53 
54 
55 /**
56  ** Concatenate all the FullyDistVec<IT,NT> objects into a single one
57  **/
58 template <typename IT, typename NT>
59 FullyDistVec<IT,NT> Concatenate ( std::vector< FullyDistVec<IT,NT> > & vecs)
60 {
61 	if(vecs.size() < 1)
62 	{
63 		SpParHelper::Print("Warning: Nothing to concatenate, returning an empty vector\n");
64 		return FullyDistVec<IT,NT>();
65 	}
66 	else if (vecs.size() < 2)
67 	{
68 		return vecs[0];	// single vector: return it directly
69 
70 	}
71 	else
72 	{
73 		typename std::vector< FullyDistVec<IT,NT> >::iterator it = vecs.begin();
74 		std::shared_ptr<CommGrid> commGridPtr = it->getcommgrid();
75 		MPI_Comm World = commGridPtr->GetWorld();
76 
77 		IT nglen = it->TotalLength();	// new global length
78 		IT cumloclen = it->MyLocLength();	// existing cumulative local lengths
79 		++it;
80 		for(; it != vecs.end(); ++it)
81 		{
82 			if(*(commGridPtr) != *(it->getcommgrid()))
83 			{
84 				SpParHelper::Print("Grids are not comparable for Concatenate\n");
85 				MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
86 			}
87 			nglen += it->TotalLength();
88 			cumloclen += it->MyLocLength();
89 		}
90 		FullyDistVec<IT,NT> ConCat (commGridPtr, nglen, NT());
91 		int nprocs = commGridPtr->GetSize();
92 
93 		std::vector< std::vector< NT > > data(nprocs);
94 		std::vector< std::vector< IT > > inds(nprocs);
95 		IT gloffset = 0;
96 		for(it = vecs.begin(); it != vecs.end(); ++it)
97 		{
98 			IT loclen = it->LocArrSize();
99 			for(IT i=0; i < loclen; ++i)
100 			{
101 				IT locind;
102 				IT loffset = it->LengthUntil();
103 				int owner = ConCat.Owner(gloffset+loffset+i, locind);
104 				data[owner].push_back(it->arr[i]);
105 				inds[owner].push_back(locind);
106 			}
107 			gloffset += it->TotalLength();
108 		}
109 
110 		int * sendcnt = new int[nprocs];
111 		int * sdispls = new int[nprocs];
112 		for(int i=0; i<nprocs; ++i)
113 			sendcnt[i] = (int) data[i].size();
114 
115 		int * rdispls = new int[nprocs];
116 		int * recvcnt = new int[nprocs];
117 		MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);  // share the request counts
118 		sdispls[0] = 0;
119 		rdispls[0] = 0;
120 		for(int i=0; i<nprocs-1; ++i)
121 		{
122 			sdispls[i+1] = sdispls[i] + sendcnt[i];
123 			rdispls[i+1] = rdispls[i] + recvcnt[i];
124 		}
125 		IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs,static_cast<IT>(0));
126 		NT * senddatabuf = new NT[cumloclen];
127 		for(int i=0; i<nprocs; ++i)
128 		{
129       std::copy(data[i].begin(), data[i].end(), senddatabuf+sdispls[i]);
130 			std::vector<NT>().swap(data[i]);	// delete data vectors
131 		}
132 		NT * recvdatabuf = new NT[totrecv];
133 		MPI_Alltoallv(senddatabuf, sendcnt, sdispls, MPIType<NT>(), recvdatabuf, recvcnt, rdispls, MPIType<NT>(), World);  // send data
134 		delete [] senddatabuf;
135 
136 		IT * sendindsbuf = new IT[cumloclen];
137 		for(int i=0; i<nprocs; ++i)
138 		{
139       std::copy(inds[i].begin(), inds[i].end(), sendindsbuf+sdispls[i]);
140 			std::vector<IT>().swap(inds[i]);	// delete inds vectors
141 		}
142 		IT * recvindsbuf = new IT[totrecv];
143 		MPI_Alltoallv(sendindsbuf, sendcnt, sdispls, MPIType<IT>(), recvindsbuf, recvcnt, rdispls, MPIType<IT>(), World);  // send new inds
144 		DeleteAll(sendindsbuf, sendcnt, sdispls);
145 
146 		for(int i=0; i<nprocs; ++i)
147 		{
148 			for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j)
149 			{
150 				ConCat.arr[recvindsbuf[j]] = recvdatabuf[j];
151 			}
152 		}
153 		DeleteAll(recvindsbuf, recvcnt, rdispls);
154 		return ConCat;
155 	}
156 }
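// Usage sketch (illustrative only; v1 and v2 are assumed to be existing vectors that
// already live on the same process grid):
//   std::vector< FullyDistVec<int64_t,double> > pieces;
//   pieces.push_back(v1);
//   pieces.push_back(v2);
//   FullyDistVec<int64_t,double> whole = Concatenate(pieces);
//   // whole.TotalLength() == v1.TotalLength() + v2.TotalLength()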
157 
158 template <typename MATRIXA, typename MATRIXB>
159 bool CheckSpGEMMCompliance(const MATRIXA & A, const MATRIXB & B)
160 {
161 	if(A.getncol() != B.getnrow())
162 	{
163 		std::ostringstream outs;
164 		outs << "Cannot multiply, dimensions do not match" << std::endl;
165 		outs << A.getncol() << " != " << B.getnrow() << std::endl;
166 		SpParHelper::Print(outs.str());
167 		MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
168 		return false;
169 	}
170 	if((void*) &A == (void*) &B)
171 	{
172 		std::ostringstream outs;
173 		outs << "Cannot multiply, inputs alias each other (make a temporary copy of one of them first)" << std::endl;
174 		SpParHelper::Print(outs.str());
175 		MPI_Abort(MPI_COMM_WORLD, MATRIXALIAS);
176 		return false;
177 	}
178 	return true;
179 }
180 
181 
182 // Combined logic for prune, recovery, and select
183 template <typename IT, typename NT, typename DER>
184 void MCLPruneRecoverySelect(SpParMat<IT,NT,DER> & A, NT hardThreshold, IT selectNum, IT recoverNum, NT recoverPct, int kselectVersion)
185 {
186 
187 #ifdef TIMING
188     double t0, t1;
189 #endif
190     // Prune and create a new pruned matrix
191     SpParMat<IT,NT,DER> PrunedA = A.Prune(std::bind2nd(std::less_equal<NT>(), hardThreshold), false);
192     // column-wise statistics of the pruned matrix
193     FullyDistVec<IT,NT> colSums = PrunedA.Reduce(Column, std::plus<NT>(), 0.0);
194     FullyDistVec<IT,NT> nnzPerColumn = PrunedA.Reduce(Column, std::plus<NT>(), 0.0, [](NT val){return 1.0;});
195     FullyDistVec<IT,NT> pruneCols(A.getcommgrid(), A.getncol(), hardThreshold);
196     PrunedA.FreeMemory();
197 
198 
199     // Check if we need recovery
200     // columns with nnz < recoverNum (r)
201     FullyDistSpVec<IT,NT> recoverCols(nnzPerColumn, std::bind2nd(std::less<NT>(), recoverNum));
202     recoverCols = recoverPct;
203     // columns with nnz < r AND sum < recoverPct (pct)
204     recoverCols = EWiseApply<NT>(recoverCols, colSums,
205                                  [](NT spval, NT dval){return spval;},
206                                  [](NT spval, NT dval){return dval < spval;},
207                                  false, NT());
208 
209     IT nrecover = recoverCols.getnnz();
210     if(nrecover > 0)
211     {
212 #ifdef TIMING
213         t0=MPI_Wtime();
214 #endif
215         A.Kselect(recoverCols, recoverNum, kselectVersion);
216 
217 #ifdef TIMING
218         t1=MPI_Wtime();
219         mcl_kselecttime += (t1-t0);
220 #endif
221 
222         pruneCols.Set(recoverCols);
223 
224 #ifdef COMBBLAS_DEBUG
225         std::ostringstream outs;
226         outs << "Number of columns needing recovery: " << nrecover << std::endl;
227         SpParHelper::Print(outs.str());
228 #endif
229 
230     }
231 
232 
233     if(selectNum>0)
234     {
235         // remaining columns will be up for selection
236         FullyDistSpVec<IT,NT> selectCols = EWiseApply<NT>(recoverCols, colSums,
237                                                           [](NT spval, NT dval){return spval;},
238                                                           [](NT spval, NT dval){return spval==-1;},
239                                                           true, static_cast<NT>(-1));
240 
241         selectCols = selectNum;
242         selectCols = EWiseApply<NT>(selectCols, nnzPerColumn,
243                                     [](NT spval, NT dval){return spval;},
244                                     [](NT spval, NT dval){return dval > spval;},
245                                     false, NT());
246         IT nselect = selectCols.getnnz();
247 
248         if(nselect > 0 )
249         {
250 #ifdef TIMING
251             t0=MPI_Wtime();
252 #endif
253             A.Kselect(selectCols, selectNum, kselectVersion); // PrunedA would also work
254 #ifdef TIMING
255             t1=MPI_Wtime();
256             mcl_kselecttime += (t1-t0);
257 #endif
258 
259             pruneCols.Set(selectCols);
260 #ifdef COMBBLAS_DEBUG
261             std::ostringstream outs;
262             outs << "Number of columns needing selection: " << nselect << std::endl;
263             SpParHelper::Print(outs.str());
264 #endif
265 #ifdef TIMING
266             t0=MPI_Wtime();
267 #endif
268             SpParMat<IT,NT,DER> selectedA = A.PruneColumn(pruneCols, std::less<NT>(), false);
269 #ifdef TIMING
270             t1=MPI_Wtime();
271             mcl_prunecolumntime += (t1-t0);
272 #endif
273             if(recoverNum>0 ) // recovery can be attempted after selection
274             {
275 
276                 FullyDistVec<IT,NT> nnzPerColumn1 = selectedA.Reduce(Column, std::plus<NT>(), 0.0, [](NT val){return 1.0;});
277                 FullyDistVec<IT,NT> colSums1 = selectedA.Reduce(Column, std::plus<NT>(), 0.0);
278                 selectedA.FreeMemory();
279 
280                 // selected columns with nnz < recoverNum (r)
281                 selectCols = recoverNum;
282                 selectCols = EWiseApply<NT>(selectCols, nnzPerColumn1,
283                                             [](NT spval, NT dval){return spval;},
284                                             [](NT spval, NT dval){return dval < spval;},
285                                             false, NT());
286 
287                 // selected columns with sum < recoverPct (pct)
288                 selectCols = recoverPct;
289                 selectCols = EWiseApply<NT>(selectCols, colSums1,
290                                             [](NT spval, NT dval){return spval;},
291                                             [](NT spval, NT dval){return dval < spval;},
292                                             false, NT());
293 
294                 IT n_recovery_after_select = selectCols.getnnz();
295                 if(n_recovery_after_select>0)
296                 {
297                     // mclExpandVector2 does it on the original vector
298                     // mclExpandVector1 does it on the pruned vector
299 #ifdef TIMING
300                     t0=MPI_Wtime();
301 #endif
302                     A.Kselect(selectCols, recoverNum, kselectVersion); // Kselect on PrunedA might give different result
303 #ifdef TIMING
304                     t1=MPI_Wtime();
305                     mcl_kselecttime += (t1-t0);
306 #endif
307                     pruneCols.Set(selectCols);
308 
309 #ifdef COMBBLAS_DEBUG
310                     std::ostringstream outs1;
311                     outs1 << "Number of columns needing recovery after selection: " << n_recovery_after_select << std::endl;
312                     SpParHelper::Print(outs1.str());
313 #endif
314                 }
315 
316             }
317         }
318     }
319 
320 
321     // final prune
322 #ifdef TIMING
323     t0=MPI_Wtime();
324 #endif
325     A.PruneColumn(pruneCols, std::less<NT>(), true);
326 #ifdef TIMING
327     t1=MPI_Wtime();
328     mcl_prunecolumntime += (t1-t0);
329 #endif
330     // Add loops for empty columns
331     if(recoverNum<=0 ) // if recoverNum>0, recovery would have added nonzeros in empty columns
332     {
333         FullyDistVec<IT,NT> nnzPerColumnA = A.Reduce(Column, std::plus<NT>(), 0.0, [](NT val){return 1.0;});
334         FullyDistSpVec<IT,NT> emptyColumns(nnzPerColumnA, std::bind2nd(std::equal_to<NT>(), 0.0));
335         emptyColumns = 1.00;
336         //Ariful: We need a selective AddLoops function with a sparse vector
337         //A.AddLoops(emptyColumns);
338     }
339 }
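// Usage sketch (illustrative; the numeric values are placeholders rather than recommended
// MCL settings, and A is assumed to be an already-constructed SpParMat<int64_t,double,...>):
//   // prune entries <= 1e-4, keep at most 1100 entries per column, and recover columns
//   // left with fewer than 900 entries whose remaining sum is below 0.9
//   MCLPruneRecoverySelect(A, 1e-4, (int64_t)1100, (int64_t)900, 0.9, /*kselectVersion*/ 1);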
340 
341 
342 
343 
344 /**
345  * Broadcasts A multiple times (#phases) in order to save storage in the output
346  * Only uses 1/phases of C memory if the threshold/max limits are proper
347  */
348 template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
349 SpParMat<IU,NUO,UDERO> MemEfficientSpGEMM (SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B,
350                                            int phases, NUO hardThreshold, IU selectNum, IU recoverNum, NUO recoverPct, int kselectVersion, int64_t perProcessMemory)
351 {
352     int myrank;
353     MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
354     if(A.getncol() != B.getnrow())
355     {
356         std::ostringstream outs;
357         outs << "Cannot multiply, dimensions do not match" << std::endl;
358         outs << A.getncol() << " != " << B.getnrow() << std::endl;
359         SpParHelper::Print(outs.str());
360         MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
361         return SpParMat< IU,NUO,UDERO >();
362     }
363     if(phases <1 || phases >= A.getncol())
364     {
365         SpParHelper::Print("MemEfficientSpGEMM: The value of phases is too small or large. Resetting to 1.\n");
366         phases = 1;
367     }
368 
369     int stages, dummy; 	// last two parameters of ProductGrid are ignored for Synch multiplication
370     std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);
371 
372 
373     if(perProcessMemory>0) // estimate the number of phases permitted by memory
374     {
375         int p;
376         MPI_Comm World = GridC->GetWorld();
377         MPI_Comm_size(World,&p);
378 
379         int64_t perNNZMem_in = sizeof(IU)*2 + sizeof(NU1);
380         int64_t perNNZMem_out = sizeof(IU)*2 + sizeof(NUO);
381 
382         // max nnz(A) in a process
383         int64_t lannz = A.getlocalnnz();
384         int64_t gannz;
385         MPI_Allreduce(&lannz, &gannz, 1, MPIType<int64_t>(), MPI_MAX, World);
386         int64_t inputMem = gannz * perNNZMem_in * 4; // for four copies (two for SUMMA)
387 
388         // max nnz(A^2) stored by SUMMA in a process
389         int64_t asquareNNZ = EstPerProcessNnzSUMMA(A,B);
390         int64_t asquareMem = asquareNNZ * perNNZMem_out * 2; // an extra copy in multiway merge and in selection/recovery step
391 
392 
393         // estimate kselect memory
394         int64_t d = ceil( (asquareNNZ * sqrt(p))/ B.getlocalcols() ); // average nnz per column in A^2 (it is an overestimate because asquareNNZ is estimated based on unmerged matrices)
395         // this is equivalent to (asquareNNZ * p) / B.getncol()
396         int64_t k = std::min(int64_t(std::max(selectNum, recoverNum)), d );
397         int64_t kselectmem = B.getlocalcols() * k * 8 * 3;
398 
399         // estimate output memory
400         int64_t outputNNZ = (B.getlocalcols() * k)/sqrt(p);
401         int64_t outputMem = outputNNZ * perNNZMem_in * 2;
402 
403         //inputMem + outputMem + asquareMem/phases + kselectmem/phases < memory
404         int64_t remainingMem = perProcessMemory*1000000000 - inputMem - outputMem;
405         if(remainingMem > 0)
406         {
407             phases = 1 + (asquareMem+kselectmem) / remainingMem;
408         }
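        // Illustrative arithmetic (made-up numbers): with perProcessMemory = 8 (GB),
        // inputMem + outputMem = 3e9 bytes and asquareMem + kselectmem = 20e9 bytes,
        // remainingMem = 8e9 - 3e9 = 5e9, so phases becomes 1 + 20e9/5e9 = 5.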
409 
410 
411 
412 
413         if(myrank==0)
414         {
415             if(remainingMem < 0)
416             {
417                 std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n Warning: input and output memory requirement is greater than the per-process available memory. Keeping phases at the value supplied at the command line. The program may run out of memory and crash! \n !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl;
418             }
419 #ifdef SHOW_MEMORY_USAGE
420             int64_t maxMemory = kselectmem/phases + inputMem + outputMem + asquareMem / phases;
421             if(maxMemory>1000000000)
422             std::cout << "phases: " << phases << ": per process memory: " << perProcessMemory << " GB asquareMem: " << asquareMem/1000000000.00 << " GB" << " inputMem: " << inputMem/1000000000.00 << " GB" << " outputMem: " << outputMem/1000000000.00 << " GB" << " kselectmem: " << kselectmem/1000000000.00 << " GB" << std::endl;
423             else
424             std::cout << "phases: " << phases << ": per process memory: " << perProcessMemory << " GB asquareMem: " << asquareMem/1000000.00 << " MB" << " inputMem: " << inputMem/1000000.00 << " MB" << " outputMem: " << outputMem/1000000.00 << " MB" << " kselectmem: " << kselectmem/1000000.00 << " MB" << std::endl;
425 #endif
426 
427         }
428     }
429 
430     IU C_m = A.spSeq->getnrow();
431     IU C_n = B.spSeq->getncol();
432 
433     std::vector< UDERB > PiecesOfB;
434     UDERB CopyB = *(B.spSeq); // we allow aliased matrices as input because of this local copy
435 
436     CopyB.ColSplit(phases, PiecesOfB); // CopyB's memory is destroyed at this point
437     MPI_Barrier(GridC->GetWorld());
438 
439 
440     IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
441     IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);
442 
443 
444 
445     SpParHelper::GetSetSizes( *(A.spSeq), ARecvSizes, (A.commGrid)->GetRowWorld());
446 
447     // Remotely fetched matrices are stored as pointers
448     UDERA * ARecv;
449     UDERB * BRecv;
450 
451     std::vector< UDERO > toconcatenate;
452 
453     int Aself = (A.commGrid)->GetRankInProcRow();
454     int Bself = (B.commGrid)->GetRankInProcCol();
455 
456     for(int p = 0; p< phases; ++p)
457     {
458         SpParHelper::GetSetSizes( PiecesOfB[p], BRecvSizes, (B.commGrid)->GetColWorld());
459         std::vector< SpTuples<IU,NUO>  *> tomerge;
460         for(int i = 0; i < stages; ++i)
461         {
462             std::vector<IU> ess;
463             if(i == Aself)  ARecv = A.spSeq;	// shallow-copy
464             else
465             {
466                 ess.resize(UDERA::esscount);
467                 for(int j=0; j< UDERA::esscount; ++j)
468                     ess[j] = ARecvSizes[j][i];		// essentials of the ith matrix in this row
469                 ARecv = new UDERA();				// first, create the object
470             }
471 
472 #ifdef TIMING
473             double t0=MPI_Wtime();
474 #endif
475             SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i);	// then, receive its elements
476 #ifdef TIMING
477             double t1=MPI_Wtime();
478             mcl_Abcasttime += (t1-t0);
479 #endif
480             ess.clear();
481 
482             if(i == Bself)  BRecv = &(PiecesOfB[p]);	// shallow-copy
483             else
484             {
485                 ess.resize(UDERB::esscount);
486                 for(int j=0; j< UDERB::esscount; ++j)
487                     ess[j] = BRecvSizes[j][i];
488                 BRecv = new UDERB();
489             }
490 #ifdef TIMING
491             double t2=MPI_Wtime();
492 #endif
493             SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i);	// then, receive its elements
494 #ifdef TIMING
495             double t3=MPI_Wtime();
496             mcl_Bbcasttime += (t3-t2);
497 #endif
498 
499 
500 #ifdef TIMING
501             double t4=MPI_Wtime();
502 #endif
503             SpTuples<IU,NUO> * C_cont = LocalSpGEMM<SR, NUO>(*ARecv, *BRecv,i != Aself, i != Bself);
504 
505 #ifdef TIMING
506             double t5=MPI_Wtime();
507             mcl_localspgemmtime += (t5-t4);
508 #endif
509 
510             if(!C_cont->isZero())
511                 tomerge.push_back(C_cont);
512             else
513                 delete C_cont;
514 
515         }   // all stages executed
516 
517 #ifdef SHOW_MEMORY_USAGE
518         int64_t gcnnz_unmerged, lcnnz_unmerged = 0;
519          for(size_t i = 0; i < tomerge.size(); ++i)
520          {
521               lcnnz_unmerged += tomerge[i]->getnnz();
522          }
523         MPI_Allreduce(&lcnnz_unmerged, &gcnnz_unmerged, 1, MPIType<int64_t>(), MPI_MAX, MPI_COMM_WORLD);
524         int64_t summa_memory = gcnnz_unmerged*20;//(gannz*2 + phase_nnz + gcnnz_unmerged + gannz + gannz/phases) * 20; // last two for broadcasts
525 
526         if(myrank==0)
527         {
528             if(summa_memory>1000000000)
529                 std::cout << p+1 << ". unmerged: " << summa_memory/1000000000.00 << "GB " ;
530             else
531                 std::cout << p+1 << ". unmerged: " << summa_memory/1000000.00 << " MB " ;
532 
533         }
534 #endif
535 
536 #ifdef TIMING
537         double t6=MPI_Wtime();
538 #endif
539         //UDERO OnePieceOfC(MergeAll<SR>(tomerge, C_m, PiecesOfB[p].getncol(),true), false);
540         // TODO: MultiwayMerge can directly return UDERO inorder to avoid the extra copy
541         SpTuples<IU,NUO> * OnePieceOfC_tuples = MultiwayMerge<SR>(tomerge, C_m, PiecesOfB[p].getncol(),true);
542 
543 #ifdef SHOW_MEMORY_USAGE
544         int64_t gcnnz_merged, lcnnz_merged ;
545         lcnnz_merged = OnePieceOfC_tuples->getnnz();
546         MPI_Allreduce(&lcnnz_merged, &gcnnz_merged, 1, MPIType<int64_t>(), MPI_MAX, MPI_COMM_WORLD);
547 
548         // TODO: we can remove gcnnz_merged memory here because we don't need to concatenate anymore
549         int64_t merge_memory = gcnnz_merged*2*20;//(gannz*2 + phase_nnz + gcnnz_unmerged + gcnnz_merged*2) * 20;
550 
551         if(myrank==0)
552         {
553             if(merge_memory>1000000000)
554                 std::cout << " merged: " << merge_memory/1000000000.00 << "GB " ;
555             else
556                 std::cout << " merged: " << merge_memory/1000000.00 << " MB " ;
557 
558         }
559 #endif
560 
561 
562 #ifdef TIMING
563         double t7=MPI_Wtime();
564         mcl_multiwaymergetime += (t7-t6);
565 #endif
566         UDERO * OnePieceOfC = new UDERO(* OnePieceOfC_tuples, false);
567         delete OnePieceOfC_tuples;
568 
569         SpParMat<IU,NUO,UDERO> OnePieceOfC_mat(OnePieceOfC, GridC);
570         MCLPruneRecoverySelect(OnePieceOfC_mat, hardThreshold, selectNum, recoverNum, recoverPct, kselectVersion);
571 
572 #ifdef SHOW_MEMORY_USAGE
573         int64_t gcnnz_pruned, lcnnz_pruned ;
574         lcnnz_pruned = OnePieceOfC_mat.getlocalnnz();
575         MPI_Allreduce(&lcnnz_pruned, &gcnnz_pruned, 1, MPIType<int64_t>(), MPI_MAX, MPI_COMM_WORLD);
576 
577 
578         // TODO: we can remove gcnnz_merged memory here because we don't need to concatenate anymore
579         int64_t prune_memory = gcnnz_pruned*2*20;//(gannz*2 + phase_nnz + gcnnz_pruned*2) * 20 + kselectmem; // 3 extra copies of OnePieceOfC_mat, we can make it one extra copy!
580         //phase_nnz += gcnnz_pruned;
581 
582         if(myrank==0)
583         {
584             if(prune_memory>1000000000)
585                 std::cout << "Prune: " << prune_memory/1000000000.00 << "GB " << std::endl ;
586             else
587                 std::cout << "Prune: " << prune_memory/1000000.00 << " MB " << std::endl ;
588 
589         }
590 #endif
591 
592         // ABAB: Change this to accept pointers to objects
593         toconcatenate.push_back(OnePieceOfC_mat.seq());
594     }
595 
596 
597     UDERO * C = new UDERO(0,C_m, C_n,0);
598     C->ColConcatenate(toconcatenate); // ABAB: Change this to accept a vector of pointers to pointers to DER objects
599 
600 
601     SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
602     SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
603     return SpParMat<IU,NUO,UDERO> (C, GridC);
604 }
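// Usage sketch (illustrative; assumes the SpDCCols storage and the PlusTimesSRing
// semiring that ship with the library, and an 8 GB per-process memory budget):
//   typedef SpDCCols<int64_t,double> DER;
//   typedef PlusTimesSRing<double,double> PTSR;
//   SpParMat<int64_t,double,DER> C =
//       MemEfficientSpGEMM<PTSR,double,DER>(A, B, /*phases*/ 1, /*hardThreshold*/ 1e-8,
//                                           /*selectNum*/ (int64_t)1100, /*recoverNum*/ (int64_t)900,
//                                           /*recoverPct*/ 0.9, /*kselectVersion*/ 1,
//                                           /*perProcessMemory*/ 8);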
605 
606 
607 
608 /**
609  * Parallel C = A*B routine that uses a double buffered broadcasting scheme
610  * @pre { Input matrices, A and B, should not alias }
611  * Most memory efficient version available. Total stages: 2*sqrt(p)
612  * Memory requirement during first sqrt(p) stages: <= (3/2)*(nnz(A)+nnz(B))+(1/2)*nnz(C)
613  * Memory requirement during second sqrt(p) stages: <= nnz(A)+nnz(B)+nnz(C)
614  * Final memory requirement: nnz(C) if clearA and clearB are true
615  **/
616 template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
617 SpParMat<IU,NUO,UDERO> Mult_AnXBn_DoubleBuff
618 		(SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B, bool clearA = false, bool clearB = false )
619 
620 {
621 	if(!CheckSpGEMMCompliance(A,B) )
622 	{
623 		return SpParMat< IU,NUO,UDERO >();
624 	}
625 
626 	int stages, dummy; 	// last two parameters of ProductGrid are ignored for Synch multiplication
627 	std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);
628 	IU C_m = A.spSeq->getnrow();
629 	IU C_n = B.spSeq->getncol();
630 
631 	UDERA * A1seq = new UDERA();
632 	UDERA * A2seq = new UDERA();
633 	UDERB * B1seq = new UDERB();
634 	UDERB * B2seq = new UDERB();
635 	(A.spSeq)->Split( *A1seq, *A2seq);
636 	const_cast< UDERB* >(B.spSeq)->Transpose();
637 	(B.spSeq)->Split( *B1seq, *B2seq);
638 	MPI_Barrier(GridC->GetWorld());
639 
640 	IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
641 	IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);
642 
643 	SpParHelper::GetSetSizes( *A1seq, ARecvSizes, (A.commGrid)->GetRowWorld());
644 	SpParHelper::GetSetSizes( *B1seq, BRecvSizes, (B.commGrid)->GetColWorld());
645 
646 	// Remotely fetched matrices are stored as pointers
647 	UDERA * ARecv;
648 	UDERB * BRecv;
649 	std::vector< SpTuples<IU,NUO>  *> tomerge;
650 
651 	int Aself = (A.commGrid)->GetRankInProcRow();
652 	int Bself = (B.commGrid)->GetRankInProcCol();
653 
654 	for(int i = 0; i < stages; ++i)
655 	{
656 		std::vector<IU> ess;
657 		if(i == Aself)
658 		{
659 			ARecv = A1seq;	// shallow-copy
660 		}
661 		else
662 		{
663 			ess.resize(UDERA::esscount);
664 			for(int j=0; j< UDERA::esscount; ++j)
665 			{
666 				ess[j] = ARecvSizes[j][i];		// essentials of the ith matrix in this row
667 			}
668 			ARecv = new UDERA();				// first, create the object
669 		}
670 		SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i);	// then, receive its elements
671 		ess.clear();
672 		if(i == Bself)
673 		{
674 			BRecv = B1seq;	// shallow-copy
675 		}
676 		else
677 		{
678 			ess.resize(UDERB::esscount);
679 			for(int j=0; j< UDERB::esscount; ++j)
680 			{
681 				ess[j] = BRecvSizes[j][i];
682 			}
683 			BRecv = new UDERB();
684 		}
685 		SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i);	// then, receive its elements
686 
687 
688 		SpTuples<IU,NUO> * C_cont = MultiplyReturnTuples<SR, NUO>
689 						(*ARecv, *BRecv, // parameters themselves
690 						false, true,	// transpose information (B is transposed)
691 						i != Aself, 	// 'delete A' condition
692 						i != Bself);	// 'delete B' condition
693 
694 		if(!C_cont->isZero())
695 			tomerge.push_back(C_cont);
696 		else
697 			delete C_cont;
698 	}
699 	if(clearA) delete A1seq;
700 	if(clearB) delete B1seq;
701 
702 	// Set the new dimensions
703 	SpParHelper::GetSetSizes( *A2seq, ARecvSizes, (A.commGrid)->GetRowWorld());
704 	SpParHelper::GetSetSizes( *B2seq, BRecvSizes, (B.commGrid)->GetColWorld());
705 
706 	// Start the second round
707 	for(int i = 0; i < stages; ++i)
708 	{
709 		std::vector<IU> ess;
710 		if(i == Aself)
711 		{
712 			ARecv = A2seq;	// shallow-copy
713 		}
714 		else
715 		{
716 			ess.resize(UDERA::esscount);
717 			for(int j=0; j< UDERA::esscount; ++j)
718 			{
719 				ess[j] = ARecvSizes[j][i];		// essentials of the ith matrix in this row
720 			}
721 			ARecv = new UDERA();				// first, create the object
722 		}
723 
724 		SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i);	// then, receive its elements
725 		ess.clear();
726 
727 		if(i == Bself)
728 		{
729 			BRecv = B2seq;	// shallow-copy
730 		}
731 		else
732 		{
733 			ess.resize(UDERB::esscount);
734 			for(int j=0; j< UDERB::esscount; ++j)
735 			{
736 				ess[j] = BRecvSizes[j][i];
737 			}
738 			BRecv = new UDERB();
739 		}
740 		SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i);	// then, receive its elements
741 
742 		SpTuples<IU,NUO> * C_cont = MultiplyReturnTuples<SR, NUO>
743 						(*ARecv, *BRecv, // parameters themselves
744 						false, true,	// transpose information (B is transposed)
745 						i != Aself, 	// 'delete A' condition
746 						i != Bself);	// 'delete B' condition
747 
748 		if(!C_cont->isZero())
749 			tomerge.push_back(C_cont);
750 		else
751 			delete C_cont;
752 	}
753 	SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
754 	SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
755 	if(clearA)
756 	{
757 		delete A2seq;
758 		delete A.spSeq;
759 		A.spSeq = NULL;
760 	}
761 	else
762 	{
763 		(A.spSeq)->Merge(*A1seq, *A2seq);
764 		delete A1seq;
765 		delete A2seq;
766 	}
767 	if(clearB)
768 	{
769 		delete B2seq;
770 		delete B.spSeq;
771 		B.spSeq = NULL;
772 	}
773 	else
774 	{
775 		(B.spSeq)->Merge(*B1seq, *B2seq);
776 		delete B1seq;
777 		delete B2seq;
778 		const_cast< UDERB* >(B.spSeq)->Transpose();	// transpose back to original
779 	}
780 
781 	UDERO * C = new UDERO(MergeAll<SR>(tomerge, C_m, C_n,true), false);
782 	return SpParMat<IU,NUO,UDERO> (C, GridC);		// return the result object
783 }
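// Usage sketch (illustrative; PTSR and DER are the same assumed typedefs as in the
// MemEfficientSpGEMM sketch above, not requirements of this routine):
//   SpParMat<int64_t,double,DER> C = Mult_AnXBn_DoubleBuff<PTSR,double,DER>(A, B);
//   // pass clearA/clearB as true to release the inputs and lower the peak memory footprint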
784 
785 
786 /**
787  * Parallel C = A*B routine that uses only MPI-1 features
788  * Relies on simple blocking broadcast
789  * @pre { Input matrices, A and B, should not alias }
790  **/
791 template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
792 SpParMat<IU, NUO, UDERO> Mult_AnXBn_Synch
793 		(SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B, bool clearA = false, bool clearB = false )
794 
795 {
796 	if(!CheckSpGEMMCompliance(A,B) )
797 	{
798 		return SpParMat< IU,NUO,UDERO >();
799 	}
800 	int stages, dummy; 	// last two parameters of ProductGrid are ignored for Synch multiplication
801 	std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);
802 	IU C_m = A.spSeq->getnrow();
803 	IU C_n = B.spSeq->getncol();
804 
805 	//const_cast< UDERB* >(B.spSeq)->Transpose(); // do not transpose for column-by-column multiplication
806 	MPI_Barrier(GridC->GetWorld());
807 
808 	IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
809 	IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);
810 
811 	SpParHelper::GetSetSizes( *(A.spSeq), ARecvSizes, (A.commGrid)->GetRowWorld());
812 	SpParHelper::GetSetSizes( *(B.spSeq), BRecvSizes, (B.commGrid)->GetColWorld());
813 
814 	// Remotely fetched matrices are stored as pointers
815 	UDERA * ARecv;
816 	UDERB * BRecv;
817 	std::vector< SpTuples<IU,NUO>  *> tomerge;
818 
819 	int Aself = (A.commGrid)->GetRankInProcRow();
820 	int Bself = (B.commGrid)->GetRankInProcCol();
821 
822 	for(int i = 0; i < stages; ++i)
823 	{
824 		std::vector<IU> ess;
825 		if(i == Aself)
826 		{
827 			ARecv = A.spSeq;	// shallow-copy
828 		}
829 		else
830 		{
831 			ess.resize(UDERA::esscount);
832 			for(int j=0; j< UDERA::esscount; ++j)
833 			{
834 				ess[j] = ARecvSizes[j][i];		// essentials of the ith matrix in this row
835 			}
836 			ARecv = new UDERA();				// first, create the object
837 		}
838 
839 		SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i);	// then, receive its elements
840 		ess.clear();
841 
842 		if(i == Bself)
843 		{
844 			BRecv = B.spSeq;	// shallow-copy
845 		}
846 		else
847 		{
848 			ess.resize(UDERB::esscount);
849 			for(int j=0; j< UDERB::esscount; ++j)
850 			{
851 				ess[j] = BRecvSizes[j][i];
852 			}
853 			BRecv = new UDERB();
854 		}
855 
856 		SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i);	// then, receive its elements
857 
858 		/*
859 		 // before activating this transpose B first
860 		SpTuples<IU,NUO> * C_cont = MultiplyReturnTuples<SR, NUO>
861 						(*ARecv, *BRecv, // parameters themselves
862 						false, true,	// transpose information (B is transposed)
863 						i != Aself, 	// 'delete A' condition
864 						i != Bself);	// 'delete B' condition
865 		 */
866 
867 		SpTuples<IU,NUO> * C_cont = LocalSpGEMM<SR, NUO>
868 						(*ARecv, *BRecv, // parameters themselves
869 						i != Aself, 	// 'delete A' condition
870 						i != Bself);	// 'delete B' condition
871 
872 
873 		if(!C_cont->isZero())
874 			tomerge.push_back(C_cont);
875 
876 		#ifdef COMBBLAS_DEBUG
877     std::ostringstream outs;
878 		outs << i << "th SUMMA iteration"<< std::endl;
879 		SpParHelper::Print(outs.str());
880 		#endif
881 	}
882 	if(clearA && A.spSeq != NULL)
883 	{
884 		delete A.spSeq;
885 		A.spSeq = NULL;
886 	}
887 	if(clearB && B.spSeq != NULL)
888 	{
889 		delete B.spSeq;
890 		B.spSeq = NULL;
891 	}
892 
893 	SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
894 	SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
895 
896 	//UDERO * C = new UDERO(MergeAll<SR>(tomerge, C_m, C_n,true), false);
897 	// First get the result in SpTuples, then convert to UDER
898 	// the last parameter to MultiwayMerge deletes the tomerge arrays
899 
900 	SpTuples<IU,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
901 	UDERO * C = new UDERO(*C_tuples, false);
902 
903 	//if(!clearB)
904 	//	const_cast< UDERB* >(B.spSeq)->Transpose();	// transpose back to original
905 
906 	return SpParMat<IU,NUO,UDERO> (C, GridC);		// return the result object
907 }
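// Usage sketch (illustrative): same calling convention as Mult_AnXBn_DoubleBuff above,
// e.g. C = Mult_AnXBn_Synch<PTSR,double,DER>(A, B); the difference is that this version
// relies on a simple blocking broadcast instead of splitting A and B for double buffering.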
908 
909 
910 
911     /**
912      * Estimate the maximum nnz needed to store in a process from all stages of SUMMA before reduction
913      * @pre { Input matrices, A and B, should not alias }
914      **/
915     template <typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
916     int64_t EstPerProcessNnzSUMMA(SpParMat<IU,NU1,UDERA> & A, SpParMat<IU,NU2,UDERB> & B)
917 
918     {
919         int64_t nnzC_SUMMA = 0;
920 
921         if(A.getncol() != B.getnrow())
922         {
923             std::ostringstream outs;
924             outs << "Cannot multiply, dimensions do not match" << std::endl;
925             outs << A.getncol() << " != " << B.getnrow() << std::endl;
926             SpParHelper::Print(outs.str());
927             MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
928             return nnzC_SUMMA;
929         }
930 
931         int stages, dummy;     // last two parameters of ProductGrid are ignored for Synch multiplication
932         std::shared_ptr<CommGrid> GridC = ProductGrid((A.commGrid).get(), (B.commGrid).get(), stages, dummy, dummy);
933 
934         MPI_Barrier(GridC->GetWorld());
935 
936         IU ** ARecvSizes = SpHelper::allocate2D<IU>(UDERA::esscount, stages);
937         IU ** BRecvSizes = SpHelper::allocate2D<IU>(UDERB::esscount, stages);
938         SpParHelper::GetSetSizes( *(A.spSeq), ARecvSizes, (A.commGrid)->GetRowWorld());
939         SpParHelper::GetSetSizes( *(B.spSeq), BRecvSizes, (B.commGrid)->GetColWorld());
940 
941         // Remotely fetched matrices are stored as pointers
942         UDERA * ARecv;
943         UDERB * BRecv;
944 
945         int Aself = (A.commGrid)->GetRankInProcRow();
946         int Bself = (B.commGrid)->GetRankInProcCol();
947 
948 
949         for(int i = 0; i < stages; ++i)
950         {
951             std::vector<IU> ess;
952             if(i == Aself)
953             {
954                 ARecv = A.spSeq;    // shallow-copy
955             }
956             else
957             {
958                 ess.resize(UDERA::esscount);
959                 for(int j=0; j< UDERA::esscount; ++j)
960                 {
961                     ess[j] = ARecvSizes[j][i];        // essentials of the ith matrix in this row
962                 }
963                 ARecv = new UDERA();                // first, create the object
964             }
965 
966             SpParHelper::BCastMatrix(GridC->GetRowWorld(), *ARecv, ess, i);    // then, receive its elements
967             ess.clear();
968 
969             if(i == Bself)
970             {
971                 BRecv = B.spSeq;    // shallow-copy
972             }
973             else
974             {
975                 ess.resize(UDERB::esscount);
976                 for(int j=0; j< UDERB::esscount; ++j)
977                 {
978                     ess[j] = BRecvSizes[j][i];
979                 }
980                 BRecv = new UDERB();
981             }
982 
983             SpParHelper::BCastMatrix(GridC->GetColWorld(), *BRecv, ess, i);    // then, receive its elements
984 
985 
986             IU* colnnzC = estimateNNZ(*ARecv, *BRecv);
987 
988 
989             IU nzc = BRecv->GetDCSC()->nzc;
990             IU nnzC_stage = 0;
991 #ifdef THREADED
992 #pragma omp parallel for reduction (+:nnzC_stage)
993 #endif
994             for (IU k=0; k<nzc; k++)
995             {
996                 nnzC_stage = nnzC_stage + colnnzC[k];
997             }
998             nnzC_SUMMA += nnzC_stage;
999 
1000             // delete received data
1001             if(i != Aself)
1002                 delete ARecv;
1003             if(i != Bself)
1004                 delete BRecv;
1005         }
1006 
1007         SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
1008         SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
1009 
1010         int64_t nnzC_SUMMA_max = 0;
1011         MPI_Allreduce(&nnzC_SUMMA, &nnzC_SUMMA_max, 1, MPIType<int64_t>(), MPI_MAX, GridC->GetWorld());
1012 
1013         return nnzC_SUMMA_max;
1014     }
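    // Note: this estimate is what MemEfficientSpGEMM uses to size its phases when a
    // per-process memory budget is given; it can also be queried directly, e.g.
    //   int64_t estNNZ = EstPerProcessNnzSUMMA(A, B);  // max per-process nnz before merging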
1015 
1016 
1017 
1018 
1019 template <typename MATRIX, typename VECTOR>
1020 void CheckSpMVCompliance(const MATRIX & A, const VECTOR & x)
1021 {
1022 	if(A.getncol() != x.TotalLength())
1023 	{
1024 		std::ostringstream outs;
1025 		outs << "Cannot multiply, dimensions do not match" << std::endl;
1026 		outs << A.getncol() << " != " << x.TotalLength() << std::endl;
1027 		SpParHelper::Print(outs.str());
1028 		MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
1029 	}
1030 	if(! ( *(A.getcommgrid()) == *(x.getcommgrid())) )
1031 	{
1032 		std::cout << "Grids are not comparable for SpMV" << std::endl;
1033 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1034 	}
1035 }
1036 
1037 
1038 template <typename SR, typename IU, typename NUM, typename UDER>
1039 FullyDistSpVec<IU,typename promote_trait<NUM,IU>::T_promote>  SpMV
1040 	(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IU> & x, bool indexisvalue, OptBuf<int32_t, typename promote_trait<NUM,IU>::T_promote > & optbuf);
1041 
1042 template <typename SR, typename IU, typename NUM, typename UDER>
1043 FullyDistSpVec<IU,typename promote_trait<NUM,IU>::T_promote>  SpMV
1044 	(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IU> & x, bool indexisvalue)
1045 {
1046 	typedef typename promote_trait<NUM,IU>::T_promote T_promote;
1047 	OptBuf<int32_t, T_promote > optbuf = OptBuf<int32_t, T_promote >();
1048 	return SpMV<SR>(A, x, indexisvalue, optbuf);
1049 }
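// Usage sketch (illustrative; MySemiring stands in for a user-defined semiring class and
// is not part of the library):
//   FullyDistSpVec<int64_t,int64_t> x(A.getcommgrid(), A.getncol());
//   // ... fill x with (index, value) pairs ...
//   auto y = SpMV<MySemiring>(A, x, /*indexisvalue*/ false);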
1050 
1051 /**
1052  * Step 1 of the sparse SpMV algorithm
1053  * @param[in,out]   trxlocnz, lenuntil,trxinds,trxnums  { set or allocated }
1054  * @param[in] 	indexisvalue
1055  **/
1056 template<typename IU, typename NV>
1057 void TransposeVector(MPI_Comm & World, const FullyDistSpVec<IU,NV> & x, int32_t & trxlocnz, IU & lenuntil, int32_t * & trxinds, NV * & trxnums, bool indexisvalue)
1058 {
1059 	int32_t xlocnz = (int32_t) x.getlocnnz();
1060 	int32_t roffst = (int32_t) x.RowLenUntil();	// since trxinds is int32_t
1061 	int32_t roffset;
1062 	IU luntil = x.LengthUntil();
1063 	int diagneigh = x.commGrid->GetComplementRank();
1064 
1065 	MPI_Status status;
1066 	MPI_Sendrecv(&roffst, 1, MPIType<int32_t>(), diagneigh, TROST, &roffset, 1, MPIType<int32_t>(), diagneigh, TROST, World, &status);
1067 	MPI_Sendrecv(&xlocnz, 1, MPIType<int32_t>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<int32_t>(), diagneigh, TRNNZ, World, &status);
1068 	MPI_Sendrecv(&luntil, 1, MPIType<IU>(), diagneigh, TRLUT, &lenuntil, 1, MPIType<IU>(), diagneigh, TRLUT, World, &status);
1069 
1070 	// ABAB: Important observation is that the local indices (given by x.ind) are 32-bit addressable
1071 	// Copy them to 32-bit integers and transfer those to save 50% of off-node bandwidth
1072 	trxinds = new int32_t[trxlocnz];
1073 	int32_t * temp_xind = new int32_t[xlocnz];
1074 #ifdef THREADED
1075 #pragma omp parallel for
1076 #endif
1077 	for(int i=0; i< xlocnz; ++i)
1078         temp_xind[i] = (int32_t) x.ind[i];
1079 	MPI_Sendrecv(temp_xind, xlocnz, MPIType<int32_t>(), diagneigh, TRI, trxinds, trxlocnz, MPIType<int32_t>(), diagneigh, TRI, World, &status);
1080 	delete [] temp_xind;
1081 	if(!indexisvalue)
1082 	{
1083 		trxnums = new NV[trxlocnz];
1084 		MPI_Sendrecv(const_cast<NV*>(SpHelper::p2a(x.num)), xlocnz, MPIType<NV>(), diagneigh, TRX, trxnums, trxlocnz, MPIType<NV>(), diagneigh, TRX, World, &status);
1085 	}
1086 
1087   std::transform(trxinds, trxinds+trxlocnz, trxinds, std::bind2nd(std::plus<int32_t>(), roffset)); // fullydist indexing (p pieces) -> matrix indexing (sqrt(p) pieces)
1088 }
1089 
1090 
1091 /**
1092  * Step 2 of the sparse SpMV algorithm
1093  * @param[in,out]   trxinds, trxnums { deallocated }
1094  * @param[in,out]   indacc, numacc { allocated }
1095  * @param[in,out]	accnz { set }
1096  * @param[in] 		trxlocnz, lenuntil, indexisvalue
1097  **/
1098 template<typename IU, typename NV>
1099 void AllGatherVector(MPI_Comm & ColWorld, int trxlocnz, IU lenuntil, int32_t * & trxinds, NV * & trxnums,
1100 					 int32_t * & indacc, NV * & numacc, int & accnz, bool indexisvalue)
1101 {
1102     int colneighs, colrank;
1103 	MPI_Comm_size(ColWorld, &colneighs);
1104 	MPI_Comm_rank(ColWorld, &colrank);
1105 	int * colnz = new int[colneighs];
1106 	colnz[colrank] = trxlocnz;
1107 	MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
1108 	int * dpls = new int[colneighs]();	// displacements (zero initialized pid)
1109 	std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
1110 	accnz = std::accumulate(colnz, colnz+colneighs, 0);
1111 	indacc = new int32_t[accnz];
1112 	numacc = new NV[accnz];
1113 
1114 	// ABAB: Future issues here, colnz is of type int (MPI limitation)
1115 	// What if the aggregate vector size along the processor row/column is not 32-bit addressable?
1116 	// This will happen when n/sqrt(p) > 2^31
1117 	// Currently we can solve a small problem (scale 32) with 4096 processors
1118 	// For a medium problem (scale 35), we'll need 32K processors which gives sqrt(p) ~ 180
1119 	// 2^35 / 180 ~ 2^29 / 3 which is not an issue !
1120 
1121 #ifdef TIMING
1122 	double t0=MPI_Wtime();
1123 #endif
1124 	MPI_Allgatherv(trxinds, trxlocnz, MPIType<int32_t>(), indacc, colnz, dpls, MPIType<int32_t>(), ColWorld);
1125 
1126 	delete [] trxinds;
1127 	if(indexisvalue)
1128 	{
1129 		IU lenuntilcol;
1130 		if(colrank == 0)  lenuntilcol = lenuntil;
1131 		MPI_Bcast(&lenuntilcol, 1, MPIType<IU>(), 0, ColWorld);
1132 		for(int i=0; i< accnz; ++i)	// fill numerical values from indices
1133 		{
1134 			numacc[i] = indacc[i] + lenuntilcol;
1135 		}
1136 	}
1137 	else
1138 	{
1139 		MPI_Allgatherv(trxnums, trxlocnz, MPIType<NV>(), numacc, colnz, dpls, MPIType<NV>(), ColWorld);
1140 		delete [] trxnums;
1141 	}
1142 #ifdef TIMING
1143 	double t1=MPI_Wtime();
1144 	cblas_allgathertime += (t1-t0);
1145 #endif
1146 	DeleteAll(colnz,dpls);
1147 }
1148 
1149 
1150 
1151 /**
1152   * Step 3 of the sparse SpMV algorithm, with the semiring
1153   * @param[in,out] optbuf {scratch space for all-to-all (fold) communication}
1154   * @param[in,out] indacc, numacc {index and values of the input vector, deleted upon exit}
1155   * @param[in,out] sendindbuf, sendnumbuf {index and values of the output vector, created}
1156  **/
1157 template<typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
1158 void LocalSpMV(const SpParMat<IU,NUM,UDER> & A, int rowneighs, OptBuf<int32_t, OVT > & optbuf, int32_t * & indacc, IVT * & numacc,
1159 			   int32_t * & sendindbuf, OVT * & sendnumbuf, int * & sdispls, int * sendcnt, int accnz, bool indexisvalue, PreAllocatedSPA<OVT> & SPA)
1160 {
1161     if(optbuf.totmax > 0)	// graph500 optimization enabled
1162 	{
1163 		if(A.spSeq->getnsplit() > 0)
1164 		{
1165 			// optbuf.{inds/nums/dspls} and sendcnt are all pre-allocated and only filled by dcsc_gespmv_threaded
1166 			generic_gespmv_threaded_setbuffers<SR> (*(A.spSeq), indacc, numacc, accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs);
1167 		}
1168 		else
1169 		{
1170 			generic_gespmv<SR> (*(A.spSeq), indacc, numacc, accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs, indexisvalue);
1171 		}
1172 		DeleteAll(indacc,numacc);
1173 	}
1174 	else
1175 	{
1176 		if(A.spSeq->getnsplit() > 0)
1177 		{
1178 			// sendindbuf/sendnumbuf/sdispls are all allocated and filled by dcsc_gespmv_threaded
1179 			int totalsent = generic_gespmv_threaded<SR> (*(A.spSeq), indacc, numacc, accnz, sendindbuf, sendnumbuf, sdispls, rowneighs, SPA);
1180 
1181 			DeleteAll(indacc, numacc);
1182 			for(int i=0; i<rowneighs-1; ++i)
1183 				sendcnt[i] = sdispls[i+1] - sdispls[i];
1184 			sendcnt[rowneighs-1] = totalsent - sdispls[rowneighs-1];
1185 		}
1186 		else
1187 		{
1188             // default SpMSpV
1189             std::vector< int32_t > indy;
1190             std::vector< OVT >  numy;
1191             generic_gespmv<SR>(*(A.spSeq), indacc, numacc, accnz, indy, numy, SPA);
1192 
1193             DeleteAll(indacc, numacc);
1194 
1195             int32_t bufsize = indy.size();	// as compact as possible
1196             sendindbuf = new int32_t[bufsize];
1197             sendnumbuf = new OVT[bufsize];
1198             int32_t perproc = A.getlocalrows() / rowneighs;
1199 
1200             int k = 0;	// index to buffer
1201             for(int i=0; i<rowneighs; ++i)
1202             {
1203                 int32_t end_this = (i==rowneighs-1) ? A.getlocalrows(): (i+1)*perproc;
1204                 while(k < bufsize && indy[k] < end_this)
1205                 {
1206                     sendindbuf[k] = indy[k] - i*perproc;
1207                     sendnumbuf[k] = numy[k];
1208                     ++sendcnt[i];
1209                     ++k;
1210                 }
1211             }
1212             sdispls = new int[rowneighs]();
1213             std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
1214 
1215 //#endif
1216 
1217 		}
1218 	}
1219 
1220 }
1221 
1222 
1223 
1224 // non threaded
1225 template <typename SR, typename IU, typename OVT>
1226 void MergeContributions(int*  listSizes, std::vector<int32_t *> & indsvec, std::vector<OVT *> & numsvec, std::vector<IU>& mergedind, std::vector<OVT>& mergednum)
1227 {
1228 
1229     int nlists = indsvec.size();
1230     // this condition is checked in the caller SpMV function.
1231     // I am still putting it here for completeness
1232     if(nlists == 1)
1233     {
1234         // simply copy data
1235         int veclen = listSizes[0];
1236         mergedind.resize(veclen);
1237         mergednum.resize(veclen);
1238         for(int i=0; i<veclen; i++)
1239         {
1240             mergedind[i] = indsvec[0][i];
1241             mergednum[i] = numsvec[0][i];
1242         }
1243         return;
1244     }
1245 
1246     int32_t hsize = 0;
1247     int32_t inf = std::numeric_limits<int32_t>::min();
1248     int32_t sup = std::numeric_limits<int32_t>::max();
1249     KNHeap< int32_t, int32_t > sHeap(sup, inf);
1250     int * processed = new int[nlists]();
1251     for(int i=0; i<nlists; ++i)
1252     {
1253         if(listSizes[i] > 0)
1254         {
1255             // key, list_id
1256             sHeap.insert(indsvec[i][0], i);
1257             ++hsize;
1258         }
1259     }
1260     int32_t key, locv;
1261     if(hsize > 0)
1262     {
1263         sHeap.deleteMin(&key, &locv);
1264         mergedind.push_back( static_cast<IU>(key));
1265         mergednum.push_back(numsvec[locv][0]);	// nothing is processed yet
1266 
1267         if( (++(processed[locv])) < listSizes[locv] )
1268             sHeap.insert(indsvec[locv][processed[locv]], locv);
1269         else
1270             --hsize;
1271     }
1272     while(hsize > 0)
1273     {
1274         sHeap.deleteMin(&key, &locv);
1275         if(mergedind.back() == static_cast<IU>(key))
1276         {
1277             mergednum.back() = SR::add(mergednum.back(), numsvec[locv][processed[locv]]);
1278             // ABAB: Benchmark actually allows us to be non-deterministic in terms of parent selection
1279             // We can just skip this addition operator (if it's a max/min select)
1280         }
1281         else
1282         {
1283             mergedind.push_back(static_cast<IU>(key));
1284             mergednum.push_back(numsvec[locv][processed[locv]]);
1285         }
1286 
1287         if( (++(processed[locv])) < listSizes[locv] )
1288             sHeap.insert(indsvec[locv][processed[locv]], locv);
1289         else
1290             --hsize;
1291     }
1292     DeleteAll(processed);
1293 }
1294 
1295 
1296 
1297 template <typename SR, typename IU, typename OVT>
1298 void MergeContributions_threaded(int * & listSizes, std::vector<int32_t *> & indsvec, std::vector<OVT *> & numsvec, std::vector<IU> & mergedind, std::vector<OVT> & mergednum, IU maxindex)
1299 {
1300 
1301     int nlists = indsvec.size();
1302     // this condition is checked in the caller SpMV function.
1303     // I am still putting it here for completeness
1304     if(nlists == 1)
1305     {
1306         // simply copy data
1307         int veclen = listSizes[0];
1308         mergedind.resize(veclen);
1309         mergednum.resize(veclen);
1310 
1311 #ifdef THREADED
1312 #pragma omp parallel for
1313 #endif
1314         for(int i=0; i<veclen; i++)
1315         {
1316             mergedind[i] = indsvec[0][i];
1317             mergednum[i] = numsvec[0][i];
1318         }
1319         return;
1320     }
1321 
1322     int nthreads=1;
1323 #ifdef THREADED
1324 #pragma omp parallel
1325     {
1326         nthreads = omp_get_num_threads();
1327     }
1328 #endif
1329     int nsplits = 4*nthreads; // oversplit for load balance
1330     nsplits = std::min(nsplits, (int)maxindex);
1331     std::vector< std::vector<int32_t> > splitters(nlists);
1332     for(int k=0; k< nlists; k++)
1333     {
1334         splitters[k].resize(nsplits+1);
1335         splitters[k][0] = static_cast<int32_t>(0);
1336 #pragma omp parallel for
1337         for(int i=1; i< nsplits; i++)
1338         {
1339             IU cur_idx = i * (maxindex/nsplits);
1340             auto it = std::lower_bound (indsvec[k], indsvec[k] + listSizes[k], cur_idx);
1341             splitters[k][i] = (int32_t) (it - indsvec[k]);
1342         }
1343         splitters[k][nsplits] = listSizes[k];
1344     }
1345 
1346     // ------ perform merge in parallel ------
1347     std::vector<std::vector<IU>> indsBuf(nsplits);
1348     std::vector<std::vector<OVT>> numsBuf(nsplits);
1349     //TODO: allocate these vectors here before calling MergeContributions
1350 #pragma omp parallel for schedule(dynamic)
1351     for(int i=0; i< nsplits; i++)
1352     {
1353         std::vector<int32_t *> tIndsVec(nlists);
1354         std::vector<OVT *> tNumsVec(nlists);
1355         std::vector<int> tLengths(nlists);
1356         for(int j=0; j< nlists; ++j)
1357         {
1358             tIndsVec[j] = indsvec[j] + splitters[j][i];
1359             tNumsVec[j] = numsvec[j] + splitters[j][i];
1360             tLengths[j]= splitters[j][i+1] - splitters[j][i];
1361 
1362         }
1363         MergeContributions<SR>(tLengths.data(), tIndsVec, tNumsVec, indsBuf[i], numsBuf[i]);
1364     }
1365 
1366     // ------ concatenate merged tuples processed by threads ------
1367     std::vector<IU> tdisp(nsplits+1);
1368     tdisp[0] = 0;
1369     for(int i=0; i<nsplits; ++i)
1370     {
1371         tdisp[i+1] = tdisp[i] + indsBuf[i].size();
1372     }
1373 
1374     mergedind.resize(tdisp[nsplits]);
1375     mergednum.resize(tdisp[nsplits]);
1376 
1377 
1378 #pragma omp parallel for schedule(dynamic)
1379     for(int i=0; i< nsplits; i++)
1380     {
1381         std::copy(indsBuf[i].data() , indsBuf[i].data() + indsBuf[i].size(), mergedind.data() + tdisp[i]);
1382         std::copy(numsBuf[i].data() , numsBuf[i].data() + numsBuf[i].size(), mergednum.data() + tdisp[i]);
1383     }
1384 }
1385 
1386 
1387 /**
1388   * This version is the most flexible sparse matrix X sparse vector [Used in KDT]
1389   * It accepts different types for the matrix (NUM), the input vector (IVT) and the output vector (OVT)
1390   * without relying on automatic type promotion
1391   * Input (x) and output (y) vectors can be ALIASED because y is not written until the algorithm is done with x.
1392   */
1393 template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
1394 void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y,
1395 			bool indexisvalue, OptBuf<int32_t, OVT > & optbuf, PreAllocatedSPA<OVT> & SPA)
1396 {
1397 	CheckSpMVCompliance(A,x);
1398 	optbuf.MarkEmpty();
1399     y.glen = A.getnrow(); // in case it is not set already
1400 
1401 	MPI_Comm World = x.commGrid->GetWorld();
1402 	MPI_Comm ColWorld = x.commGrid->GetColWorld();
1403 	MPI_Comm RowWorld = x.commGrid->GetRowWorld();
1404 
1405 	int accnz;
1406 	int32_t trxlocnz;
1407 	IU lenuntil;
1408 	int32_t *trxinds, *indacc;
1409 	IVT *trxnums, *numacc;
1410 
1411 #ifdef TIMING
1412     double t0=MPI_Wtime();
1413 #endif
1414 
1415 	TransposeVector(World, x, trxlocnz, lenuntil, trxinds, trxnums, indexisvalue);
1416 
1417 #ifdef TIMING
1418     double t1=MPI_Wtime();
1419     cblas_transvectime += (t1-t0);
1420 #endif
1421 
1422     if(x.commGrid->GetGridRows() > 1)
1423     {
1424         AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, indacc, numacc, accnz, indexisvalue);   // trxinds/trxnums deallocated, indacc/numacc allocated, accnz set
1425     }
1426     else
1427     {
1428         accnz = trxlocnz;
1429         indacc = trxinds;   // aliasing ptr
1430         numacc = trxnums;   // aliasing ptr
1431     }
1432 
1433 	int rowneighs;
1434 	MPI_Comm_size(RowWorld, &rowneighs);
1435 	int * sendcnt = new int[rowneighs]();
1436 	int32_t * sendindbuf;
1437 	OVT * sendnumbuf;
1438 	int * sdispls;
1439 
1440 #ifdef TIMING
1441     double t2=MPI_Wtime();
1442 #endif
1443 
1444 	LocalSpMV<SR>(A, rowneighs, optbuf, indacc, numacc, sendindbuf, sendnumbuf, sdispls, sendcnt, accnz, indexisvalue, SPA);	// indacc/numacc deallocated, sendindbuf/sendnumbuf/sdispls allocated
1445 
1446 #ifdef TIMING
1447     double t3=MPI_Wtime();
1448     cblas_localspmvtime += (t3-t2);
1449 #endif
1450 
1451 
1452     if(x.commGrid->GetGridCols() == 1)
1453     {
1454         y.ind.resize(sendcnt[0]);
1455         y.num.resize(sendcnt[0]);
1456 
1457 
1458 		if(optbuf.totmax > 0 )	// graph500 optimization enabled
1459 		{
1460 #ifdef THREADED
1461 #pragma omp parallel for
1462 #endif
1463 			for(int i=0; i<sendcnt[0]; i++)
1464 			{
1465 				y.ind[i] = optbuf.inds[i];
1466 				y.num[i] = optbuf.nums[i];
1467 			}
1468 		}
1469 		else
1470 		{
1471 #ifdef THREADED
1472 #pragma omp parallel for
1473 #endif
1474 			for(int i=0; i<sendcnt[0]; i++)
1475 			{
1476 				y.ind[i] = sendindbuf[i];
1477 				y.num[i] = sendnumbuf[i];
1478 			}
1479 			DeleteAll(sendindbuf, sendnumbuf,sdispls);
1480 		}
1481 		delete [] sendcnt;
1482         return;
1483     }
1484 	int * rdispls = new int[rowneighs];
1485 	int * recvcnt = new int[rowneighs];
1486 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld);       // share the request counts
1487 
1488 	// receive displacements are exact whereas send displacements have slack
1489 	rdispls[0] = 0;
1490 	for(int i=0; i<rowneighs-1; ++i)
1491 	{
1492 		rdispls[i+1] = rdispls[i] + recvcnt[i];
1493 	}
1494 
1495 	int totrecv = std::accumulate(recvcnt,recvcnt+rowneighs,0);
1496 	int32_t * recvindbuf = new int32_t[totrecv];
1497 	OVT * recvnumbuf = new OVT[totrecv];
1498 
1499 #ifdef TIMING
1500 	double t4=MPI_Wtime();
1501 #endif
1502 	if(optbuf.totmax > 0 )	// graph500 optimization enabled
1503 	{
1504 		MPI_Alltoallv(optbuf.inds, sendcnt, optbuf.dspls, MPIType<int32_t>(), recvindbuf, recvcnt, rdispls, MPIType<int32_t>(), RowWorld);
1505 		MPI_Alltoallv(optbuf.nums, sendcnt, optbuf.dspls, MPIType<OVT>(), recvnumbuf, recvcnt, rdispls, MPIType<OVT>(), RowWorld);
1506 		delete [] sendcnt;
1507 	}
1508 	else
1509     {
1510 		MPI_Alltoallv(sendindbuf, sendcnt, sdispls, MPIType<int32_t>(), recvindbuf, recvcnt, rdispls, MPIType<int32_t>(), RowWorld);
1511 		MPI_Alltoallv(sendnumbuf, sendcnt, sdispls, MPIType<OVT>(), recvnumbuf, recvcnt, rdispls, MPIType<OVT>(), RowWorld);
1512 		DeleteAll(sendindbuf, sendnumbuf, sendcnt, sdispls);
1513 	}
1514 #ifdef TIMING
1515 	double t5=MPI_Wtime();
1516 	cblas_alltoalltime += (t5-t4);
1517 #endif
1518 
1519 #ifdef TIMING
1520     double t6=MPI_Wtime();
1521 #endif
1522     //MergeContributions<SR>(y,recvcnt, rdispls, recvindbuf, recvnumbuf, rowneighs);
1523     // free memory of y, in case it was aliased
1524     std::vector<IU>().swap(y.ind);
1525     std::vector<OVT>().swap(y.num);
1526 
1527     std::vector<int32_t *> indsvec(rowneighs);
1528     std::vector<OVT *> numsvec(rowneighs);
1529 
1530 #ifdef THREADED
1531 #pragma omp parallel for
1532 #endif
1533     for(int i=0; i<rowneighs; i++)
1534     {
1535         indsvec[i] = recvindbuf+rdispls[i];
1536         numsvec[i] = recvnumbuf+rdispls[i];
1537     }
1538 #ifdef THREADED
1539     MergeContributions_threaded<SR>(recvcnt, indsvec, numsvec, y.ind, y.num, y.MyLocLength());
1540 #else
1541     MergeContributions<SR>(recvcnt, indsvec, numsvec, y.ind, y.num);
1542 #endif
1543 
1544     DeleteAll(recvcnt, rdispls,recvindbuf, recvnumbuf);
1545 #ifdef TIMING
1546     double t7=MPI_Wtime();
1547     cblas_mergeconttime += (t7-t6);
1548 #endif
1549 
1550 }
1551 
1552 
1553 template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
1554 void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y, bool indexisvalue, PreAllocatedSPA<OVT> & SPA)
1555 {
1556 	OptBuf< int32_t, OVT > optbuf = OptBuf< int32_t,OVT >();
1557 	SpMV<SR>(A, x, y, indexisvalue, optbuf, SPA);
1558 }
1559 
1560 template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
1561 void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y, bool indexisvalue)
1562 {
1563     OptBuf< int32_t, OVT > optbuf = OptBuf< int32_t,OVT >();
1564     PreAllocatedSPA<OVT> SPA;
1565     SpMV<SR>(A, x, y, indexisvalue, optbuf, SPA);
1566 }
1567 
1568 template <typename SR, typename IVT, typename OVT, typename IU, typename NUM, typename UDER>
1569 void SpMV (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IVT> & x, FullyDistSpVec<IU,OVT> & y, bool indexisvalue, OptBuf<int32_t, OVT > & optbuf)
1570 {
1571 	PreAllocatedSPA<OVT> SPA;
1572 	SpMV<SR>(A, x, y, indexisvalue, optbuf, SPA);
1573 }
1574 
1575 
1576 /**
1577  * Automatic type promotion is ONLY done here; all the callee functions (in Friends.h and below) are instantiated with the promoted type.
1578  * If indexisvalue = true, then we do not need to transfer values for x (this happens in BFS iterations with boolean matrices and integer rhs vectors).
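 *
 * A hedged usage sketch (the matrix/vector names are assumptions): with a boolean matrix and an
 * int64_t index vector, the result holds promote_trait<NUM,IU>::T_promote values (int64_t here,
 * assuming the usual promotion rules):
 *
 *   OptBuf<int32_t,int64_t> optbuf;
 *   FullyDistSpVec<int64_t,int64_t> parents =
 *       SpMV< SelectMaxSRing<bool,int64_t> >(Abool, frontier, true, optbuf);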
1579  **/
1580 template <typename SR, typename IU, typename NUM, typename UDER>
1581 FullyDistSpVec<IU,typename promote_trait<NUM,IU>::T_promote>  SpMV
1582 (const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,IU> & x, bool indexisvalue, OptBuf<int32_t, typename promote_trait<NUM,IU>::T_promote > & optbuf)
1583 {
1584 	typedef typename promote_trait<NUM,IU>::T_promote T_promote;
1585 	FullyDistSpVec<IU, T_promote> y ( x.getcommgrid(), A.getnrow());	// identity doesn't matter for sparse vectors
1586 	SpMV<SR>(A, x, y, indexisvalue, optbuf);
1587 	return y;
1588 }
1589 
1590 /**
1591  * Parallel dense SpMV
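 *
 * A minimal hedged sketch (setup names are assumptions), computing y = A x over a plus-times
 * semiring (PlusTimesSRing here, assumed available); the result type is promote_trait<NUM,NUV>::T_promote:
 *
 *   SpParMat<int64_t, double, SpDCCols<int64_t,double> > A(fullWorld);
 *   FullyDistVec<int64_t, double> x(fullWorld, A.getncol(), 1.0);
 *   FullyDistVec<int64_t, double> y = SpMV< PlusTimesSRing<double,double> >(A, x);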
1592  **/
1593 template <typename SR, typename IU, typename NUM, typename NUV, typename UDER>
1594 FullyDistVec<IU,typename promote_trait<NUM,NUV>::T_promote>  SpMV
1595 	(const SpParMat<IU,NUM,UDER> & A, const FullyDistVec<IU,NUV> & x )
1596 {
1597 	typedef typename promote_trait<NUM,NUV>::T_promote T_promote;
1598 	CheckSpMVCompliance(A, x);
1599 
1600 	MPI_Comm World = x.commGrid->GetWorld();
1601 	MPI_Comm ColWorld = x.commGrid->GetColWorld();
1602 	MPI_Comm RowWorld = x.commGrid->GetRowWorld();
1603 
1604 	int xsize = (int) x.LocArrSize();
1605 	int trxsize = 0;
1606 
1607 	int diagneigh = x.commGrid->GetComplementRank();
1608 	MPI_Status status;
1609 	MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
1610 
1611 	NUV * trxnums = new NUV[trxsize];
1612 	MPI_Sendrecv(const_cast<NUV*>(SpHelper::p2a(x.arr)), xsize, MPIType<NUV>(), diagneigh, TRX, trxnums, trxsize, MPIType<NUV>(), diagneigh, TRX, World, &status);
1613 
1614 	int colneighs, colrank;
1615 	MPI_Comm_size(ColWorld, &colneighs);
1616 	MPI_Comm_rank(ColWorld, &colrank);
1617 	int * colsize = new int[colneighs];
1618 	colsize[colrank] = trxsize;
1619 	MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
1620 	int * dpls = new int[colneighs]();	// displacements (zero-initialized)
1621 	std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
1622 	int accsize = std::accumulate(colsize, colsize+colneighs, 0);
1623 	NUV * numacc = new NUV[accsize];
1624 
1625 	MPI_Allgatherv(trxnums, trxsize, MPIType<NUV>(), numacc, colsize, dpls, MPIType<NUV>(), ColWorld);
1626 	delete [] trxnums;
1627 
1628 	// serial SpMV with dense vector
1629 	T_promote id = SR::id();
1630 	IU ysize = A.getlocalrows();
1631 	T_promote * localy = new T_promote[ysize];
1632 	std::fill_n(localy, ysize, id);
1633 
1634 #ifdef THREADED
1635 	dcsc_gespmv_threaded<SR>(*(A.spSeq), numacc, localy);
1636 #else
1637 	dcsc_gespmv<SR>(*(A.spSeq), numacc, localy);
1638 #endif
1639 
1640 
1641 	DeleteAll(numacc,colsize, dpls);
1642 
1643 	// FullyDistVec<IT,NT>(shared_ptr<CommGrid> grid, IT globallen, NT initval, NT id)
1644 	FullyDistVec<IU, T_promote> y ( x.commGrid, A.getnrow(), id);
1645 
1646 	int rowneighs;
1647 	MPI_Comm_size(RowWorld, &rowneighs);
1648 
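	// Combine partial dense results across the processor row: the segment of localy owned by
	// processor i (between RowLenUntil(i) and RowLenUntil(i+1)) is reduced with SR::add onto
	// that processor's y.arr, with i acting as the MPI_Reduce root.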
1649 	IU begptr, endptr;
1650 	for(int i=0; i< rowneighs; ++i)
1651 	{
1652 		begptr = y.RowLenUntil(i);
1653 		if(i == rowneighs-1)
1654 		{
1655 			endptr = ysize;
1656 		}
1657 		else
1658 		{
1659 			endptr = y.RowLenUntil(i+1);
1660 		}
1661 		MPI_Reduce(localy+begptr, SpHelper::p2a(y.arr), endptr-begptr, MPIType<T_promote>(), SR::mpi_op(), i, RowWorld);
1662 	}
1663 	delete [] localy;
1664 	return y;
1665 }
1666 
1667 
1668 /**
1669  * \TODO: Old version that is no longer considered optimal.
1670  * Kept for legacy purposes;
1671  * to be removed when the other overloads are fully tested.
1672  **/
1673 template <typename SR, typename IU, typename NUM, typename NUV, typename UDER>
1674 FullyDistSpVec<IU,typename promote_trait<NUM,NUV>::T_promote>  SpMV
1675 	(const SpParMat<IU,NUM,UDER> & A, const FullyDistSpVec<IU,NUV> & x)
1676 {
1677 	typedef typename promote_trait<NUM,NUV>::T_promote T_promote;
1678 	CheckSpMVCompliance(A, x);
1679 
1680 	MPI_Comm World = x.commGrid->GetWorld();
1681 	MPI_Comm ColWorld = x.commGrid->GetColWorld();
1682 	MPI_Comm RowWorld = x.commGrid->GetRowWorld();
1683 
1684 	int xlocnz = (int) x.getlocnnz();
1685 	int trxlocnz = 0;
1686 	int roffst = x.RowLenUntil();
1687 	int offset;
1688 
1689 	int diagneigh = x.commGrid->GetComplementRank();
1690 	MPI_Status status;
1691 	MPI_Sendrecv(&xlocnz, 1, MPI_INT, diagneigh, TRX, &trxlocnz, 1, MPI_INT, diagneigh, TRX, World, &status);
1692 	MPI_Sendrecv(&roffst, 1, MPI_INT, diagneigh, TROST, &offset, 1, MPI_INT, diagneigh, TROST, World, &status);
1693 
1694 	IU * trxinds = new IU[trxlocnz];
1695 	NUV * trxnums = new NUV[trxlocnz];
1696 	MPI_Sendrecv(const_cast<IU*>(SpHelper::p2a(x.ind)), xlocnz, MPIType<IU>(), diagneigh, TRX, trxinds, trxlocnz, MPIType<IU>(), diagneigh, TRX, World, &status);
1697 	MPI_Sendrecv(const_cast<NUV*>(SpHelper::p2a(x.num)), xlocnz, MPIType<NUV>(), diagneigh, TRX, trxnums, trxlocnz, MPIType<NUV>(), diagneigh, TRX, World, &status);
1698 	std::transform(trxinds, trxinds+trxlocnz, trxinds, [offset](IU loc) { return loc + offset; }); // fullydist indexing (n pieces) -> matrix indexing (sqrt(p) pieces)
1699 
1700 	int colneighs, colrank;
1701 	MPI_Comm_size(ColWorld, &colneighs);
1702 	MPI_Comm_rank(ColWorld, &colrank);
1703 	int * colnz = new int[colneighs];
1704 	colnz[colrank] = trxlocnz;
1705 	MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
1706 	int * dpls = new int[colneighs]();	// displacements (zero-initialized)
1707 	std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
1708 	int accnz = std::accumulate(colnz, colnz+colneighs, 0);
1709 	IU * indacc = new IU[accnz];
1710 	NUV * numacc = new NUV[accnz];
1711 
1712 	// ABAB: Future issues here, colnz is of type int (MPI limitation)
1713 	// What if the aggregate vector size along the processor row/column is not 32-bit addressable?
1714 	MPI_Allgatherv(trxinds, trxlocnz, MPIType<IU>(), indacc, colnz, dpls, MPIType<IU>(), ColWorld);
1715 	MPI_Allgatherv(trxnums, trxlocnz, MPIType<NUV>(), numacc, colnz, dpls, MPIType<NUV>(), ColWorld);
1716 	DeleteAll(trxinds, trxnums);
1717 
1718 	// serial SpMV with sparse vector
1719 	std::vector< int32_t > indy;
1720 	std::vector< T_promote >  numy;
1721 
1722 	int32_t * tmpindacc = new int32_t[accnz];
1723 	for(int i=0; i< accnz; ++i) tmpindacc[i] = indacc[i];
1724 	delete [] indacc;
1725 
1726 	dcsc_gespmv<SR>(*(A.spSeq), tmpindacc, numacc, accnz, indy, numy);	// actual multiplication
1727 
1728 	DeleteAll(tmpindacc, numacc);
1729 	DeleteAll(colnz, dpls);
1730 
1731 	FullyDistSpVec<IU, T_promote> y ( x.commGrid, A.getnrow());	// identity doesn't matter for sparse vectors
1732 	IU yintlen = y.MyRowLength();
1733 
1734 	int rowneighs;
1735 	MPI_Comm_size(RowWorld,&rowneighs);
1736 	std::vector< std::vector<IU> > sendind(rowneighs);
1737 	std::vector< std::vector<T_promote> > sendnum(rowneighs);
1738 	typename std::vector<int32_t>::size_type outnz = indy.size();
1739 	for(typename std::vector<IU>::size_type i=0; i< outnz; ++i)
1740 	{
1741 		IU locind;
1742 		int rown = y.OwnerWithinRow(yintlen, static_cast<IU>(indy[i]), locind);
1743 		sendind[rown].push_back(locind);
1744 		sendnum[rown].push_back(numy[i]);
1745 	}
1746 
1747 	IU * sendindbuf = new IU[outnz];
1748 	T_promote * sendnumbuf = new T_promote[outnz];
1749 	int * sendcnt = new int[rowneighs];
1750 	int * sdispls = new int[rowneighs];
1751 	for(int i=0; i<rowneighs; ++i)
1752 		sendcnt[i] = sendind[i].size();
1753 
1754 	int * rdispls = new int[rowneighs];
1755 	int * recvcnt = new int[rowneighs];
1756 	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld);       // share the request counts
1757 
1758 	sdispls[0] = 0;
1759 	rdispls[0] = 0;
1760 	for(int i=0; i<rowneighs-1; ++i)
1761 	{
1762 		sdispls[i+1] = sdispls[i] + sendcnt[i];
1763 		rdispls[i+1] = rdispls[i] + recvcnt[i];
1764 	}
1765 	int totrecv = std::accumulate(recvcnt,recvcnt+rowneighs,0);
1766 	IU * recvindbuf = new IU[totrecv];
1767 	T_promote * recvnumbuf = new T_promote[totrecv];
1768 
1769 	for(int i=0; i<rowneighs; ++i)
1770 	{
1771     std::copy(sendind[i].begin(), sendind[i].end(), sendindbuf+sdispls[i]);
1772 		std::vector<IU>().swap(sendind[i]);
1773 	}
1774 	for(int i=0; i<rowneighs; ++i)
1775 	{
1776     std::copy(sendnum[i].begin(), sendnum[i].end(), sendnumbuf+sdispls[i]);
1777 		std::vector<T_promote>().swap(sendnum[i]);
1778 	}
1779 	MPI_Alltoallv(sendindbuf, sendcnt, sdispls, MPIType<IU>(), recvindbuf, recvcnt, rdispls, MPIType<IU>(), RowWorld);
1780 	MPI_Alltoallv(sendnumbuf, sendcnt, sdispls, MPIType<T_promote>(), recvnumbuf, recvcnt, rdispls, MPIType<T_promote>(), RowWorld);
1781 
1782 	DeleteAll(sendindbuf, sendnumbuf);
1783 	DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
1784 
1785 	// define a SPA-like data structure
1786 	IU ysize = y.MyLocLength();
1787 	T_promote * localy = new T_promote[ysize];
1788 	bool * isthere = new bool[ysize];
1789 	std::vector<IU> nzinds;	// nonzero indices
1790   std::fill_n(isthere, ysize, false);
1791 
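	// SPA-style accumulation: the first time an index is seen, its value initializes localy and the
	// index is recorded in nzinds; later occurrences of the same index are combined with SR::add.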
1792 	for(int i=0; i< totrecv; ++i)
1793 	{
1794 		if(!isthere[recvindbuf[i]])
1795 		{
1796 			localy[recvindbuf[i]] = recvnumbuf[i];	// initial assignment
1797 			nzinds.push_back(recvindbuf[i]);
1798 			isthere[recvindbuf[i]] = true;
1799 		}
1800 		else
1801 		{
1802 			localy[recvindbuf[i]] = SR::add(localy[recvindbuf[i]], recvnumbuf[i]);
1803 		}
1804 	}
1805 	DeleteAll(isthere, recvindbuf, recvnumbuf);
1806 	sort(nzinds.begin(), nzinds.end());
1807 	int nnzy = nzinds.size();
1808 	y.ind.resize(nnzy);
1809 	y.num.resize(nnzy);
1810 	for(int i=0; i< nnzy; ++i)
1811 	{
1812 		y.ind[i] = nzinds[i];
1813 		y.num[i] = localy[nzinds[i]];
1814 	}
1815 	delete [] localy;
1816 	return y;
1817 }
1818 
1819 
1820 template <typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
1821 SpParMat<IU,typename promote_trait<NU1,NU2>::T_promote,typename promote_trait<UDERA,UDERB>::T_promote> EWiseMult
1822 	(const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B , bool exclude)
1823 {
1824 	typedef typename promote_trait<NU1,NU2>::T_promote N_promote;
1825 	typedef typename promote_trait<UDERA,UDERB>::T_promote DER_promote;
1826 
1827 	if(*(A.commGrid) == *(B.commGrid))
1828 	{
1829 		DER_promote * result = new DER_promote( EWiseMult(*(A.spSeq),*(B.spSeq),exclude) );
1830 		return SpParMat<IU, N_promote, DER_promote> (result, A.commGrid);
1831 	}
1832 	else
1833 	{
1834 		std::cout << "Grids are not comparable for elementwise multiplication" << std::endl;
1835 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1836 		return SpParMat< IU,N_promote,DER_promote >();
1837 	}
1838 }
1839 
1840 template <typename RETT, typename RETDER, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB, typename _BinaryOperation>
1841 SpParMat<IU,RETT,RETDER> EWiseApply
1842 	(const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, bool notB, const NU2& defaultBVal)
1843 {
1844 	if(*(A.commGrid) == *(B.commGrid))
1845 	{
1846 		RETDER * result = new RETDER( EWiseApply<RETT>(*(A.spSeq),*(B.spSeq), __binary_op, notB, defaultBVal) );
1847 		return SpParMat<IU, RETT, RETDER> (result, A.commGrid);
1848 	}
1849 	else
1850 	{
1851 		std::cout << "Grids are not comparable for elementwise apply" << std::endl;
1852 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1853 		return SpParMat< IU,RETT,RETDER >();
1854 	}
1855 }
1856 
1857 template <typename RETT, typename RETDER, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB, typename _BinaryOperation, typename _BinaryPredicate>
1858 SpParMat<IU,RETT,RETDER> EWiseApply
1859 	(const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, _BinaryPredicate do_op, bool allowANulls, bool allowBNulls, const NU1& ANullVal, const NU2& BNullVal, const bool allowIntersect, const bool useExtendedBinOp)
1860 {
1861 	if(*(A.commGrid) == *(B.commGrid))
1862 	{
1863 		RETDER * result = new RETDER( EWiseApply<RETT>(*(A.spSeq),*(B.spSeq), __binary_op, do_op, allowANulls, allowBNulls, ANullVal, BNullVal, allowIntersect) );
1864 		return SpParMat<IU, RETT, RETDER> (result, A.commGrid);
1865 	}
1866 	else
1867 	{
1868 		std::cout << "Grids are not comparable for elementwise apply" << std::endl;
1869 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1870 		return SpParMat< IU,RETT,RETDER >();
1871 	}
1872 }
1873 
1874 // plain adapter
1875 template <typename RETT, typename RETDER, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB, typename _BinaryOperation, typename _BinaryPredicate>
1876 SpParMat<IU,RETT,RETDER>
1877 EWiseApply (const SpParMat<IU,NU1,UDERA> & A, const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, _BinaryPredicate do_op, bool allowANulls, bool allowBNulls, const NU1& ANullVal, const NU2& BNullVal, const bool allowIntersect = true)
1878 {
1879 	return EWiseApply<RETT, RETDER>(A, B,
1880 				EWiseExtToPlainAdapter<RETT, NU1, NU2, _BinaryOperation>(__binary_op),
1881 				EWiseExtToPlainAdapter<bool, NU1, NU2, _BinaryPredicate>(do_op),
1882 				allowANulls, allowBNulls, ANullVal, BNullVal, allowIntersect, true);
1883 }
1884 // end adapter
1885 
1886 /**
1887  * if exclude is true, then we prune all entries W[i] != zero from V
1888  * if exclude is false, then we perform a proper elementwise multiplication
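 *
 * A hedged usage sketch (fullWorld, n and the value types are assumptions): with exclude = false this
 * keeps the nonzeros of V whose dense counterpart is nonzero and multiplies them elementwise:
 *
 *   FullyDistSpVec<int64_t,double> V(fullWorld, n);
 *   FullyDistVec<int64_t,double>   W(fullWorld, n, 1.0);
 *   FullyDistSpVec<int64_t,double> P = EWiseMult(V, W, false, 0.0);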
1889 **/
1890 template <typename IU, typename NU1, typename NU2>
1891 FullyDistSpVec<IU,typename promote_trait<NU1,NU2>::T_promote> EWiseMult
1892 	(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , bool exclude, NU2 zero)
1893 {
1894 	typedef typename promote_trait<NU1,NU2>::T_promote T_promote;
1895 
1896 	if(*(V.commGrid) == *(W.commGrid))
1897 	{
1898 		FullyDistSpVec< IU, T_promote> Product(V.commGrid);
1899 		if(V.glen != W.glen)
1900 		{
1901 			std::cerr << "Vector dimensions don't match for EWiseMult\n";
1902 			MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
1903 		}
1904 		else
1905 		{
1906 			Product.glen = V.glen;
1907 			IU size= V.getlocnnz();
1908 			if(exclude)
1909 			{
1910 				#if defined(_OPENMP) && defined(CBLAS_EXPERIMENTAL)	// not faster than serial
1911 				int actual_splits = cblas_splits * 1;	// 1 is the parallel slackness
1912         std::vector <IU> tlosizes (actual_splits, 0);
1913         std::vector < std::vector<IU> > tlinds(actual_splits);
1914         std::vector < std::vector<T_promote> > tlnums(actual_splits);
1915 				IU tlsize = size / actual_splits;
1916 				#pragma omp parallel for //schedule(dynamic, 1)
1917 				for(IU t = 0; t < actual_splits; ++t)
1918 				{
1919 					IU tlbegin = t*tlsize;
1920 					IU tlend = (t==actual_splits-1)? size : (t+1)*tlsize;
1921 					for(IU i=tlbegin; i<tlend; ++i)
1922 					{
1923 						if(W.arr[V.ind[i]] == zero) 	// keep only those
1924 						{
1925 							tlinds[t].push_back(V.ind[i]);
1926 							tlnums[t].push_back(V.num[i]);
1927 							tlosizes[t]++;
1928 						}
1929 					}
1930 				}
1931         std::vector<IU> prefix_sum(actual_splits+1,0);
1932         std::partial_sum(tlosizes.begin(), tlosizes.end(), prefix_sum.begin()+1);
1933 				Product.ind.resize(prefix_sum[actual_splits]);
1934 				Product.num.resize(prefix_sum[actual_splits]);
1935 
1936 				#pragma omp parallel for //schedule(dynamic, 1)
1937 				for(IU t=0; t< actual_splits; ++t)
1938 				{
1939           std::copy(tlinds[t].begin(), tlinds[t].end(), Product.ind.begin()+prefix_sum[t]);
1940           std::copy(tlnums[t].begin(), tlnums[t].end(), Product.num.begin()+prefix_sum[t]);
1941 				}
1942 				#else
1943 				for(IU i=0; i<size; ++i)
1944 				{
1945 					if(W.arr[V.ind[i]] == zero)     // keep only those
1946 					{
1947 						Product.ind.push_back(V.ind[i]);
1948 						Product.num.push_back(V.num[i]);
1949 					}
1950 				}
1951 				#endif
1952 			}
1953 			else
1954 			{
1955 				for(IU i=0; i<size; ++i)
1956 				{
1957 					if(W.arr[V.ind[i]] != zero) 	// keep only those
1958 					{
1959 						Product.ind.push_back(V.ind[i]);
1960 						Product.num.push_back(V.num[i] * W.arr[V.ind[i]]);
1961 					}
1962 				}
1963 			}
1964 		}
1965 		return Product;
1966 	}
1967 	else
1968 	{
1969 		std::cout << "Grids are not comparable for elementwise multiplication" << std::endl;
1970 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1971 		return FullyDistSpVec< IU,T_promote>();
1972 	}
1973 }
1974 
1975 
1976 /**
1977  Threaded EWiseApply. Only called internally from EWiseApply.
1978 **/
1979 template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
1980 FullyDistSpVec<IU,RET> EWiseApply_threaded
1981 	(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero, const bool useExtendedBinOp)
1982 {
1983 	typedef RET T_promote; //typedef typename promote_trait<NU1,NU2>::T_promote T_promote;
1984 	if(*(V.commGrid) == *(W.commGrid))
1985 	{
1986 		FullyDistSpVec< IU, T_promote> Product(V.commGrid);
1987 		if(V.TotalLength() != W.TotalLength())
1988 		{
1989 			std::ostringstream outs;
1990 			outs << "Vector dimensions don't match (" << V.TotalLength() << " vs " << W.TotalLength() << ") for EWiseApply (short version)\n";
1991 			SpParHelper::Print(outs.str());
1992 			MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
1993 		}
1994 		else
1995 		{
1996             int nthreads=1;
1997 #ifdef _OPENMP
1998 #pragma omp parallel
1999             {
2000                 nthreads = omp_get_num_threads();
2001             }
2002 #endif
2003 
2004 			Product.glen = V.glen;
2005 			IU size= W.LocArrSize();
2006 			IU spsize = V.getlocnnz();
2007 
2008             // temporary result vectors per thread
2009             std::vector<std::vector<IU>> tProductInd(nthreads);
2010             std::vector<std::vector<T_promote>> tProductVal(nthreads);
2011             IU perthread; //chunk of tProductInd or tProductVal allocated to each thread
2012             if (allowVNulls)
2013                 perthread = size/nthreads;
2014             else
2015                 perthread = spsize/nthreads;
2016 
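            // Partitioning strategy: when allowVNulls is set we split the dense vector [0, size) evenly
            // across threads and each thread scans the matching slice of the sparse vector; otherwise we
            // split the nonzeros of V directly. Per-thread buffers avoid synchronization inside the
            // parallel region and are concatenated into Product afterwards.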
2017 #ifdef _OPENMP
2018 #pragma omp parallel
2019 #endif
2020             {
2021                 int curthread = 0;
2022 #ifdef _OPENMP
2023                 curthread = omp_get_thread_num();
2024 #endif
2025                 IU tStartIdx = perthread * curthread;
2026                 IU tNextIdx = perthread * (curthread+1);
2027 
2028                 if (allowVNulls)
2029                 {
2030                     if(curthread == nthreads-1) tNextIdx = size;
2031 
2032                     // get sparse part for the current thread
2033                     auto it = std::lower_bound (V.ind.begin(), V.ind.end(), tStartIdx);
2034                     IU tSpIdx = (IU) std::distance(V.ind.begin(), it);
2035 
2036                     // iterate over the dense vector
2037                     for(IU tIdx=tStartIdx; tIdx < tNextIdx; ++tIdx)
2038                     {
2039                         if(tSpIdx < spsize && V.ind[tSpIdx] < tNextIdx && V.ind[tSpIdx] == tIdx)
2040                         {
2041                             if (_doOp(V.num[tSpIdx], W.arr[tIdx], false, false))
2042                             {
2043                                 tProductInd[curthread].push_back(tIdx);
2044                                 tProductVal[curthread].push_back (_binary_op(V.num[tSpIdx], W.arr[tIdx], false, false));
2045                             }
2046                             tSpIdx++;
2047                         }
2048                         else
2049                         {
2050                             if (_doOp(Vzero, W.arr[tIdx], true, false))
2051                             {
2052                                 tProductInd[curthread].push_back(tIdx);
2053                                 tProductVal[curthread].push_back (_binary_op(Vzero, W.arr[tIdx], true, false));
2054                             }
2055                         }
2056                     }
2057                 }
2058                 else // iterate over the sparse vector
2059                 {
2060                     if(curthread == nthreads-1) tNextIdx = spsize;
2061                     for(IU tSpIdx=tStartIdx; tSpIdx < tNextIdx; ++tSpIdx)
2062                     {
2063                         if (_doOp(V.num[tSpIdx], W.arr[V.ind[tSpIdx]], false, false))
2064                         {
2065 
2066                             tProductInd[curthread].push_back( V.ind[tSpIdx]);
2067                             tProductVal[curthread].push_back (_binary_op(V.num[tSpIdx], W.arr[V.ind[tSpIdx]], false, false));
2068                         }
2069                     }
2070                 }
2071             }
2072 
2073             std::vector<IU> tdisp(nthreads+1);
2074             tdisp[0] = 0;
2075             for(int i=0; i<nthreads; ++i)
2076             {
2077                 tdisp[i+1] = tdisp[i] + tProductInd[i].size();
2078             }
2079 
2080             // copy results from temporary vectors
2081             Product.ind.resize(tdisp[nthreads]);
2082             Product.num.resize(tdisp[nthreads]);
2083 
2084 #ifdef _OPENMP
2085 #pragma omp parallel
2086 #endif
2087             {
2088                 int curthread = 0;
2089 #ifdef _OPENMP
2090                 curthread = omp_get_thread_num();
2091 #endif
2092                 std::copy(tProductInd[curthread].begin(), tProductInd[curthread].end(), Product.ind.data() + tdisp[curthread]);
2093                 std::copy(tProductVal[curthread].begin() , tProductVal[curthread].end(), Product.num.data() + tdisp[curthread]);
2094             }
2095 		}
2096 		return Product;
2097 	}
2098 	else
2099 	{
2100 		std::cout << "Grids are not comparable for EWiseApply" << std::endl;
2101 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2102 		return FullyDistSpVec< IU,T_promote>();
2103 	}
2104 }
2105 
2106 
2107 
2108 /**
2109  * Performs an arbitrary binary operation _binary_op on the corresponding elements of two vectors with the result stored in a return vector ret.
2110  * The binary operation is only performed if the binary predicate _doOp returns true for those elements. Otherwise the binary operation is not
2111  * performed and ret does not contain an element at that position.
2112  * More formally the operation is defined as:
2113  * if (_doOp(V[i], W[i]))
2114  *    ret[i] = _binary_op(V[i], W[i])
2115  * else
2116  *    // ret[i] is not set
2117  * Hence _doOp can be used to implement a filter on either of the vectors.
2118  *
2119  * The above is only defined if both V[i] and W[i] exist (i.e. an intersection). To allow a union operation (ex. when V[i] doesn't exist but W[i] does)
2120  * the allowVNulls flag is set to true and the Vzero argument is used as the missing V[i] value.
2121  *
2122  * The type of each element of ret need not be related to the types of V or W, so the return type must be explicitly specified as a template parameter:
2123  * FullyDistSpVec<int, double> r = EWiseApply<double>(V, W, plus, retTrue, false, 0)
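 *
 * A more concrete hedged sketch using the extended (four-argument) callbacks of this overload
 * (the lambdas and vector types are illustrative assumptions; V is a FullyDistSpVec<int64_t,int64_t>
 * and W a FullyDistVec<int64_t,double>): keep only entries of V whose matching dense value is
 * positive, returning the product as a double:
 *
 *   auto op   = [](int64_t v, double w, bool vnull, bool wnull) { return double(v) * w; };
 *   auto keep = [](int64_t v, double w, bool vnull, bool wnull) { return w > 0.0; };
 *   FullyDistSpVec<int64_t,double> r = EWiseApply<double>(V, W, op, keep, false, (int64_t)0, true);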
2124  **/
2125 template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
2126 FullyDistSpVec<IU,RET> EWiseApply
2127 (const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero, const bool useExtendedBinOp)
2128 {
2129 
2130 #ifdef _OPENMP
2131     return EWiseApply_threaded<RET>(V, W, _binary_op, _doOp, allowVNulls, Vzero, useExtendedBinOp);
2132 
2133 #else
2134     typedef RET T_promote; //typedef typename promote_trait<NU1,NU2>::T_promote T_promote;
2135     if(*(V.commGrid) == *(W.commGrid))
2136     {
2137         FullyDistSpVec< IU, T_promote> Product(V.commGrid);
2138         //FullyDistVec< IU, NU1> DV (V); // Ariful: I am not sure why it was there??
2139         if(V.TotalLength() != W.TotalLength())
2140         {
2141           std::ostringstream outs;
2142             outs << "Vector dimensions don't match (" << V.TotalLength() << " vs " << W.TotalLength() << ") for EWiseApply (short version)\n";
2143             SpParHelper::Print(outs.str());
2144             MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2145         }
2146         else
2147         {
2148             Product.glen = V.glen;
2149             IU size= W.LocArrSize();
2150             IU spsize = V.getlocnnz();
2151             IU sp_iter = 0;
2152             if (allowVNulls)
2153             {
2154                 // iterate over the dense vector
2155                 for(IU i=0; i<size; ++i)
2156                 {
2157                     if(sp_iter < spsize && V.ind[sp_iter] == i)
2158                     {
2159                         if (_doOp(V.num[sp_iter], W.arr[i], false, false))
2160                         {
2161                             Product.ind.push_back(i);
2162                             Product.num.push_back(_binary_op(V.num[sp_iter], W.arr[i], false, false));
2163                         }
2164                         sp_iter++;
2165                     }
2166                     else
2167                     {
2168                         if (_doOp(Vzero, W.arr[i], true, false))
2169                         {
2170                             Product.ind.push_back(i);
2171                             Product.num.push_back(_binary_op(Vzero, W.arr[i], true, false));
2172                         }
2173                     }
2174                 }
2175             }
2176             else
2177             {
2178                 // iterate over the sparse vector
2179                 for(sp_iter = 0; sp_iter < spsize; ++sp_iter)
2180                 {
2181                     if (_doOp(V.num[sp_iter], W.arr[V.ind[sp_iter]], false, false))
2182                     {
2183                         Product.ind.push_back(V.ind[sp_iter]);
2184                         Product.num.push_back(_binary_op(V.num[sp_iter], W.arr[V.ind[sp_iter]], false, false));
2185                     }
2186                 }
2187 
2188             }
2189         }
2190         return Product;
2191     }
2192     else
2193     {
2194       std::cout << "Grids are not comparable for EWiseApply" << std::endl;
2195         MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2196         return FullyDistSpVec< IU,T_promote>();
2197     }
2198 #endif
2199 }
2200 
2201 
2202 
2203 /**
2204  * Performs an arbitrary binary operation _binary_op on the corresponding elements of two vectors with the result stored in a return vector ret.
2205  * The binary operation is only performed if the binary predicate _doOp returns true for those elements. Otherwise the binary operation is not
2206  * performed and ret does not contain an element at that position.
2207  * More formally the operation is defined as:
2208  * if (_doOp(V[i], W[i]))
2209  *    ret[i] = _binary_op(V[i], W[i])
2210  * else
2211  *    // ret[i] is not set
2212  * Hence _doOp can be used to implement a filter on either of the vectors.
2213  *
2214  * The above is only defined if both V[i] and W[i] exist (i.e. an intersection). To allow a union operation (ex. when V[i] doesn't exist but W[i] does)
2215  * the allowVNulls flag is set to true and the Vzero argument is used as the missing V[i] value.
2216  * !allowVNulls && !allowWNulls => intersection
2217  * !allowVNulls &&  allowWNulls => operate on all elements of V
2218  *  allowVNulls && !allowWNulls => operate on all elements of W
2219  *  allowVNulls &&  allowWNulls => union
2220  *
2221  * The type of each element of ret need not be related to the types of V or W, so the return type must be explicitly specified as a template parameter:
2222  * FullyDistSpVec<int, double> r = EWiseApply<double>(V, W, plus, ...)
2223  * For intersection, Vzero and Wzero are irrelevant
2224  * ABAB: \todo: Should allowIntersect be "false" for all SetDifference uses?
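 *
 * A hedged sketch of a set-difference-like use (the functors and vector types are illustrative
 * assumptions; V and W are FullyDistSpVec<int64_t,double>): keep exactly the entries of V that
 * have no counterpart in W:
 *
 *   auto op   = [](double v, double w, bool vnull, bool wnull) { return v; };
 *   auto keep = [](double v, double w, bool vnull, bool wnull) { return wnull; };
 *   FullyDistSpVec<int64_t,double> D =
 *       EWiseApply<double>(V, W, op, keep, false, true, 0.0, 0.0, true, true);
 *                          // allowVNulls=false, allowWNulls=true, allowIntersect=true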
2225 **/
2226 template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
2227 FullyDistSpVec<IU,RET> EWiseApply
2228 	(const FullyDistSpVec<IU,NU1> & V, const FullyDistSpVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, bool allowWNulls, NU1 Vzero, NU2 Wzero, const bool allowIntersect, const bool useExtendedBinOp)
2229 {
2230 
2231 	typedef RET T_promote; // typename promote_trait<NU1,NU2>::T_promote T_promote;
2232 	if(*(V.commGrid) == *(W.commGrid))
2233 	{
2234 		FullyDistSpVec< IU, T_promote> Product(V.commGrid);
2235 		if(V.glen != W.glen)
2236 		{
2237 			std::ostringstream outs;
2238 			outs << "Vector dimensions don't match (" << V.glen << " vs " << W.glen << ") for EWiseApply (full version)\n";
2239 			SpParHelper::Print(outs.str());
2240 			MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2241 		}
2242 		else
2243 		{
2244 			Product.glen = V.glen;
2245 			typename std::vector< IU  >::const_iterator indV = V.ind.begin();
2246 			typename std::vector< NU1 >::const_iterator numV = V.num.begin();
2247 			typename std::vector< IU  >::const_iterator indW = W.ind.begin();
2248 			typename std::vector< NU2 >::const_iterator numW = W.num.begin();
2249 
2250 			while (indV < V.ind.end() && indW < W.ind.end())
2251 			{
2252 				if (*indV == *indW)
2253 				{
2254 					// overlap
2255 					if (allowIntersect)
2256 					{
2257 						if (_doOp(*numV, *numW, false, false))
2258 						{
2259 							Product.ind.push_back(*indV);
2260 							Product.num.push_back(_binary_op(*numV, *numW, false, false));
2261 						}
2262 					}
2263 					indV++; numV++;
2264 					indW++; numW++;
2265 				}
2266 				else if (*indV < *indW)
2267 				{
2268 					// V has value but W does not
2269 					if (allowWNulls)
2270 					{
2271 						if (_doOp(*numV, Wzero, false, true))
2272 						{
2273 							Product.ind.push_back(*indV);
2274 							Product.num.push_back(_binary_op(*numV, Wzero, false, true));
2275 						}
2276 					}
2277 					indV++; numV++;
2278 				}
2279 				else //(*indV > *indW)
2280 				{
2281 					// W has value but V does not
2282 					if (allowVNulls)
2283 					{
2284 						if (_doOp(Vzero, *numW, true, false))
2285 						{
2286 							Product.ind.push_back(*indW);
2287 							Product.num.push_back(_binary_op(Vzero, *numW, true, false));
2288 						}
2289 					}
2290 					indW++; numW++;
2291 				}
2292 			}
2293 			// clean up
2294 			while (allowWNulls && indV < V.ind.end())
2295 			{
2296 				if (_doOp(*numV, Wzero, false, true))
2297 				{
2298 					Product.ind.push_back(*indV);
2299 					Product.num.push_back(_binary_op(*numV, Wzero, false, true));
2300 				}
2301 				indV++; numV++;
2302 			}
2303 			while (allowVNulls && indW < W.ind.end())
2304 			{
2305 				if (_doOp(Vzero, *numW, true, false))
2306 				{
2307 					Product.ind.push_back(*indW);
2308 					Product.num.push_back(_binary_op(Vzero, *numW, true, false));
2309 				}
2310 				indW++; numW++;
2311 			}
2312 		}
2313 		return Product;
2314 	}
2315 	else
2316 	{
2317 		std::cout << "Grids are not comparable for EWiseApply" << std::endl;
2318 		MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2319 		return FullyDistSpVec< IU,T_promote>();
2320 	}
2321 }
2322 
2323 // plain callback versions
2324 template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
2325 FullyDistSpVec<IU,RET> EWiseApply
2326 	(const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero)
2327 {
2328 
2329 
2330 	return EWiseApply<RET>(V, W,
2331 					EWiseExtToPlainAdapter<RET, NU1, NU2, _BinaryOperation>(_binary_op),
2332 					EWiseExtToPlainAdapter<bool, NU1, NU2, _BinaryPredicate>(_doOp),
2333 					allowVNulls, Vzero, true);
2334 }
2335 
2336 
2337 
2338 template <typename RET, typename IU, typename NU1, typename NU2, typename _BinaryOperation, typename _BinaryPredicate>
2339 FullyDistSpVec<IU,RET> EWiseApply
2340 	(const FullyDistSpVec<IU,NU1> & V, const FullyDistSpVec<IU,NU2> & W , _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, bool allowWNulls, NU1 Vzero, NU2 Wzero, const bool allowIntersect = true)
2341 {
2342 	return EWiseApply<RET>(V, W,
2343 					EWiseExtToPlainAdapter<RET, NU1, NU2, _BinaryOperation>(_binary_op),
2344 					EWiseExtToPlainAdapter<bool, NU1, NU2, _BinaryPredicate>(_doOp),
2345 					allowVNulls, allowWNulls, Vzero, Wzero, allowIntersect, true);
2346 }
2347 
2348 }
2349 
2350 
2351 #endif
2352 
2353