1 #ifndef _mtSpGEMM_h
2 #define _mtSpGEMM_h
3 
4 #include "CombBLAS.h"
5 
6 namespace combblas {
7 /*
8  Multithreaded prefix sum
9  Inputs:
10     in: an input array
11     size: the length of the input array "in"
12     nthreads: number of threads used to compute the prefix sum
13 
14  Output:
15     return an array of size "size+1"
16     the memory of the output array is allocated internallay
17 
18  Example:
19 
20     in = [2, 1, 3, 5]
21     out = [0, 2, 3, 6, 11]
22  */
23 template <typename T>
prefixsum(T * in,int size,int nthreads)24 T* prefixsum(T* in, int size, int nthreads)
25 {
26     std::vector<T> tsum(nthreads+1);
27     tsum[0] = 0;
28     T* out = new T[size+1];
29     out[0] = 0;
30     T* psum = &out[1];
31 #ifdef THREADED
32 #pragma omp parallel
33 #endif
34     {
35 		int ithread = 0;
36 	#ifdef THREADED
37         ithread = omp_get_thread_num();
38 	#endif
39 
40         T sum = 0;
41 #ifdef THREADED
42 #pragma omp for schedule(static)
43 #endif
44         for (int i=0; i<size; i++)
45         {
46             sum += in[i];
47             psum[i] = sum;
48         }
49 
50         tsum[ithread+1] = sum;
51 #ifdef THREADED
52 #pragma omp barrier
53 #endif
54         T offset = 0;
55         for(int i=0; i<(ithread+1); i++)
56         {
57             offset += tsum[i];
58         }
59 
60 #ifdef THREADED
61 #pragma omp for schedule(static)
62 #endif
63         for (int i=0; i<size; i++)
64         {
65             psum[i] += offset;
66         }
67 
68     }
69     return out;
70 }
71 
72 
73 
74 
75 // multithreaded HeapSpGEMM
76 template <typename SR, typename NTO, typename IT, typename NT1, typename NT2>
LocalSpGEMM(const SpDCCols<IT,NT1> & A,const SpDCCols<IT,NT2> & B,bool clearA,bool clearB)77 SpTuples<IT, NTO> * LocalSpGEMM
78 (const SpDCCols<IT, NT1> & A,
79  const SpDCCols<IT, NT2> & B,
80  bool clearA, bool clearB)
81 {
82     IT mdim = A.getnrow();
83     IT ndim = B.getncol();
84     IT nnzA = A.getnnz();
85     if(A.isZero() || B.isZero())
86     {
87         return new SpTuples<IT, NTO>(0, mdim, ndim);
88     }
89 
90 
91     Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
92     Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
93     IT nA = A.getncol();
94     float cf  = static_cast<float>(nA+1) / static_cast<float>(Adcsc->nzc);
95     IT csize = static_cast<IT>(ceil(cf));   // chunk size
96     IT * aux;
97     Adcsc->ConstructAux(nA, aux);
98 
99 
100     int numThreads = 1;
101 #ifdef THREADED
102 #pragma omp parallel
103     {
104         numThreads = omp_get_num_threads();
105     }
106 #endif
107 
108     IT* colnnzC = estimateNNZ(A, B);
109     IT* colptrC = prefixsum<IT>(colnnzC, Bdcsc->nzc, numThreads);
110     delete [] colnnzC;
111     IT nnzc = colptrC[Bdcsc->nzc];
112     std::tuple<IT,IT,NTO> * tuplesC = static_cast<std::tuple<IT,IT,NTO> *> (::operator new (sizeof(std::tuple<IT,IT,NTO>[nnzc])));
113 
114 
115 
116 
117     // thread private space for heap and colinds
118     std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
119     std::vector<std::vector<HeapEntry<IT,NT1>>> globalheapVec(numThreads);
120 
121     for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
122     {
123         colindsVec[i].resize(nnzA/numThreads);
124         globalheapVec[i].resize(nnzA/numThreads);
125     }
126 
127 
128 #ifdef THREADED
129 #pragma omp parallel for
130 #endif
131     for(int i=0; i < Bdcsc->nzc; ++i)
132     {
133         size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
134 		int myThread = 0;
135 	#ifdef THREADED
136         myThread = omp_get_thread_num();
137 	#endif
138         if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
139         {
140             colindsVec[myThread].resize(nnzcolB);
141             globalheapVec[myThread].resize(nnzcolB);
142         }
143 
144 
145         // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
146         // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
147         Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
148         std::pair<IT,IT> * colinds = colindsVec[myThread].data();
149         HeapEntry<IT,NT1> * wset = globalheapVec[myThread].data();
150         IT hsize = 0;
151 
152 
153         for(IT j = 0; (unsigned)j < nnzcolB; ++j)		// create the initial heap
154         {
155             if(colinds[j].first != colinds[j].second)	// current != end
156             {
157                 wset[hsize++] = HeapEntry< IT,NT1 > (Adcsc->ir[colinds[j].first], j, Adcsc->numx[colinds[j].first]);
158             }
159         }
160         std::make_heap(wset, wset+hsize);
161 
162         IT curptr = colptrC[i];
163         while(hsize > 0)
164         {
165             std::pop_heap(wset, wset + hsize);         // result is stored in wset[hsize-1]
166             IT locb = wset[hsize-1].runr;	// relative location of the nonzero in B's current column
167 
168             NTO mrhs = SR::multiply(wset[hsize-1].num, Bdcsc->numx[Bdcsc->cp[i]+locb]);
169             if (!SR::returnedSAID())
170             {
171                 if( (curptr > colptrC[i]) && std::get<0>(tuplesC[curptr-1]) == wset[hsize-1].key)
172                 {
173                     std::get<2>(tuplesC[curptr-1]) = SR::add(std::get<2>(tuplesC[curptr-1]), mrhs);
174                 }
175                 else
176                 {
177                     tuplesC[curptr++]= std::make_tuple(wset[hsize-1].key, Bdcsc->jc[i], mrhs) ;
178                 }
179 
180             }
181 
182             if( (++(colinds[locb].first)) != colinds[locb].second)	// current != end
183             {
184                 // runr stays the same !
185                 wset[hsize-1].key = Adcsc->ir[colinds[locb].first];
186                 wset[hsize-1].num = Adcsc->numx[colinds[locb].first];
187                 std::push_heap(wset, wset+hsize);
188             }
189             else
190             {
191                 --hsize;
192             }
193         }
194     }
195 
196     if(clearA)
197         delete const_cast<SpDCCols<IT, NT1> *>(&A);
198     if(clearB)
199         delete const_cast<SpDCCols<IT, NT2> *>(&B);
200 
201     delete [] colptrC;
202     delete [] aux;
203 
204     SpTuples<IT, NTO>* spTuplesC = new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, true);
205     return spTuplesC;
206 
207 }
208 
209 // estimate space for result of SpGEMM
210 template <typename IT, typename NT1, typename NT2>
estimateNNZ(const SpDCCols<IT,NT1> & A,const SpDCCols<IT,NT2> & B)211 IT* estimateNNZ(const SpDCCols<IT, NT1> & A,const SpDCCols<IT, NT2> & B)
212 {
213     IT nnzA = A.getnnz();
214     if(A.isZero() || B.isZero())
215     {
216         return NULL;
217     }
218 
219     Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
220     Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
221 
222     float cf  = static_cast<float>(A.getncol()+1) / static_cast<float>(Adcsc->nzc);
223     IT csize = static_cast<IT>(ceil(cf));   // chunk size
224     IT * aux;
225     Adcsc->ConstructAux(A.getncol(), aux);
226 
227 
228     int numThreads = 1;
229 #ifdef THREADED
230 #pragma omp parallel
231     {
232         numThreads = omp_get_num_threads();
233     }
234 #endif
235 
236 
237     IT* colnnzC = new IT[Bdcsc->nzc]; // nnz in every nonempty column of C
238 
239 #ifdef THREADED
240 #pragma omp parallel for
241 #endif
242     for(IT i=0; i< Bdcsc->nzc; ++i)
243     {
244         colnnzC[i] = 0;
245     }
246 
247     // thread private space for heap and colinds
248     std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
249     std::vector<std::vector<std::pair<IT,IT>>> globalheapVec(numThreads);
250 
251 
252     for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
253     {
254         colindsVec[i].resize(nnzA/numThreads);
255         globalheapVec[i].resize(nnzA/numThreads);
256     }
257 
258 #ifdef THREADED
259 #pragma omp parallel for
260 #endif
261     for(int i=0; i < Bdcsc->nzc; ++i)
262     {
263         size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
264 		int myThread = 0;
265 #ifdef THREADED
266         myThread = omp_get_thread_num();
267 #endif
268         if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
269         {
270             colindsVec[myThread].resize(nnzcolB);
271             globalheapVec[myThread].resize(nnzcolB);
272         }
273 
274         // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
275         // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
276         Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
277         std::pair<IT,IT> * colinds = colindsVec[myThread].data();
278         std::pair<IT,IT> * curheap = globalheapVec[myThread].data();
279         IT hsize = 0;
280 
281         // create the initial heap
282         for(IT j = 0; (unsigned)j < nnzcolB; ++j)
283         {
284             if(colinds[j].first != colinds[j].second)
285             {
286                 curheap[hsize++] = std::make_pair(Adcsc->ir[colinds[j].first], j);
287             }
288         }
289         std::make_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
290 
291         IT prevRow=-1; // previously popped row from heap
292 
293         while(hsize > 0)
294         {
295           std::pop_heap(curheap, curheap + hsize, std::greater<std::pair<IT,IT>>()); // result is stored in wset[hsize-1]
296             IT locb = curheap[hsize-1].second;
297 
298             if( curheap[hsize-1].first != prevRow)
299             {
300                 prevRow = curheap[hsize-1].first;
301                 colnnzC[i] ++;
302             }
303 
304             if( (++(colinds[locb].first)) != colinds[locb].second)	// current != end
305             {
306                 curheap[hsize-1].first = Adcsc->ir[colinds[locb].first];
307                 std::push_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
308             }
309             else
310             {
311                 --hsize;
312             }
313         }
314     }
315 
316     delete [] aux;
317     return colnnzC;
318 }
319 
320 }
321 
322 #endif
323