1 #include "CombBLAS.h"
2
3 namespace combblas {
4
5
6 /***************************************************************************
7 * Find indices of column splitters in a list of tuple in parallel.
8 * Inputs:
9 * tuples: an array of SpTuples each tuple is (rowid, colid, val)
10 * nsplits: number of splits requested
11 * Output:
12 * splitters: An array of size (nsplits+1) storing the starts and ends of split tuples.
13 * different type used for output since we might need int or IT
14 ***************************************************************************/
15
16 template <typename RT, typename IT, typename NT>
findColSplitters(SpTuples<IT,NT> * & spTuples,int nsplits)17 std::vector<RT> findColSplitters(SpTuples<IT,NT> * & spTuples, int nsplits)
18 {
19 std::vector<RT> splitters(nsplits+1);
20 splitters[0] = static_cast<RT>(0);
21 ColLexiCompare<IT,NT> comp;
22 #ifdef THREADED
23 #pragma omp parallel for
24 #endif
25 for(int i=1; i< nsplits; i++)
26 {
27 IT cur_col = i * (spTuples->getncol()/nsplits);
28 std::tuple<IT,IT,NT> search_tuple(0, cur_col, 0);
29 std::tuple<IT,IT,NT>* it = std::lower_bound (spTuples->tuples, spTuples->tuples + spTuples->getnnz(), search_tuple, comp);
30 splitters[i] = (RT) (it - spTuples->tuples);
31 }
32 splitters[nsplits] = spTuples->getnnz();
33
34 return splitters;
35 }
36
37
38 // Symbolic serial merge : only estimates nnz
39 template<class IT, class NT>
SerialMergeNNZ(const std::vector<SpTuples<IT,NT> * > & ArrSpTups)40 IT SerialMergeNNZ( const std::vector<SpTuples<IT,NT> *> & ArrSpTups)
41 {
42 int nlists = ArrSpTups.size();
43 ColLexiCompare<IT,int> heapcomp;
44 std::vector<std::tuple<IT, IT, int>> heap(nlists);
45 std::vector<IT> curptr(nlists, static_cast<IT>(0));
46 IT hsize = 0;
47 for(int i=0; i< nlists; ++i)
48 {
49 if(ArrSpTups[i]->getnnz()>0)
50 {
51 heap[hsize++] = std::make_tuple(std::get<0>(ArrSpTups[i]->tuples[0]), std::get<1>(ArrSpTups[i]->tuples[0]), i);
52 }
53
54 }
55 std::make_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
56
57 std::tuple<IT, IT, NT> curTuple;
58 IT estnnz = 0;
59 while(hsize > 0)
60 {
61 std::pop_heap(heap.data(), heap.data() + hsize, std::not2(heapcomp)); // result is stored in heap[hsize-1]
62 int source = std::get<2>(heap[hsize-1]);
63 if( (estnnz ==0) || (std::get<0>(curTuple) != std::get<0>(heap[hsize-1])) || (std::get<1>(curTuple) != std::get<1>(heap[hsize-1])))
64 {
65 curTuple = ArrSpTups[source]->tuples[curptr[source]];
66 estnnz++;
67 }
68 curptr[source]++;
69 if(curptr[source] != ArrSpTups[source]->getnnz()) // That array has not been depleted
70 {
71 heap[hsize-1] = std::make_tuple(std::get<0>(ArrSpTups[source]->tuples[curptr[source]]),
72 std::get<1>(ArrSpTups[source]->tuples[curptr[source]]), source);
73 std::push_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
74 }
75 else
76 {
77 --hsize;
78 }
79 }
80 return estnnz;
81 }
82
83
84 /*
85 "Internal function" called by MultiwayMerge inside threaded region.
86 The merged list is stored in a preallocated buffer ntuples
87 Never called from outside.
88 Assumption1: the input lists are already column sorted
89 Assumption2: at least two lists are passed to this function
90 Assumption3: the input and output lists are to be deleted by caller
91 */
92
93 template<class SR, class IT, class NT>
SerialMerge(const std::vector<SpTuples<IT,NT> * > & ArrSpTups,std::tuple<IT,IT,NT> * ntuples)94 void SerialMerge( const std::vector<SpTuples<IT,NT> *> & ArrSpTups, std::tuple<IT, IT, NT> * ntuples)
95 {
96 int nlists = ArrSpTups.size();
97 ColLexiCompare<IT,int> heapcomp;
98 std::vector<std::tuple<IT, IT, int>> heap(nlists); // if performance issue, create this outside of threaded region
99 std::vector<IT> curptr(nlists, static_cast<IT>(0));
100 IT estnnz = 0;
101 IT hsize = 0;
102 for(int i=0; i< nlists; ++i)
103 {
104 if(ArrSpTups[i]->getnnz()>0)
105 {
106 estnnz += ArrSpTups[i]->getnnz();
107 heap[hsize++] = std::make_tuple(std::get<0>(ArrSpTups[i]->tuples[0]), std::get<1>(ArrSpTups[i]->tuples[0]), i);
108 }
109
110 }
111 std::make_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
112 IT cnz = 0;
113
114 while(hsize > 0)
115 {
116 std::pop_heap(heap.data(), heap.data() + hsize, std::not2(heapcomp)); // result is stored in heap[hsize-1]
117 int source = std::get<2>(heap[hsize-1]);
118
119 if( (cnz != 0) &&
120 ((std::get<0>(ntuples[cnz-1]) == std::get<0>(heap[hsize-1])) && (std::get<1>(ntuples[cnz-1]) == std::get<1>(heap[hsize-1]))) )
121 {
122 std::get<2>(ntuples[cnz-1]) = SR::add(std::get<2>(ntuples[cnz-1]), ArrSpTups[source]->numvalue(curptr[source]++));
123 }
124 else
125 {
126 ntuples[cnz++] = ArrSpTups[source]->tuples[curptr[source]++];
127 }
128
129 if(curptr[source] != ArrSpTups[source]->getnnz()) // That array has not been depleted
130 {
131 heap[hsize-1] = std::make_tuple(std::get<0>(ArrSpTups[source]->tuples[curptr[source]]),
132 std::get<1>(ArrSpTups[source]->tuples[curptr[source]]), source);
133 std::push_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
134 }
135 else
136 {
137 --hsize;
138 }
139 }
140 }
141
142
143
144 // Performs a balanced merge of the array of SpTuples
145 // Assumes the input parameters are already column sorted
146 template<class SR, class IT, class NT>
147 SpTuples<IT, NT>* MultiwayMerge( std::vector<SpTuples<IT,NT> *> & ArrSpTups, IT mdim = 0, IT ndim = 0, bool delarrs = false )
148 {
149
150 int nlists = ArrSpTups.size();
151 if(nlists == 0)
152 {
153 return new SpTuples<IT,NT>(0, mdim, ndim); //empty mxn SpTuples
154 }
155 if(nlists == 1)
156 {
157 if(delarrs) // steal data from input, and don't delete input
158 {
159 return ArrSpTups[0];
160 }
161 else // copy input to output
162 {
163 std::tuple<IT, IT, NT>* mergeTups = static_cast<std::tuple<IT, IT, NT>*>
164 (::operator new (sizeof(std::tuple<IT, IT, NT>[ArrSpTups[0]->getnnz()])));
165 #ifdef THREADED
166 #pragma omp parallel for
167 #endif
168 for(int i=0; i<ArrSpTups[0]->getnnz(); i++)
169 mergeTups[i] = ArrSpTups[0]->tuples[i];
170
171 return new SpTuples<IT,NT> (ArrSpTups[0]->getnnz(), mdim, ndim, mergeTups, true);
172 }
173 }
174
175 // ---- check correctness of input dimensions ------
176 for(int i=0; i< nlists; ++i)
177 {
178 if((mdim != ArrSpTups[i]->getnrow()) || ndim != ArrSpTups[i]->getncol())
179 {
180 std::cerr << "Dimensions of SpTuples do not match on multiwayMerge()" << std::endl;
181 return new SpTuples<IT,NT>(0,0,0);
182 }
183 }
184
185 int nthreads = 1;
186 #ifdef THREADED
187 #pragma omp parallel
188 {
189 nthreads = omp_get_num_threads();
190 }
191 #endif
192 int nsplits = 4*nthreads; // oversplit for load balance
193 nsplits = std::min(nsplits, (int)ndim); // we cannot split a column
194 std::vector< std::vector<IT> > colPtrs;
195 for(int i=0; i< nlists; i++)
196 {
197 colPtrs.push_back(findColSplitters<IT>(ArrSpTups[i], nsplits)); // in parallel
198 }
199
200 std::vector<IT> mergedNnzPerSplit(nsplits);
201 std::vector<IT> inputNnzPerSplit(nsplits);
202 // ------ estimate memory requirement after merge in each split ------
203 #ifdef THREADED
204 #pragma omp parallel for schedule(dynamic)
205 #endif
206 for(int i=0; i< nsplits; i++) // for each part
207 {
208 std::vector<SpTuples<IT,NT> *> listSplitTups(nlists);
209 IT t = static_cast<IT>(0);
210 for(int j=0; j< nlists; ++j)
211 {
212 IT curnnz= colPtrs[j][i+1] - colPtrs[j][i];
213 listSplitTups[j] = new SpTuples<IT, NT> (curnnz, mdim, ndim, ArrSpTups[j]->tuples + colPtrs[j][i], true);
214 t += colPtrs[j][i+1] - colPtrs[j][i];
215 }
216 mergedNnzPerSplit[i] = SerialMergeNNZ(listSplitTups);
217 inputNnzPerSplit[i] = t;
218 }
219
220 std::vector<IT> mdisp(nsplits+1,0);
221 for(int i=0; i<nsplits; ++i)
222 mdisp[i+1] = mdisp[i] + mergedNnzPerSplit[i];
223 IT mergedNnzAll = mdisp[nsplits];
224
225
226 #ifdef COMBBLAS_DEBUG
227 IT inputNnzAll = std::accumulate(inputNnzPerSplit.begin(), inputNnzPerSplit.end(), static_cast<IT>(0));
228 double ratio = inputNnzAll / (double) mergedNnzAll;
229 std::ostringstream outs;
230 outs << "Multiwaymerge: inputNnz/mergedNnz = " << ratio << std::endl;
231 SpParHelper::Print(outs.str());
232 #endif
233
234
235 // ------ allocate memory outside of the parallel region ------
236 std::tuple<IT, IT, NT> * mergeBuf = static_cast<std::tuple<IT, IT, NT>*> (::operator new (sizeof(std::tuple<IT, IT, NT>[mergedNnzAll])));
237
238 // ------ perform merge in parallel ------
239 #ifdef THREADED
240 #pragma omp parallel for schedule(dynamic)
241 #endif
242 for(int i=0; i< nsplits; i++) // serially merge part by part
243 {
244 std::vector<SpTuples<IT,NT> *> listSplitTups(nlists);
245 for(int j=0; j< nlists; ++j)
246 {
247 IT curnnz= colPtrs[j][i+1] - colPtrs[j][i];
248 listSplitTups[j] = new SpTuples<IT, NT> (curnnz, mdim, ndim, ArrSpTups[j]->tuples + colPtrs[j][i], true);
249 }
250 SerialMerge<SR>(listSplitTups, mergeBuf + mdisp[i]);
251 }
252
253 for(int i=0; i< nlists; i++)
254 {
255 if(delarrs)
256 delete ArrSpTups[i]; // May be expensive for large local matrices
257 }
258 return new SpTuples<IT, NT> (mergedNnzAll, mdim, ndim, mergeBuf, true, true);
259 }
260
261 }
262