1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8 Copyright (c) 2010-2017, The Regents of the University of California
9
10 Permission is hereby granted, free of charge, to any person obtaining a copy
11 of this software and associated documentation files (the "Software"), to deal
12 in the Software without restriction, including without limitation the rights
13 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 copies of the Software, and to permit persons to whom the Software is
15 furnished to do so, subject to the following conditions:
16
17 The above copyright notice and this permission notice shall be included in
18 all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 THE SOFTWARE.
27 */
28
29
30 #include "SpImpl.h"
31 #include "SpParHelper.h"
32 #include "PBBS/radixSort.h"
33 #include "Tommy/tommyhashdyn.h"
34
35 namespace combblas {
36
37 /**
38 * Base template version [full use of the semiring add() and multiply()]
39 * @param[in] indx { vector that practically keeps column numbers requested from A }
40 *
41 *
42 * Roughly how the below function works:
43 * Let's say our sparse vector has entries at 3, 7 and 9.
44 * FillColInds() creates a vector of pairs that contain the
45 * start and end indices (into matrix.ir and matrix.numx arrays).
46 * pair.first is the start index, pair.second is the end index.
47 *
48 * Here's how we merge these adjacencies of 3,7 and 9:
49 * We keep a heap of size 3 and push the first entries in adj{3}, adj{7}, adj{9} onto the heap wset.
50 * That happens in the first for loop.
51 *
52 * Then as we pop from the heap we push the next entry from the previously popped adjacency (i.e. matrix column).
53 * The heap ensures the output comes out sorted without using a SPA.
54 * that's why indy.back() == wset[hsize-1].key is enough to ensure proper merging.
55 **/
56 template <class SR, class IT, class NUM, class IVT, class OVT>
SpMXSpV(const Dcsc<IT,NUM> & Adcsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,std::vector<int32_t> & indy,std::vector<OVT> & numy)57 void SpImpl<SR,IT,NUM,IVT,OVT>::SpMXSpV(const Dcsc<IT,NUM> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
58 std::vector<int32_t> & indy, std::vector< OVT > & numy)
59 {
60 int32_t hsize = 0;
61 // colinds dereferences A.ir (valid from colinds[].first to colinds[].second)
62 std::vector< std::pair<IT,IT> > colinds( (IT) veclen);
63 Adcsc.FillColInds(indx, (IT) veclen, colinds, NULL, 0); // csize is irrelevant if aux is NULL
64
65 if(sizeof(NUM) > sizeof(OVT)) // ABAB: include a filtering based runtime choice as well?
66 {
67 HeapEntry<IT, OVT> * wset = new HeapEntry<IT, OVT>[veclen];
68 for(IT j =0; j< veclen; ++j) // create the initial heap
69 {
70 while(colinds[j].first != colinds[j].second ) // iterate until finding the first entry within this column that passes the filter
71 {
72 OVT mrhs = SR::multiply(Adcsc.numx[colinds[j].first], numx[j]);
73 if(SR::returnedSAID())
74 {
75 ++(colinds[j].first); // increment the active row index within the jth column
76 }
77 else
78 {
79 wset[hsize++] = HeapEntry< IT,OVT > ( Adcsc.ir[colinds[j].first], j, mrhs);
80 break; // this column successfully inserted an entry to the heap
81 }
82 }
83 }
84 std::make_heap(wset, wset+hsize);
85 while(hsize > 0)
86 {
87 std::pop_heap(wset, wset + hsize); // result is stored in wset[hsize-1]
88 IT locv = wset[hsize-1].runr; // relative location of the nonzero in sparse column vector
89 if((!indy.empty()) && indy.back() == wset[hsize-1].key)
90 {
91 numy.back() = SR::add(numy.back(), wset[hsize-1].num);
92 }
93 else
94 {
95 indy.push_back( (int32_t) wset[hsize-1].key);
96 numy.push_back(wset[hsize-1].num);
97 }
98 bool pushed = false;
99 // invariant: if ++(colinds[locv].first) == colinds[locv].second, then locv will not appear again in the heap
100 while ( (++(colinds[locv].first)) != colinds[locv].second ) // iterate until finding another passing entry
101 {
102 OVT mrhs = SR::multiply(Adcsc.numx[colinds[locv].first], numx[locv]);
103 if(!SR::returnedSAID())
104 {
105 wset[hsize-1].key = Adcsc.ir[colinds[locv].first];
106 wset[hsize-1].num = mrhs;
107 std::push_heap(wset, wset+hsize); // runr stays the same
108 pushed = true;
109 break;
110 }
111 }
112 if(!pushed) --hsize;
113 }
114 delete [] wset;
115 }
116
117 else
118 {
119 HeapEntry<IT, NUM> * wset = new HeapEntry<IT, NUM>[veclen];
120 for(IT j =0; j< veclen; ++j) // create the initial heap
121 {
122 if(colinds[j].first != colinds[j].second) // current != end
123 {
124 wset[hsize++] = HeapEntry< IT,NUM > ( Adcsc.ir[colinds[j].first], j, Adcsc.numx[colinds[j].first]); // HeapEntry(key, run, num)
125 }
126 }
127 std::make_heap(wset, wset+hsize);
128 while(hsize > 0)
129 {
130 std::pop_heap(wset, wset + hsize); // result is stored in wset[hsize-1]
131 IT locv = wset[hsize-1].runr; // relative location of the nonzero in sparse column vector
132 OVT mrhs = SR::multiply(wset[hsize-1].num, numx[locv]);
133
134 if (!SR::returnedSAID())
135 {
136 if((!indy.empty()) && indy.back() == wset[hsize-1].key)
137 {
138 numy.back() = SR::add(numy.back(), mrhs);
139 }
140 else
141 {
142 indy.push_back( (int32_t) wset[hsize-1].key);
143 numy.push_back(mrhs);
144 }
145 }
146
147 if( (++(colinds[locv].first)) != colinds[locv].second) // current != end
148 {
149 // runr stays the same !
150 wset[hsize-1].key = Adcsc.ir[colinds[locv].first];
151 wset[hsize-1].num = Adcsc.numx[colinds[locv].first];
152 std::push_heap(wset, wset+hsize);
153 }
154 else --hsize;
155 }
156 delete [] wset;
157 }
158 }
159
160
161
162 /**
163 * One of the two versions of SpMXSpV with on boolean matrix [uses only Semiring::add()]
164 * This version is likely to be more memory efficient than the other one (the one that uses preallocated memory buffers)
165 * Because here we don't use a dense accumulation vector but a heap. It will probably be slower though.
166 **/
167 template <class SR, class IT, class IVT, class OVT>
SpMXSpV(const Dcsc<IT,bool> & Adcsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,std::vector<int32_t> & indy,std::vector<OVT> & numy)168 void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
169 std::vector<int32_t> & indy, std::vector<OVT> & numy)
170 {
171 IT inf = std::numeric_limits<IT>::min();
172 IT sup = std::numeric_limits<IT>::max();
173 KNHeap< IT, IVT > sHeap(sup, inf); // max size: flops
174
175 IT k = 0; // index to indx vector
176 IT i = 0; // index to columns of matrix
177 while(i< Adcsc.nzc && k < veclen)
178 {
179 if(Adcsc.jc[i] < indx[k]) ++i;
180 else if(indx[k] < Adcsc.jc[i]) ++k;
181 else
182 {
183 for(IT j=Adcsc.cp[i]; j < Adcsc.cp[i+1]; ++j) // for all nonzeros in this column
184 {
185 sHeap.insert(Adcsc.ir[j], numx[k]); // row_id, num
186 }
187 ++i;
188 ++k;
189 }
190 }
191
192 IT row;
193 IVT num;
194 if(sHeap.getSize() > 0)
195 {
196 sHeap.deleteMin(&row, &num);
197 indy.push_back( (int32_t) row);
198 numy.push_back( num );
199 }
200 while(sHeap.getSize() > 0)
201 {
202 sHeap.deleteMin(&row, &num);
203 if(indy.back() == row)
204 {
205 numy.back() = SR::add(numy.back(), num);
206 }
207 else
208 {
209 indy.push_back( (int32_t) row);
210 numy.push_back(num);
211 }
212 }
213 }
214
215
216 /**
217 * @param[in,out] indy,numy,cnts {preallocated arrays to be filled}
218 * @param[in] dspls {displacements to preallocated indy,numy buffers}
219 * This version determines the receiving column neighbor and adjust the indices to the receiver's local index
220 * If IVT and OVT are different, then OVT should allow implicit conversion from IVT
221 **/
222 template <typename SR, typename IT, typename IVT, class OVT>
SpMXSpV(const Dcsc<IT,bool> & Adcsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,int32_t * indy,OVT * numy,int * cnts,int * dspls,int p_c)223 void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
224 int32_t * indy, OVT * numy, int * cnts, int * dspls, int p_c)
225 {
226 OVT * localy = new OVT[mA];
227 BitMap isthere(mA);
228 std::vector< std::vector<int32_t> > nzinds(p_c); // nonzero indices
229
230 int32_t perproc = mA / p_c;
231 int32_t k = 0; // index to indx vector
232 IT i = 0; // index to columns of matrix
233 while(i< Adcsc.nzc && k < veclen)
234 {
235 if(Adcsc.jc[i] < indx[k]) ++i;
236 else if(indx[k] < Adcsc.jc[i]) ++k;
237 else
238 {
239 for(IT j=Adcsc.cp[i]; j < Adcsc.cp[i+1]; ++j) // for all nonzeros in this column
240 {
241 int32_t rowid = (int32_t) Adcsc.ir[j];
242 if(!isthere.get_bit(rowid))
243 {
244 int32_t owner = std::min(rowid / perproc, static_cast<int32_t>(p_c-1));
245 localy[rowid] = numx[k]; // initial assignment, requires implicit conversion if IVT != OVT
246 nzinds[owner].push_back(rowid);
247 isthere.set_bit(rowid);
248 }
249 else
250 {
251 localy[rowid] = SR::add(localy[rowid], numx[k]);
252 }
253 }
254 ++i;
255 ++k;
256 }
257 }
258
259 for(int p = 0; p< p_c; ++p)
260 {
261 sort(nzinds[p].begin(), nzinds[p].end());
262 cnts[p] = nzinds[p].size();
263 int32_t * locnzinds = &nzinds[p][0];
264 int32_t offset = perproc * p;
265 for(int i=0; i< cnts[p]; ++i)
266 {
267 indy[dspls[p]+i] = locnzinds[i] - offset; // convert to local offset
268 numy[dspls[p]+i] = localy[locnzinds[i]];
269 }
270 }
271 delete [] localy;
272 }
273
274
275 // this version is still very good with splitters
276 template <typename SR, typename IT, typename IVT, typename OVT>
SpMXSpV_ForThreading(const Dcsc<IT,bool> & Adcsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,std::vector<int32_t> & indy,std::vector<OVT> & numy,int32_t offset)277 void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV_ForThreading(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
278 std::vector<int32_t> & indy, std::vector<OVT> & numy, int32_t offset)
279 {
280 std::vector<OVT> localy(mA);
281 BitMap isthere(mA);
282 std::vector<uint32_t> nzinds; // nonzero indices
283
284 SpMXSpV_ForThreading(Adcsc, mA, indx, numx, veclen, indy, numy, offset, localy, isthere, nzinds);
285 }
286
287
288
289 //! We can safely use a SPA here because Adcsc is short (::RowSplit() has already been called on it)
290 template <typename SR, typename IT, typename IVT, typename OVT>
SpMXSpV_ForThreading(const Dcsc<IT,bool> & Adcsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,std::vector<int32_t> & indy,std::vector<OVT> & numy,int32_t offset,std::vector<OVT> & localy,BitMap & isthere,std::vector<uint32_t> & nzinds)291 void SpImpl<SR,IT,bool,IVT,OVT>::SpMXSpV_ForThreading(const Dcsc<IT,bool> & Adcsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen, std::vector<int32_t> & indy, std::vector<OVT> & numy, int32_t offset, std::vector<OVT> & localy, BitMap & isthere, std::vector<uint32_t> & nzinds)
292 {
293 // The following piece of code is not general, but it's more memory efficient than FillColInds
294 int32_t k = 0; // index to indx vector
295 IT i = 0; // index to columns of matrix
296 while(i< Adcsc.nzc && k < veclen)
297 {
298 if(Adcsc.jc[i] < indx[k]) ++i;
299 else if(indx[k] < Adcsc.jc[i]) ++k;
300 else
301 {
302 for(IT j=Adcsc.cp[i]; j < Adcsc.cp[i+1]; ++j) // for all nonzeros in this column
303 {
304 uint32_t rowid = (uint32_t) Adcsc.ir[j];
305 if(!isthere.get_bit(rowid))
306 {
307 localy[rowid] = numx[k]; // initial assignment
308 nzinds.push_back(rowid);
309 isthere.set_bit(rowid);
310 }
311 else
312 {
313 localy[rowid] = SR::add(localy[rowid], numx[k]);
314 }
315 }
316 ++i; ++k;
317 }
318 }
319 int nnzy = nzinds.size();
320 integerSort(nzinds.data(), nnzy);
321 indy.resize(nnzy);
322 numy.resize(nnzy);
323 for(int i=0; i< nnzy; ++i)
324 {
325 indy[i] = nzinds[i] + offset; // return column-global index and let gespmv determine the receiver's local index
326 numy[i] = localy[nzinds[i]];
327 }
328 }
329
330
331
332
333
334 /**
335 * SpMXSpV with HeapSort
336 * Simply insert entries from columns corresponsing to nonzeros of the input vector into a minHeap
337 * Then extract entries from the minHeap
338 * Complexity: O(flops*log(flops))
339 * offset is the offset of indices in the matrix in case the matrix is split
340 * This version is likely to be more memory efficient than the other one (the one that uses preallocated memory buffers)
341 * Because here we don't use a dense accumulation vector but a heap. It will probably be slower though.
342 **/
343
344 template <typename SR, typename IT, typename NT, typename IVT, typename OVT>
SpMXSpV_HeapSort(const Csc<IT,NT> & Acsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,std::vector<int32_t> & indy,std::vector<OVT> & numy,int32_t offset)345 void SpMXSpV_HeapSort(const Csc<IT,NT> & Acsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen, std::vector<int32_t> & indy, std::vector<OVT> & numy, int32_t offset)
346 {
347 IT inf = std::numeric_limits<IT>::min();
348 IT sup = std::numeric_limits<IT>::max();
349 KNHeap< IT, OVT > sHeap(sup, inf);
350
351
352 for (int32_t k = 0; k < veclen; ++k)
353 {
354 IT colid = indx[k];
355 for(IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
356 {
357 OVT val = SR::multiply( Acsc.num[j], numx[k]);
358 sHeap.insert(Acsc.ir[j], val);
359 }
360 }
361
362 IT row;
363 OVT num;
364 if(sHeap.getSize() > 0)
365 {
366 sHeap.deleteMin(&row, &num);
367 row += offset;
368 indy.push_back( (int32_t) row);
369 numy.push_back( num );
370 }
371 while(sHeap.getSize() > 0)
372 {
373 sHeap.deleteMin(&row, &num);
374 row += offset;
375 if(indy.back() == row)
376 {
377 numy.back() = SR::add(numy.back(), num);
378 }
379 else
380 {
381 indy.push_back( (int32_t) row);
382 numy.push_back(num);
383 }
384 }
385 }
386
387
388
389 template <typename SR, typename IT, typename NT, typename IVT, typename OVT>
SpMXSpV_Bucket(const Csc<IT,NT> & Acsc,int32_t mA,const int32_t * indx,const IVT * numx,int32_t veclen,std::vector<int32_t> & indy,std::vector<OVT> & numy,PreAllocatedSPA<OVT> & SPA)390 void SpMXSpV_Bucket(const Csc<IT,NT> & Acsc, int32_t mA, const int32_t * indx, const IVT * numx, int32_t veclen,
391 std::vector<int32_t> & indy, std::vector< OVT > & numy, PreAllocatedSPA<OVT> & SPA)
392 {
393 if(veclen==0)
394 return;
395
396 double tstart = MPI_Wtime();
397 int nthreads=1;
398 int rowSplits = SPA.buckets; // SPA must be initialized as checked in SpImpl.h
399 #ifdef _OPENMP
400 #pragma omp parallel
401 {
402 nthreads = omp_get_num_threads();
403 }
404 #endif
405 if(rowSplits < nthreads)
406 {
407 std::ostringstream outs;
408 outs << "Warning in SpMXSpV_Bucket: " << rowSplits << " buckets are supplied for " << nthreads << " threads\n";
409 outs << "4 times the number of threads are recommended when creating PreAllocatedSPA\n";
410 SpParHelper::Print(outs.str());
411 }
412
413 int32_t rowPerSplit = mA / rowSplits;
414
415
416 //------------------------------------------------------
417 // Step1: count the nnz in each rowsplit of the matrix,
418 // because we don't want to waste memory
419 // False sharing is not a big problem because it is written outside of the main loop
420 //------------------------------------------------------
421
422 std::vector<std::vector<int32_t>> bSize(rowSplits, std::vector<int32_t> ( rowSplits, 0));
423 std::vector<std::vector<int32_t>> bOffset(rowSplits, std::vector<int32_t> ( rowSplits, 0));
424 std::vector<int32_t> sendSize(rowSplits);
425 double t0, t1, t2, t3, t4;
426 #ifdef BENCHMARK_SPMSPV
427 t0 = MPI_Wtime();
428 #endif
429
430 #ifdef _OPENMP
431 #pragma omp parallel for schedule(dynamic, 1)
432 #endif
433 for(int b=0; b<rowSplits; b++)
434 {
435 // evenly balance nnz of x among threads
436 int perBucket = veclen/rowSplits;
437 int spill = veclen%rowSplits;
438 int32_t xstart = b*perBucket + std::min(spill, b);
439 int32_t xend = (b+1)*perBucket + std::min(spill, b+1);
440 std::vector<int32_t> temp(rowSplits,0);
441 for (int32_t i = xstart; i < xend; ++i)
442 {
443 IT colid = indx[i];
444 for(IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
445 {
446 uint32_t rowid = (uint32_t) Acsc.ir[j];
447 int32_t splitId = rowSplits-1;
448 if(rowPerSplit!=0) splitId = (rowid/rowPerSplit > rowSplits-1) ? rowSplits-1 : rowid/rowPerSplit;
449 //bSize[b][splitId]++;
450 temp[splitId]++;
451 }
452 }
453 int32_t totSend = 0;
454 for(int k=0; k<rowSplits; k++)
455 {
456 bSize[b][k] = temp[k];
457 totSend += temp[k];
458 }
459 sendSize[b] = totSend;
460 }
461
462
463 #ifdef BENCHMARK_SPMSPV
464 t1 = MPI_Wtime() - t0;
465 t0 = MPI_Wtime();
466 #endif
467
468
469
470 // keep it sequential to avoid fault sharing
471 for(int i=1; i<rowSplits; i++)
472 {
473 for(int j=0; j<rowSplits; j++)
474 {
475 bOffset[i][j] = bOffset[i-1][j] + bSize[i-1][j];
476 bSize[i-1][j] = 0;
477 }
478 }
479
480 std::vector<uint32_t> disp(rowSplits+1);
481 int maxBucketSize = -1; // maximum size of a bucket
482 disp[0] = 0;
483 for(int j=0; j<rowSplits; j++)
484 {
485 int thisBucketSize = bOffset[rowSplits-1][j] + bSize[rowSplits-1][j];
486 disp[j+1] = disp[j] + thisBucketSize;
487 bSize[rowSplits-1][j] = 0;
488 maxBucketSize = std::max(thisBucketSize, maxBucketSize);
489 }
490
491
492
493 #ifdef BENCHMARK_SPMSPV
494 double tseq = MPI_Wtime() - t0;
495 #endif
496 //------------------------------------------------------
497 // Step2: The matrix is traversed column by column and
498 // nonzeros each rowsplit of the matrix are compiled together
499 //------------------------------------------------------
500 // Thread private buckets should fit in L2 cache
501 #ifndef L2_CACHE_SIZE
502 #define L2_CACHE_SIZE 256000
503 #endif
504 int THREAD_BUF_LEN = 256;
505 int itemsize = sizeof(int32_t) + sizeof(OVT);
506 while(true)
507 {
508 int bufferMem = THREAD_BUF_LEN * rowSplits * itemsize + 8 * rowSplits;
509 if(bufferMem>L2_CACHE_SIZE ) THREAD_BUF_LEN/=2;
510 else break;
511 }
512 THREAD_BUF_LEN = std::min(maxBucketSize+1,THREAD_BUF_LEN);
513
514 #ifdef BENCHMARK_SPMSPV
515 t0 = MPI_Wtime();
516 #endif
517
518 #ifdef _OPENMP
519 #pragma omp parallel
520 #endif
521 {
522 int32_t* tIndSplitA = new int32_t[rowSplits*THREAD_BUF_LEN];
523 OVT* tNumSplitA = new OVT[rowSplits*THREAD_BUF_LEN];
524 std::vector<int32_t> tBucketSize(rowSplits);
525 std::vector<int32_t> tOffset(rowSplits);
526 #ifdef _OPENMP
527 #pragma omp for schedule(dynamic,1)
528 #endif
529 for(int b=0; b<rowSplits; b++)
530 {
531
532 std::fill(tBucketSize.begin(), tBucketSize.end(), 0);
533 std::fill(tOffset.begin(), tOffset.end(), 0);
534 int perBucket = veclen/rowSplits;
535 int spill = veclen%rowSplits;
536 int32_t xstart = b*perBucket + std::min(spill, b);
537 int32_t xend = (b+1)*perBucket + std::min(spill, b+1);
538
539 for (int32_t i = xstart; i < xend; ++i)
540 {
541 IT colid = indx[i];
542 for(IT j=Acsc.jc[colid]; j < Acsc.jc[colid+1]; ++j)
543 {
544 OVT val = SR::multiply( Acsc.num[j], numx[i]);
545 uint32_t rowid = (uint32_t) Acsc.ir[j];
546 int32_t splitId = rowSplits-1;
547 if(rowPerSplit!=0) splitId = (rowid/rowPerSplit > rowSplits-1) ? rowSplits-1 : rowid/rowPerSplit;
548 if (tBucketSize[splitId] < THREAD_BUF_LEN)
549 {
550 tIndSplitA[splitId*THREAD_BUF_LEN + tBucketSize[splitId]] = rowid;
551 tNumSplitA[splitId*THREAD_BUF_LEN + tBucketSize[splitId]++] = val;
552 }
553 else
554 {
555 std::copy(tIndSplitA + splitId*THREAD_BUF_LEN, tIndSplitA + (splitId+1)*THREAD_BUF_LEN, &SPA.indSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
556 std::copy(tNumSplitA + splitId*THREAD_BUF_LEN, tNumSplitA + (splitId+1)*THREAD_BUF_LEN, &SPA.numSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
557 tIndSplitA[splitId*THREAD_BUF_LEN] = rowid;
558 tNumSplitA[splitId*THREAD_BUF_LEN] = val;
559 tOffset[splitId] += THREAD_BUF_LEN ;
560 tBucketSize[splitId] = 1;
561 }
562 }
563 }
564
565 for(int splitId=0; splitId<rowSplits; ++splitId)
566 {
567 if(tBucketSize[splitId]>0)
568 {
569 std::copy(tIndSplitA + splitId*THREAD_BUF_LEN, tIndSplitA + splitId*THREAD_BUF_LEN + tBucketSize[splitId], &SPA.indSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
570 std::copy(tNumSplitA + splitId*THREAD_BUF_LEN, tNumSplitA + splitId*THREAD_BUF_LEN + tBucketSize[splitId], &SPA.numSplitA[disp[splitId] + bOffset[b][splitId]] + tOffset[splitId]);
571 }
572 }
573 }
574 delete [] tIndSplitA;
575 delete [] tNumSplitA;
576 }
577
578 #ifdef BENCHMARK_SPMSPV
579 t2 = MPI_Wtime() - t0;
580 t0 = MPI_Wtime();
581 #endif
582 std::vector<uint32_t> nzInRowSplits(rowSplits);
583 uint32_t* nzinds = new uint32_t[disp[rowSplits]];
584
585 #ifdef _OPENMP
586 #pragma omp parallel for schedule(dynamic,1)
587 #endif
588 for(int rs=0; rs<rowSplits; ++rs)
589 {
590
591 for(int i=disp[rs]; i<disp[rs+1] ; i++)
592 {
593 int32_t lrowid = SPA.indSplitA[i] - rs * rowPerSplit;
594 SPA.V_isthereBool[rs][lrowid] = false;
595 }
596 uint32_t tMergeDisp = disp[rs];
597 for(int i=disp[rs]; i<disp[rs+1] ; i++)
598 {
599 int32_t rowid = SPA.indSplitA[i];
600 int32_t lrowid = rowid - rs * rowPerSplit;
601 if(!SPA.V_isthereBool[rs][lrowid])// there is no conflict across threads
602 {
603 SPA.V_localy[0][rowid] = SPA.numSplitA[i];
604 nzinds[tMergeDisp++] = rowid;
605 SPA.V_isthereBool[rs][lrowid]=true;
606 }
607 else
608 {
609 SPA.V_localy[0][rowid] = SR::add(SPA.V_localy[0][rowid], SPA.numSplitA[i]);
610 }
611 }
612
613 integerSort(nzinds + disp[rs], tMergeDisp - disp[rs]);
614 nzInRowSplits[rs] = tMergeDisp - disp[rs];
615
616 }
617
618 #ifdef BENCHMARK_SPMSPV
619 t3 = MPI_Wtime() - t0;
620 #endif
621 // prefix sum
622 std::vector<uint32_t> dispRowSplits(rowSplits+1);
623 dispRowSplits[0] = 0;
624 for(int i=0; i<rowSplits; i++)
625 {
626 dispRowSplits[i+1] = dispRowSplits[i] + nzInRowSplits[i];
627 }
628
629 #ifdef BENCHMARK_SPMSPV
630 t0 = MPI_Wtime();
631 #endif
632 int nnzy = dispRowSplits[rowSplits];
633 indy.resize(nnzy);
634 numy.resize(nnzy);
635 #ifdef BENCHMARK_SPMSPV
636 tseq = MPI_Wtime() - t0;
637 t0 = MPI_Wtime();
638 #endif
639
640 int maxNnzInSplit = *std::max_element(nzInRowSplits.begin(),nzInRowSplits.end());
641 THREAD_BUF_LEN = std::min(maxNnzInSplit+1,256);
642 #ifdef _OPENMP
643 #pragma omp parallel
644 #endif
645 {
646 OVT* tnumy = new OVT [THREAD_BUF_LEN];
647 int32_t* tindy = new int32_t [THREAD_BUF_LEN];
648 int curSize, tdisp;
649 #ifdef _OPENMP
650 #pragma omp for schedule(dynamic,1)
651 #endif
652 for(int rs=0; rs<rowSplits; rs++)
653 {
654 curSize = 0;
655 tdisp = 0;
656 uint32_t * thisind = nzinds + disp[rs];
657 std::copy(nzinds+disp[rs], nzinds+disp[rs]+nzInRowSplits[rs], indy.begin()+dispRowSplits[rs]);
658 for(int j=0; j<nzInRowSplits[rs]; j++)
659 {
660
661 if ( curSize < THREAD_BUF_LEN)
662 {
663 tnumy[curSize++] = SPA.V_localy[0][thisind[j]];
664 }
665 else
666 {
667 std::copy(tnumy, tnumy+curSize, numy.begin()+dispRowSplits[rs]+tdisp);
668 tdisp += curSize;
669 tnumy[0] = SPA.V_localy[0][thisind[j]];
670 curSize = 1;
671 }
672 }
673 if ( curSize > 0)
674 {
675 std::copy(tnumy, tnumy+curSize, numy.begin()+dispRowSplits[rs]+tdisp);
676 }
677 }
678 delete [] tnumy;
679 delete [] tindy;
680 }
681
682
683 #ifdef BENCHMARK_SPMSPV
684 t4 = MPI_Wtime() - t0;
685 #endif
686
687 delete[] nzinds;
688
689
690
691 #ifdef BENCHMARK_SPMSPV
692 double tall = MPI_Wtime() - tstart;
693 std::ostringstream outs1;
694 outs1 << "Time breakdown of SpMSpV-bucket." << std::endl;
695 outs1 << "Estimate buckets: "<< t1 << " Bucketing: " << t2 << " SPA-merge: " << t3 << " Output: " << t4 << " Total: "<< tall << std::endl;
696 SpParHelper::Print(outs1.str());
697 #endif
698
699 }
700
701 }
702