1 #ifndef _mtSpGEMM_h
2 #define _mtSpGEMM_h
3
4 #include "CombBLAS.h"
5
6 namespace combblas {
7 /*
8 Multithreaded prefix sum
9 Inputs:
10 in: an input array
11 size: the length of the input array "in"
12 nthreads: number of threads used to compute the prefix sum
13
14 Output:
15 return an array of size "size+1"
the memory of the output array is allocated internally
17
18 Example:
19
20 in = [2, 1, 3, 5]
21 out = [0, 2, 3, 6, 11]
22 */
template <typename T>
T* prefixsum(T* in, int size, int nthreads)
{
    // per-thread partial sums; partial[t+1] holds the total of thread t's chunk
    std::vector<T> partial(nthreads + 1);
    partial[0] = 0;

    // out[0] is the exclusive prefix (0); out[1..size] are the inclusive sums
    T* out = new T[size + 1];
    out[0] = 0;
    T* shifted = &out[1];   // view of out starting at index 1

#ifdef THREADED
#pragma omp parallel
#endif
    {
        int tid = 0;
#ifdef THREADED
        tid = omp_get_thread_num();
#endif
        // Pass 1: each thread computes a local running sum over its static chunk.
        // NOTE: both "omp for schedule(static)" loops below must iterate over the
        // identical range so each thread is assigned the very same chunk twice.
        T local = 0;
#ifdef THREADED
#pragma omp for schedule(static)
#endif
        for (int i = 0; i < size; i++)
        {
            local += in[i];
            shifted[i] = local;
        }
        partial[tid + 1] = local;

#ifdef THREADED
#pragma omp barrier
#endif
        // Offset for this thread = sum of all preceding threads' chunk totals.
        T offset = 0;
        for (int t = 0; t <= tid; t++)
        {
            offset += partial[t];
        }

        // Pass 2: shift this thread's chunk by the accumulated offset.
#ifdef THREADED
#pragma omp for schedule(static)
#endif
        for (int i = 0; i < size; i++)
        {
            shifted[i] += offset;
        }
    }
    return out;
}
71
72
73
74
75 // multithreaded HeapSpGEMM
76 template <typename SR, typename NTO, typename IT, typename NT1, typename NT2>
LocalSpGEMM(const SpDCCols<IT,NT1> & A,const SpDCCols<IT,NT2> & B,bool clearA,bool clearB)77 SpTuples<IT, NTO> * LocalSpGEMM
78 (const SpDCCols<IT, NT1> & A,
79 const SpDCCols<IT, NT2> & B,
80 bool clearA, bool clearB)
81 {
82 IT mdim = A.getnrow();
83 IT ndim = B.getncol();
84 IT nnzA = A.getnnz();
85 if(A.isZero() || B.isZero())
86 {
87 return new SpTuples<IT, NTO>(0, mdim, ndim);
88 }
89
90
91 Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
92 Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
93 IT nA = A.getncol();
94 float cf = static_cast<float>(nA+1) / static_cast<float>(Adcsc->nzc);
95 IT csize = static_cast<IT>(ceil(cf)); // chunk size
96 IT * aux;
97 Adcsc->ConstructAux(nA, aux);
98
99
100 int numThreads = 1;
101 #ifdef THREADED
102 #pragma omp parallel
103 {
104 numThreads = omp_get_num_threads();
105 }
106 #endif
107
108 IT* colnnzC = estimateNNZ(A, B);
109 IT* colptrC = prefixsum<IT>(colnnzC, Bdcsc->nzc, numThreads);
110 delete [] colnnzC;
111 IT nnzc = colptrC[Bdcsc->nzc];
112 std::tuple<IT,IT,NTO> * tuplesC = static_cast<std::tuple<IT,IT,NTO> *> (::operator new (sizeof(std::tuple<IT,IT,NTO>[nnzc])));
113
114
115
116
117 // thread private space for heap and colinds
118 std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
119 std::vector<std::vector<HeapEntry<IT,NT1>>> globalheapVec(numThreads);
120
121 for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
122 {
123 colindsVec[i].resize(nnzA/numThreads);
124 globalheapVec[i].resize(nnzA/numThreads);
125 }
126
127
128 #ifdef THREADED
129 #pragma omp parallel for
130 #endif
131 for(int i=0; i < Bdcsc->nzc; ++i)
132 {
133 size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
134 int myThread = 0;
135 #ifdef THREADED
136 myThread = omp_get_thread_num();
137 #endif
138 if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
139 {
140 colindsVec[myThread].resize(nnzcolB);
141 globalheapVec[myThread].resize(nnzcolB);
142 }
143
144
145 // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
146 // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
147 Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
148 std::pair<IT,IT> * colinds = colindsVec[myThread].data();
149 HeapEntry<IT,NT1> * wset = globalheapVec[myThread].data();
150 IT hsize = 0;
151
152
153 for(IT j = 0; (unsigned)j < nnzcolB; ++j) // create the initial heap
154 {
155 if(colinds[j].first != colinds[j].second) // current != end
156 {
157 wset[hsize++] = HeapEntry< IT,NT1 > (Adcsc->ir[colinds[j].first], j, Adcsc->numx[colinds[j].first]);
158 }
159 }
160 std::make_heap(wset, wset+hsize);
161
162 IT curptr = colptrC[i];
163 while(hsize > 0)
164 {
165 std::pop_heap(wset, wset + hsize); // result is stored in wset[hsize-1]
166 IT locb = wset[hsize-1].runr; // relative location of the nonzero in B's current column
167
168 NTO mrhs = SR::multiply(wset[hsize-1].num, Bdcsc->numx[Bdcsc->cp[i]+locb]);
169 if (!SR::returnedSAID())
170 {
171 if( (curptr > colptrC[i]) && std::get<0>(tuplesC[curptr-1]) == wset[hsize-1].key)
172 {
173 std::get<2>(tuplesC[curptr-1]) = SR::add(std::get<2>(tuplesC[curptr-1]), mrhs);
174 }
175 else
176 {
177 tuplesC[curptr++]= std::make_tuple(wset[hsize-1].key, Bdcsc->jc[i], mrhs) ;
178 }
179
180 }
181
182 if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
183 {
184 // runr stays the same !
185 wset[hsize-1].key = Adcsc->ir[colinds[locb].first];
186 wset[hsize-1].num = Adcsc->numx[colinds[locb].first];
187 std::push_heap(wset, wset+hsize);
188 }
189 else
190 {
191 --hsize;
192 }
193 }
194 }
195
196 if(clearA)
197 delete const_cast<SpDCCols<IT, NT1> *>(&A);
198 if(clearB)
199 delete const_cast<SpDCCols<IT, NT2> *>(&B);
200
201 delete [] colptrC;
202 delete [] aux;
203
204 SpTuples<IT, NTO>* spTuplesC = new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, true);
205 return spTuplesC;
206
207 }
208
209 // estimate space for result of SpGEMM
210 template <typename IT, typename NT1, typename NT2>
estimateNNZ(const SpDCCols<IT,NT1> & A,const SpDCCols<IT,NT2> & B)211 IT* estimateNNZ(const SpDCCols<IT, NT1> & A,const SpDCCols<IT, NT2> & B)
212 {
213 IT nnzA = A.getnnz();
214 if(A.isZero() || B.isZero())
215 {
216 return NULL;
217 }
218
219 Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
220 Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
221
222 float cf = static_cast<float>(A.getncol()+1) / static_cast<float>(Adcsc->nzc);
223 IT csize = static_cast<IT>(ceil(cf)); // chunk size
224 IT * aux;
225 Adcsc->ConstructAux(A.getncol(), aux);
226
227
228 int numThreads = 1;
229 #ifdef THREADED
230 #pragma omp parallel
231 {
232 numThreads = omp_get_num_threads();
233 }
234 #endif
235
236
237 IT* colnnzC = new IT[Bdcsc->nzc]; // nnz in every nonempty column of C
238
239 #ifdef THREADED
240 #pragma omp parallel for
241 #endif
242 for(IT i=0; i< Bdcsc->nzc; ++i)
243 {
244 colnnzC[i] = 0;
245 }
246
247 // thread private space for heap and colinds
248 std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
249 std::vector<std::vector<std::pair<IT,IT>>> globalheapVec(numThreads);
250
251
252 for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
253 {
254 colindsVec[i].resize(nnzA/numThreads);
255 globalheapVec[i].resize(nnzA/numThreads);
256 }
257
258 #ifdef THREADED
259 #pragma omp parallel for
260 #endif
261 for(int i=0; i < Bdcsc->nzc; ++i)
262 {
263 size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
264 int myThread = 0;
265 #ifdef THREADED
266 myThread = omp_get_thread_num();
267 #endif
268 if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
269 {
270 colindsVec[myThread].resize(nnzcolB);
271 globalheapVec[myThread].resize(nnzcolB);
272 }
273
274 // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
275 // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
276 Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
277 std::pair<IT,IT> * colinds = colindsVec[myThread].data();
278 std::pair<IT,IT> * curheap = globalheapVec[myThread].data();
279 IT hsize = 0;
280
281 // create the initial heap
282 for(IT j = 0; (unsigned)j < nnzcolB; ++j)
283 {
284 if(colinds[j].first != colinds[j].second)
285 {
286 curheap[hsize++] = std::make_pair(Adcsc->ir[colinds[j].first], j);
287 }
288 }
289 std::make_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
290
291 IT prevRow=-1; // previously popped row from heap
292
293 while(hsize > 0)
294 {
295 std::pop_heap(curheap, curheap + hsize, std::greater<std::pair<IT,IT>>()); // result is stored in wset[hsize-1]
296 IT locb = curheap[hsize-1].second;
297
298 if( curheap[hsize-1].first != prevRow)
299 {
300 prevRow = curheap[hsize-1].first;
301 colnnzC[i] ++;
302 }
303
304 if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
305 {
306 curheap[hsize-1].first = Adcsc->ir[colinds[locb].first];
307 std::push_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
308 }
309 else
310 {
311 --hsize;
312 }
313 }
314 }
315
316 delete [] aux;
317 return colnnzC;
318 }
319
320 }
321
322 #endif
323