/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.6 -------------------------------------------------*/
/* date: 6/15/2017 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc  --------------------------*/
/****************************************************************/
/*
 Copyright (c) 2010-2017, The Regents of the University of California

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 */


#include "SpImpl.h"
#include "SpParHelper.h"
#include "PBBS/radixSort.h"
#include "Tommy/tommyhashdyn.h"

namespace combblas {

/**
 * Base template version [full use of the semiring add() and multiply()]
 * @param[in] indx { vector that effectively holds the column numbers requested from A }
 *
 *
 * Roughly how the function below works:
 * Say our sparse vector has entries at 3, 7, and 9.
 * FillColInds() creates a vector of pairs that contain the
 * start and end indices (into the matrix.ir and matrix.numx arrays).
 * pair.first is the start index, pair.second is the end index.
 *
 * Here is how we merge the adjacencies of 3, 7, and 9:
 * We keep a heap of size 3 and push the first entries of adj{3}, adj{7}, adj{9} onto the heap wset.
 * That happens in the first for loop.
 *
 * Then, as we pop from the heap, we push the next entry from the previously popped adjacency (i.e., matrix column).
 * The heap ensures the output comes out sorted without using a SPA;
 * that's why checking indy.back() == wset[hsize-1].key is enough to ensure proper merging.
 **/
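/*
 * The same k-way merge idea in isolation: a minimal, self-contained sketch
 * (illustration only; the real code below uses HeapEntry, Dcsc, and the semiring SR):
 *
 * \code
 * #include <queue>
 * #include <vector>
 * #include <utility>
 *
 * // Merge k sorted (index,value) lists into one sorted list, adding values with equal indices.
 * std::vector<std::pair<int,double>> KwayMerge(const std::vector<std::vector<std::pair<int,double>>> & lists)
 * {
 *     // heap item: ((index,value), (list id, position within list))
 *     typedef std::pair<std::pair<int,double>, std::pair<size_t,size_t>> Item;
 *     auto cmp = [](const Item & a, const Item & b) { return a.first.first > b.first.first; }; // min-heap on index
 *     std::priority_queue<Item, std::vector<Item>, decltype(cmp)> heap(cmp);
 *     for(size_t j = 0; j < lists.size(); ++j)	// create the initial heap
 *         if(!lists[j].empty()) heap.push(Item(lists[j][0], std::make_pair(j, size_t(0))));
 *     std::vector<std::pair<int,double>> out;
 *     while(!heap.empty())
 *     {
 *         Item top = heap.top(); heap.pop();
 *         if(!out.empty() && out.back().first == top.first.first)
 *             out.back().second += top.first.second;	// plays the role of SR::add
 *         else
 *             out.push_back(top.first);
 *         size_t j = top.second.first, pos = top.second.second + 1;
 *         if(pos < lists[j].size()) heap.push(Item(lists[j][pos], std::make_pair(j, pos)));	// advance that list
 *     }
 *     return out;
 * }
 * \endcode
 */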
template <class SR, class IT, class NUM, class IVT, class OVT>
void SpImpl<SR,IT,NUM,IVT,OVT>::SpMXSpV(const Dcsc<IT,NUM> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
			std::vector<int32_t> & indy, std::vector< OVT > & numy)
{
	int32_t hsize = 0;
	// colinds dereferences A.ir (valid from colinds[].first to colinds[].second)
	std::vector< std::pair<IT,IT> > colinds( (IT) veclen);
	Adcsc.FillColInds(indx, (IT) veclen, colinds, NULL, 0);	// csize is irrelevant if aux is NULL

	if(sizeof(NUM) > sizeof(OVT))	// ABAB: include a filtering based runtime choice as well?
	{
		HeapEntry<IT, OVT> * wset = new HeapEntry<IT, OVT>[veclen];
		for(IT j =0; j< veclen; ++j)		// create the initial heap
		{
			while(colinds[j].first != colinds[j].second )	// iterate until finding the first entry within this column that passes the filter
			{
				OVT mrhs = SR::multiply(Adcsc.numx[colinds[j].first], numx[j]);
				if(SR::returnedSAID())
				{
					++(colinds[j].first);	// increment the active row index within the jth column
				}
				else
				{
					wset[hsize++] = HeapEntry< IT,OVT > ( Adcsc.ir[colinds[j].first], j, mrhs);
					break;	// this column successfully inserted an entry to the heap
				}
			}
		}
		std::make_heap(wset, wset+hsize);
		while(hsize > 0)
		{
			std::pop_heap(wset, wset + hsize);         	// result is stored in wset[hsize-1]
			IT locv = wset[hsize-1].runr;		// relative location of the nonzero in the sparse column vector
			if((!indy.empty()) && indy.back() == wset[hsize-1].key)
			{
				numy.back() = SR::add(numy.back(), wset[hsize-1].num);
			}
			else
			{
				indy.push_back( (int32_t) wset[hsize-1].key);
				numy.push_back(wset[hsize-1].num);
			}
			bool pushed = false;
			// invariant: if ++(colinds[locv].first) == colinds[locv].second, then locv will not appear again in the heap
			while ( (++(colinds[locv].first)) != colinds[locv].second )	// iterate until finding another passing entry
			{
				OVT mrhs = SR::multiply(Adcsc.numx[colinds[locv].first], numx[locv]);
				if(!SR::returnedSAID())
				{
					wset[hsize-1].key = Adcsc.ir[colinds[locv].first];
					wset[hsize-1].num = mrhs;
					std::push_heap(wset, wset+hsize);	// runr stays the same
					pushed = true;
					break;
				}
			}
			if(!pushed)	--hsize;
		}
		delete [] wset;
	}

	else
	{
		HeapEntry<IT, NUM> * wset = new HeapEntry<IT, NUM>[veclen];
		for(IT j =0; j< veclen; ++j)		// create the initial heap
		{
			if(colinds[j].first != colinds[j].second)	// current != end
			{
				wset[hsize++] = HeapEntry< IT,NUM > ( Adcsc.ir[colinds[j].first], j, Adcsc.numx[colinds[j].first]);  // HeapEntry(key, run, num)
			}
		}
		std::make_heap(wset, wset+hsize);
		while(hsize > 0)
		{
			std::pop_heap(wset, wset + hsize);         	// result is stored in wset[hsize-1]
			IT locv = wset[hsize-1].runr;		// relative location of the nonzero in the sparse column vector
			OVT mrhs = SR::multiply(wset[hsize-1].num, numx[locv]);

			if (!SR::returnedSAID())
			{
				if((!indy.empty()) && indy.back() == wset[hsize-1].key)
				{
					numy.back() = SR::add(numy.back(), mrhs);
				}
				else
				{
					indy.push_back( (int32_t) wset[hsize-1].key);
					numy.push_back(mrhs);
				}
			}

			if( (++(colinds[locv].first)) != colinds[locv].second)	// current != end
			{
				// runr stays the same !
				wset[hsize-1].key = Adcsc.ir[colinds[locv].first];
				wset[hsize-1].num = Adcsc.numx[colinds[locv].first];
				std::push_heap(wset, wset+hsize);
			}
			else		--hsize;
		}
		delete [] wset;
	}
}



/**
 * One of the two versions of SpMXSpV on a boolean matrix [uses only Semiring::add()]
 * This version is likely to be more memory efficient than the other one (the one that uses preallocated memory buffers),
 * because here we use a heap instead of a dense accumulation vector. It will probably be slower, though.
 **/
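/*
 * The column selection in this function is a two-pointer intersection of two sorted
 * sequences (the matrix's nonempty column ids in Adcsc.jc and the vector's indices in indx).
 * The same pattern in isolation (illustration only):
 *
 * \code
 * #include <vector>
 *
 * // return the values present in both sorted arrays a and b
 * std::vector<int> IntersectSorted(const std::vector<int> & a, const std::vector<int> & b)
 * {
 *     std::vector<int> out;
 *     size_t i = 0, k = 0;
 *     while(i < a.size() && k < b.size())
 *     {
 *         if(a[i] < b[k]) ++i;	// a is behind: advance it
 *         else if(b[k] < a[i]) ++k;	// b is behind: advance it
 *         else { out.push_back(a[i]); ++i; ++k; }	// match: emit and advance both
 *     }
 *     return out;
 * }
 * \endcode
 */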
template <class SR, class IT, class IVT, class OVT>
void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
			std::vector<int32_t> & indy, std::vector<OVT> & numy)
{
	IT inf = std::numeric_limits<IT>::min();
	IT sup = std::numeric_limits<IT>::max();
	KNHeap< IT, IVT > sHeap(sup, inf); 	// max size: flops

	IT k = 0; 	// index to indx vector
	IT i = 0; 	// index to columns of matrix
	while(i< Adcsc.nzc && k < veclen)
	{
		if(Adcsc.jc[i] < indx[k]) ++i;
		else if(indx[k] < Adcsc.jc[i]) ++k;
		else
		{
			for(IT j=Adcsc.cp[i]; j < Adcsc.cp[i+1]; ++j)	// for all nonzeros in this column
			{
				sHeap.insert(Adcsc.ir[j], numx[k]);	// row_id, num
			}
			++i;
			++k;
		}
	}

	IT row;
	IVT num;
	if(sHeap.getSize() > 0)
	{
		sHeap.deleteMin(&row, &num);
		indy.push_back( (int32_t) row);
		numy.push_back( num );
	}
	while(sHeap.getSize() > 0)
	{
		sHeap.deleteMin(&row, &num);
		if(indy.back() == row)
		{
			numy.back() = SR::add(numy.back(), num);
		}
		else
		{
			indy.push_back( (int32_t) row);
			numy.push_back(num);
		}
	}
}


/**
 * @param[in,out]   indy,numy,cnts 	{preallocated arrays to be filled}
 * @param[in] 		dspls	{displacements into the preallocated indy,numy buffers}
 * This version determines the receiving column neighbor and adjusts the indices to the receiver's local indexing.
 * If IVT and OVT are different, then OVT should allow implicit conversion from IVT.
 **/
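/*
 * A sketch of the owner/local-index arithmetic used below, assuming mA >= p_c so
 * that perproc > 0 (the last processor absorbs the remainder rows):
 *
 * \code
 * #include <algorithm>
 * #include <cstdint>
 * #include <utility>
 *
 * // map a global row id to (owner processor, row id local to that owner)
 * inline std::pair<int32_t,int32_t> OwnerAndLocal(int32_t rowid, int32_t mA, int32_t p_c)
 * {
 *     int32_t perproc = mA / p_c;
 *     int32_t owner = std::min(rowid / perproc, p_c - 1);	// clamp so the spill rows go to the last processor
 *     return std::make_pair(owner, rowid - owner * perproc);	// local id = global id minus the owner's offset
 * }
 * \endcode
 */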
template <typename SR, typename IT, typename IVT, class OVT>
void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
			int32_t * indy, OVT * numy, int * cnts, int * dspls, int p_c)
{
	OVT * localy = new OVT[mA];
	BitMap isthere(mA);
	std::vector< std::vector<int32_t> > nzinds(p_c);	// nonzero indices

	int32_t perproc = mA / p_c;
	int32_t k = 0; 	// index to indx vector
	IT i = 0; 	// index to columns of matrix
	while(i< Adcsc.nzc && k < veclen)
	{
		if(Adcsc.jc[i] < indx[k]) ++i;
		else if(indx[k] < Adcsc.jc[i]) ++k;
		else
		{
			for(IT j=Adcsc.cp[i]; j < Adcsc.cp[i+1]; ++j)	// for all nonzeros in this column
			{
				int32_t rowid = (int32_t) Adcsc.ir[j];
				if(!isthere.get_bit(rowid))
				{
					int32_t owner = std::min(rowid / perproc, static_cast<int32_t>(p_c-1));
					localy[rowid] = numx[k];	// initial assignment, requires implicit conversion if IVT != OVT
					nzinds[owner].push_back(rowid);
					isthere.set_bit(rowid);
				}
				else
				{
					localy[rowid] = SR::add(localy[rowid], numx[k]);
				}
			}
			++i;
			++k;
		}
	}

	for(int p = 0; p< p_c; ++p)
	{
		std::sort(nzinds[p].begin(), nzinds[p].end());
		cnts[p] = nzinds[p].size();
		int32_t * locnzinds = &nzinds[p][0];
		int32_t offset = perproc * p;
		for(int i=0; i< cnts[p]; ++i)
		{
			indy[dspls[p]+i] = locnzinds[i] - offset;	// convert to local offset
			numy[dspls[p]+i] = localy[locnzinds[i]];
		}
	}
	delete [] localy;
}


// this version is still very good with splitters
template <typename SR, typename IT, typename IVT, typename OVT>
void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV_ForThreading(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
                                                      std::vector<int32_t> & indy, std::vector<OVT> & numy, int32_t offset)
{
    std::vector<OVT> localy(mA);
    BitMap isthere(mA);
    std::vector<uint32_t> nzinds;	// nonzero indices

    SpMXSpV_ForThreading(Adcsc, mA, indx, numx, veclen, indy, numy, offset, localy, isthere, nzinds);
}



//! We can safely use a SPA here because Adcsc is short (::RowSplit() has already been called on it)
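/*
 * For reference, the SPA (sparse accumulator) scatter pattern used below, in isolation
 * (a minimal sketch, with plain double and operator+= standing in for OVT and SR::add):
 *
 * \code
 * #include <vector>
 * #include <cstdint>
 *
 * void SpaDemo(int mA)
 * {
 *     std::vector<double> localy(mA);	// dense value array, indexed by row
 *     std::vector<bool> isthere(mA, false);	// occupancy flags (a BitMap in the real code)
 *     std::vector<uint32_t> nzinds;	// unordered list of touched rows
 *     auto scatter = [&](uint32_t rowid, double val)
 *     {
 *         if(!isthere[rowid]) { localy[rowid] = val; nzinds.push_back(rowid); isthere[rowid] = true; }
 *         else                { localy[rowid] += val; }	// SR::add in the real code
 *     };
 *     scatter(5, 1.0); scatter(2, 2.0); scatter(5, 3.0);	// localy[5] == 4.0, nzinds == {5, 2}
 *     // sorting nzinds and gathering from localy then yields the sparse output
 * }
 * \endcode
 */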
template <typename SR, typename IT, typename IVT, typename OVT>
void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV_ForThreading(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen, std::vector<int32_t> & indy, std::vector<OVT> & numy, int32_t offset, std::vector<OVT> & localy, BitMap & isthere, std::vector<uint32_t> & nzinds)
{
	// The following piece of code is not general, but it's more memory efficient than FillColInds
	int32_t k = 0; 	// index to indx vector
	IT i = 0; 	// index to columns of matrix
	while(i< Adcsc.nzc && k < veclen)
	{
		if(Adcsc.jc[i] < indx[k]) ++i;
		else if(indx[k] < Adcsc.jc[i]) ++k;
		else
		{
			for(IT j=Adcsc.cp[i]; j < Adcsc.cp[i+1]; ++j)	// for all nonzeros in this column
			{
				uint32_t rowid = (uint32_t) Adcsc.ir[j];
				if(!isthere.get_bit(rowid))
				{
					localy[rowid] = numx[k];	// initial assignment
					nzinds.push_back(rowid);
					isthere.set_bit(rowid);
				}
				else
				{
					localy[rowid] = SR::add(localy[rowid], numx[k]);
				}
			}
			++i; ++k;
		}
	}
	int nnzy = nzinds.size();
	integerSort(nzinds.data(), nnzy);
	indy.resize(nnzy);
	numy.resize(nnzy);
	for(int i=0; i< nnzy; ++i)
	{
		indy[i] = nzinds[i] + offset;	// return the column-global index and let gespmv determine the receiver's local index
		numy[i] = localy[nzinds[i]];
	}
}




/**
 * SpMXSpV with HeapSort
 * Simply insert entries from the columns corresponding to nonzeros of the input vector into a min-heap,
 * then extract entries from the min-heap.
 * Complexity: O(flops*log(flops))
 * offset is the offset of indices in the matrix in case the matrix is split
 * This version is likely to be more memory efficient than the other one (the one that uses preallocated memory buffers),
 * because here we use a heap instead of a dense accumulation vector. It will probably be slower, though.
 **/
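/*
 * Worked example (assumed values): if the selected columns contribute the (row, value)
 * pairs (5,a), (2,b), (5,c), (1,d), the min-heap releases them in row order as
 * 1, 2, 5, 5, so the loops below produce indy = {1, 2, 5} and
 * numy = {d, b, SR::add(a,c)}: equal consecutive rows are merged with the semiring add.
 */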

template <typename SR, typename IT, typename NT, typename IVT, typename OVT>
void SpMXSpV_HeapSort(const Csc<IT,NT> & Acsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen, std::vector<int32_t> & indy, std::vector<OVT> & numy, int32_t offset)
{
    IT inf = std::numeric_limits<IT>::min();
    IT sup = std::numeric_limits<IT>::max();
    KNHeap< IT, OVT > sHeap(sup, inf);


    for (int32_t k = 0; k < veclen; ++k)
    {
        IT colid = indx[k];
        for(IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
        {
            OVT val = SR::multiply( Acsc.num[j], numx[k]);
            sHeap.insert(Acsc.ir[j], val);
        }
    }

    IT row;
    OVT num;
    if(sHeap.getSize() > 0)
    {
        sHeap.deleteMin(&row, &num);
        row += offset;
        indy.push_back( (int32_t) row);
        numy.push_back( num );
    }
    while(sHeap.getSize() > 0)
    {
        sHeap.deleteMin(&row, &num);
        row += offset;
        if(indy.back() == row)
        {
            numy.back() = SR::add(numy.back(), num);
        }
        else
        {
            indy.push_back( (int32_t) row);
            numy.push_back(num);
        }
    }
}



template <typename SR, typename IT, typename NT, typename IVT, typename OVT>
void SpMXSpV_Bucket(const Csc<IT,NT> & Acsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
                         std::vector<int32_t> & indy, std::vector< OVT > & numy, PreAllocatedSPA<OVT> & SPA)
{
    if(veclen==0)
        return;

#ifdef BENCHMARK_SPMSPV
    double tstart = MPI_Wtime();
#endif
    int nthreads=1;
    int rowSplits = SPA.buckets; // SPA must be initialized, as checked in SpImpl.h
#ifdef _OPENMP
#pragma omp parallel
    {
        nthreads = omp_get_num_threads();
    }
#endif
    if(rowSplits < nthreads)
    {
        std::ostringstream outs;
        outs << "Warning in SpMXSpV_Bucket: " << rowSplits << " buckets are supplied for " << nthreads << " threads\n";
        outs << "Creating the PreAllocatedSPA with at least 4 times the number of threads is recommended\n";
        SpParHelper::Print(outs.str());
    }

    int32_t rowPerSplit = mA / rowSplits;


    //------------------------------------------------------
    // Step 1: Count the nnz in each rowsplit of the matrix,
    // because we don't want to waste memory.
    // False sharing is not a big problem because bSize is
    // written outside of the inner loop.
    //------------------------------------------------------
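    // A worked example of the xstart/xend arithmetic in the loop below (assumed
    // values): veclen = 10 and rowSplits = 4 give perBucket = 2 and spill = 2, so
    // bucket b covers [b*2 + min(2,b), (b+1)*2 + min(2,b+1)), i.e. the ranges
    // [0,3), [3,6), [6,8), [8,10) with sizes 3,3,2,2: the first (veclen % rowSplits)
    // buckets get one extra element each.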

    std::vector<std::vector<int32_t>> bSize(rowSplits, std::vector<int32_t> ( rowSplits, 0));
    std::vector<std::vector<int32_t>> bOffset(rowSplits, std::vector<int32_t> ( rowSplits, 0));
    std::vector<int32_t> sendSize(rowSplits);
#ifdef BENCHMARK_SPMSPV
    double t0, t1, t2, t3, t4;
    t0 = MPI_Wtime();
#endif

#ifdef _OPENMP
#pragma omp parallel for schedule(dynamic, 1)
#endif
    for(int b=0; b<rowSplits; b++)
    {
        // evenly balance the nnz of x among threads
        int perBucket = veclen/rowSplits;
        int spill = veclen%rowSplits;
        int32_t xstart = b*perBucket + std::min(spill, b);
        int32_t xend = (b+1)*perBucket + std::min(spill, b+1);
        std::vector<int32_t> temp(rowSplits,0);
        for (int32_t i = xstart; i < xend; ++i)
        {
            IT colid = indx[i];
            for(IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
            {
                uint32_t rowid = (uint32_t) Acsc.ir[j];
                int32_t splitId = rowSplits-1;
                if(rowPerSplit!=0) splitId = (rowid/rowPerSplit > rowSplits-1) ? rowSplits-1 : rowid/rowPerSplit;
                //bSize[b][splitId]++;
                temp[splitId]++;
            }
        }
        int32_t totSend = 0;
        for(int k=0; k<rowSplits; k++)
        {
            bSize[b][k] = temp[k];
            totSend += temp[k];
        }
        sendSize[b] = totSend;
    }


#ifdef BENCHMARK_SPMSPV
    t1 = MPI_Wtime() - t0;
    t0 = MPI_Wtime();
#endif



    // keep it sequential to avoid false sharing
    for(int i=1; i<rowSplits; i++)
    {
        for(int j=0; j<rowSplits; j++)
        {
            bOffset[i][j] = bOffset[i-1][j] + bSize[i-1][j];
            bSize[i-1][j] = 0;
        }
    }

    std::vector<uint32_t> disp(rowSplits+1);
    int maxBucketSize = -1; // maximum size of a bucket
    disp[0] = 0;
    for(int j=0; j<rowSplits; j++)
    {
        int thisBucketSize = bOffset[rowSplits-1][j] + bSize[rowSplits-1][j];
        disp[j+1] = disp[j] + thisBucketSize;
        bSize[rowSplits-1][j] = 0;
        maxBucketSize = std::max(thisBucketSize, maxBucketSize);
    }



#ifdef BENCHMARK_SPMSPV
    double tseq = MPI_Wtime() - t0;
#endif
    //------------------------------------------------------
    // Step 2: The matrix is traversed column by column, and
    // the nonzeros in each rowsplit of the matrix are compiled together.
    //------------------------------------------------------
    // Thread-private buckets should fit in the L2 cache
#ifndef L2_CACHE_SIZE
#define L2_CACHE_SIZE 256000
#endif
    int THREAD_BUF_LEN = 256;
    int itemsize = sizeof(int32_t) + sizeof(OVT);
    while(true)
    {
        int bufferMem = THREAD_BUF_LEN * rowSplits * itemsize + 8 * rowSplits;
        if(bufferMem > L2_CACHE_SIZE) THREAD_BUF_LEN /= 2;
        else break;
    }
    THREAD_BUF_LEN = std::min(maxBucketSize+1,THREAD_BUF_LEN);
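    // Worked example of the sizing above (assumed values): with OVT = double,
    // itemsize = 4 + 8 = 12 bytes and rowSplits = 64, the initial buffers need
    // 256*64*12 + 8*64 = 197,120 bytes, which fits in L2_CACHE_SIZE = 256,000,
    // so THREAD_BUF_LEN stays at 256. With rowSplits = 128, the first check sees
    // 394,240 bytes and halves THREAD_BUF_LEN to 128 (197,632 bytes), which fits.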

#ifdef BENCHMARK_SPMSPV
    t0 = MPI_Wtime();
#endif

#ifdef _OPENMP
#pragma omp parallel
#endif
    {
        int32_t* tIndSplitA = new int32_t[rowSplits*THREAD_BUF_LEN];
        OVT* tNumSplitA = new OVT[rowSplits*THREAD_BUF_LEN];
        std::vector<int32_t> tBucketSize(rowSplits);
        std::vector<int32_t> tOffset(rowSplits);
#ifdef _OPENMP
#pragma omp for schedule(dynamic,1)
#endif
        for(int b=0; b<rowSplits; b++)
        {

            std::fill(tBucketSize.begin(), tBucketSize.end(), 0);
            std::fill(tOffset.begin(), tOffset.end(), 0);
            int perBucket = veclen/rowSplits;
            int spill = veclen%rowSplits;
            int32_t xstart = b*perBucket + std::min(spill, b);
            int32_t xend = (b+1)*perBucket + std::min(spill, b+1);

            for (int32_t i = xstart; i < xend; ++i)
            {
                IT colid = indx[i];
                for(IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
                {
                    OVT val = SR::multiply( Acsc.num[j], numx[i]);
                    uint32_t rowid = (uint32_t) Acsc.ir[j];
                    int32_t splitId = rowSplits-1;
                    if(rowPerSplit!=0) splitId = (rowid/rowPerSplit > rowSplits-1) ? rowSplits-1 : rowid/rowPerSplit;
                    if (tBucketSize[splitId] < THREAD_BUF_LEN)
                    {
                        tIndSplitA[splitId*THREAD_BUF_LEN + tBucketSize[splitId]] = rowid;
                        tNumSplitA[splitId*THREAD_BUF_LEN + tBucketSize[splitId]++] = val;
                    }
                    else
                    {
                        std::copy(tIndSplitA + splitId*THREAD_BUF_LEN, tIndSplitA + (splitId+1)*THREAD_BUF_LEN, &SPA.indSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
                        std::copy(tNumSplitA + splitId*THREAD_BUF_LEN, tNumSplitA + (splitId+1)*THREAD_BUF_LEN, &SPA.numSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
                        tIndSplitA[splitId*THREAD_BUF_LEN] = rowid;
                        tNumSplitA[splitId*THREAD_BUF_LEN] = val;
                        tOffset[splitId] += THREAD_BUF_LEN ;
                        tBucketSize[splitId] = 1;
                    }
                }
            }

            for(int splitId=0; splitId<rowSplits; ++splitId)
            {
                if(tBucketSize[splitId]>0)
                {
                    std::copy(tIndSplitA + splitId*THREAD_BUF_LEN, tIndSplitA + splitId*THREAD_BUF_LEN + tBucketSize[splitId], &SPA.indSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
                    std::copy(tNumSplitA + splitId*THREAD_BUF_LEN, tNumSplitA + splitId*THREAD_BUF_LEN + tBucketSize[splitId], &SPA.numSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
                }
            }
        }
        delete [] tIndSplitA;
        delete [] tNumSplitA;
    }

#ifdef BENCHMARK_SPMSPV
    t2 = MPI_Wtime() - t0;
    t0 = MPI_Wtime();
#endif
    std::vector<uint32_t> nzInRowSplits(rowSplits);
    uint32_t* nzinds = new uint32_t[disp[rowSplits]];

#ifdef _OPENMP
#pragma omp parallel for schedule(dynamic,1)
#endif
    for(int rs=0; rs<rowSplits; ++rs)
    {

        for(int i=disp[rs]; i<disp[rs+1] ; i++)
        {
            int32_t lrowid = SPA.indSplitA[i] - rs * rowPerSplit;
            SPA.V_isthereBool[rs][lrowid] = false;
        }
        uint32_t tMergeDisp = disp[rs];
        for(int i=disp[rs]; i<disp[rs+1] ; i++)
        {
            int32_t rowid = SPA.indSplitA[i];
            int32_t lrowid = rowid - rs * rowPerSplit;
            if(!SPA.V_isthereBool[rs][lrowid])	// there is no conflict across threads
            {
                SPA.V_localy[0][rowid] = SPA.numSplitA[i];
                nzinds[tMergeDisp++] = rowid;
                SPA.V_isthereBool[rs][lrowid]=true;
            }
            else
            {
                SPA.V_localy[0][rowid] = SR::add(SPA.V_localy[0][rowid], SPA.numSplitA[i]);
            }
        }

        integerSort(nzinds + disp[rs], tMergeDisp - disp[rs]);
        nzInRowSplits[rs] = tMergeDisp - disp[rs];

    }

#ifdef BENCHMARK_SPMSPV
    t3 = MPI_Wtime() - t0;
#endif
    // prefix sum
    std::vector<uint32_t> dispRowSplits(rowSplits+1);
    dispRowSplits[0] = 0;
    for(int i=0; i<rowSplits; i++)
    {
        dispRowSplits[i+1] = dispRowSplits[i] + nzInRowSplits[i];
    }

#ifdef BENCHMARK_SPMSPV
    t0 = MPI_Wtime();
#endif
    int nnzy = dispRowSplits[rowSplits];
    indy.resize(nnzy);
    numy.resize(nnzy);
#ifdef BENCHMARK_SPMSPV
    tseq = MPI_Wtime() - t0;
    t0 = MPI_Wtime();
#endif

    int maxNnzInSplit = *std::max_element(nzInRowSplits.begin(),nzInRowSplits.end());
    THREAD_BUF_LEN = std::min(maxNnzInSplit+1,256);
#ifdef _OPENMP
#pragma omp parallel
#endif
    {
        OVT* tnumy = new OVT [THREAD_BUF_LEN];
        int32_t* tindy = new int32_t [THREAD_BUF_LEN];	// currently unused; kept for symmetry with tnumy
        int curSize, tdisp;
#ifdef _OPENMP
#pragma omp for schedule(dynamic,1)
#endif
        for(int rs=0; rs<rowSplits; rs++)
        {
            curSize = 0;
            tdisp = 0;
            uint32_t * thisind = nzinds + disp[rs];
            std::copy(nzinds+disp[rs], nzinds+disp[rs]+nzInRowSplits[rs], indy.begin()+dispRowSplits[rs]);
            for(int j=0; j<nzInRowSplits[rs]; j++)
            {

                if ( curSize < THREAD_BUF_LEN)
                {
                    tnumy[curSize++] = SPA.V_localy[0][thisind[j]];
                }
                else
                {
                    std::copy(tnumy, tnumy+curSize, numy.begin()+dispRowSplits[rs]+tdisp);
                    tdisp += curSize;
                    tnumy[0] = SPA.V_localy[0][thisind[j]];
                    curSize = 1;
                }
            }
            if ( curSize > 0)
            {
                std::copy(tnumy, tnumy+curSize, numy.begin()+dispRowSplits[rs]+tdisp);
            }
        }
        delete [] tnumy;
        delete [] tindy;
    }


#ifdef BENCHMARK_SPMSPV
    t4 = MPI_Wtime() - t0;
#endif

    delete[] nzinds;



#ifdef BENCHMARK_SPMSPV
    double tall = MPI_Wtime() - tstart;
    std::ostringstream outs1;
    outs1 << "Time breakdown of SpMSpV-bucket." << std::endl;
    outs1 << "Estimate buckets: "<< t1 << " Bucketing: " << t2 << " SPA-merge: " << t3 << " Output: " << t4  << " Total: "<< tall << std::endl;
    SpParHelper::Print(outs1.str());
#endif

}

}