1 #include "CombBLAS.h"
2 
3 namespace combblas {
4 
5 
6 /***************************************************************************
7  * Find indices of column splitters in a list of tuple in parallel.
8  * Inputs:
9  *      tuples: an array of SpTuples each tuple is (rowid, colid, val)
10  *      nsplits: number of splits requested
11  *  Output:
12  *      splitters: An array of size (nsplits+1) storing the starts and ends of split tuples.
13  *      different type used for output since we might need int or IT
14  ***************************************************************************/
15 
16 template <typename RT, typename IT, typename NT>
findColSplitters(SpTuples<IT,NT> * & spTuples,int nsplits)17 std::vector<RT> findColSplitters(SpTuples<IT,NT> * & spTuples, int nsplits)
18 {
19     std::vector<RT> splitters(nsplits+1);
20     splitters[0] = static_cast<RT>(0);
21     ColLexiCompare<IT,NT> comp;
22 #ifdef THREADED
23 #pragma omp parallel for
24 #endif
25     for(int i=1; i< nsplits; i++)
26     {
27         IT cur_col = i * (spTuples->getncol()/nsplits);
28         std::tuple<IT,IT,NT> search_tuple(0, cur_col, 0);
29         std::tuple<IT,IT,NT>* it = std::lower_bound (spTuples->tuples, spTuples->tuples + spTuples->getnnz(), search_tuple, comp);
30         splitters[i] = (RT) (it - spTuples->tuples);
31     }
32     splitters[nsplits] = spTuples->getnnz();
33 
34     return splitters;
35 }
36 
37 
38 // Symbolic serial merge : only estimates nnz
39 template<class IT, class NT>
SerialMergeNNZ(const std::vector<SpTuples<IT,NT> * > & ArrSpTups)40 IT SerialMergeNNZ( const std::vector<SpTuples<IT,NT> *> & ArrSpTups)
41 {
42     int nlists =  ArrSpTups.size();
43     ColLexiCompare<IT,int> heapcomp;
44     std::vector<std::tuple<IT, IT, int>> heap(nlists);
45     std::vector<IT> curptr(nlists, static_cast<IT>(0));
46     IT hsize = 0;
47     for(int i=0; i< nlists; ++i)
48     {
49         if(ArrSpTups[i]->getnnz()>0)
50         {
51             heap[hsize++] = std::make_tuple(std::get<0>(ArrSpTups[i]->tuples[0]), std::get<1>(ArrSpTups[i]->tuples[0]), i);
52         }
53 
54     }
55     std::make_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
56 
57     std::tuple<IT, IT, NT> curTuple;
58     IT estnnz = 0;
59     while(hsize > 0)
60     {
61       std::pop_heap(heap.data(), heap.data() + hsize, std::not2(heapcomp));   // result is stored in heap[hsize-1]
62         int source = std::get<2>(heap[hsize-1]);
63         if( (estnnz ==0) || (std::get<0>(curTuple) != std::get<0>(heap[hsize-1])) || (std::get<1>(curTuple) != std::get<1>(heap[hsize-1])))
64         {
65             curTuple = ArrSpTups[source]->tuples[curptr[source]];
66             estnnz++;
67         }
68         curptr[source]++;
69         if(curptr[source] != ArrSpTups[source]->getnnz())	// That array has not been depleted
70         {
71             heap[hsize-1] = std::make_tuple(std::get<0>(ArrSpTups[source]->tuples[curptr[source]]),
72                                        std::get<1>(ArrSpTups[source]->tuples[curptr[source]]), source);
73             std::push_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
74         }
75         else
76         {
77             --hsize;
78         }
79     }
80     return estnnz;
81 }
82 
83 
84 /*
85  "Internal function" called by MultiwayMerge inside threaded region.
86  The merged list is stored in a preallocated buffer ntuples
87  Never called from outside.
88  Assumption1: the input lists are already column sorted
89  Assumption2: at least two lists are passed to this function
90  Assumption3: the input and output lists are to be deleted by caller
91  */
92 
93 template<class SR, class IT, class NT>
SerialMerge(const std::vector<SpTuples<IT,NT> * > & ArrSpTups,std::tuple<IT,IT,NT> * ntuples)94 void SerialMerge( const std::vector<SpTuples<IT,NT> *> & ArrSpTups, std::tuple<IT, IT, NT> * ntuples)
95 {
96     int nlists =  ArrSpTups.size();
97     ColLexiCompare<IT,int> heapcomp;
98     std::vector<std::tuple<IT, IT, int>> heap(nlists); // if performance issue, create this outside of threaded region
99     std::vector<IT> curptr(nlists, static_cast<IT>(0));
100     IT estnnz = 0;
101     IT hsize = 0;
102     for(int i=0; i< nlists; ++i)
103     {
104         if(ArrSpTups[i]->getnnz()>0)
105         {
106             estnnz += ArrSpTups[i]->getnnz();
107             heap[hsize++] = std::make_tuple(std::get<0>(ArrSpTups[i]->tuples[0]), std::get<1>(ArrSpTups[i]->tuples[0]), i);
108         }
109 
110     }
111     std::make_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
112     IT cnz = 0;
113 
114     while(hsize > 0)
115     {
116       std::pop_heap(heap.data(), heap.data() + hsize, std::not2(heapcomp));   // result is stored in heap[hsize-1]
117         int source = std::get<2>(heap[hsize-1]);
118 
119         if( (cnz != 0) &&
120            ((std::get<0>(ntuples[cnz-1]) == std::get<0>(heap[hsize-1])) && (std::get<1>(ntuples[cnz-1]) == std::get<1>(heap[hsize-1]))) )
121         {
122             std::get<2>(ntuples[cnz-1])  = SR::add(std::get<2>(ntuples[cnz-1]), ArrSpTups[source]->numvalue(curptr[source]++));
123         }
124         else
125         {
126             ntuples[cnz++] = ArrSpTups[source]->tuples[curptr[source]++];
127         }
128 
129         if(curptr[source] != ArrSpTups[source]->getnnz())	// That array has not been depleted
130         {
131             heap[hsize-1] = std::make_tuple(std::get<0>(ArrSpTups[source]->tuples[curptr[source]]),
132                                        std::get<1>(ArrSpTups[source]->tuples[curptr[source]]), source);
133             std::push_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
134         }
135         else
136         {
137             --hsize;
138         }
139     }
140 }
141 
142 
143 
144 // Performs a balanced merge of the array of SpTuples
145 // Assumes the input parameters are already column sorted
146 template<class SR, class IT, class NT>
147 SpTuples<IT, NT>* MultiwayMerge( std::vector<SpTuples<IT,NT> *> & ArrSpTups, IT mdim = 0, IT ndim = 0, bool delarrs = false )
148 {
149 
150     int nlists =  ArrSpTups.size();
151     if(nlists == 0)
152     {
153         return new SpTuples<IT,NT>(0, mdim, ndim); //empty mxn SpTuples
154     }
155     if(nlists == 1)
156     {
157         if(delarrs) // steal data from input, and don't delete input
158         {
159             return ArrSpTups[0];
160         }
161         else // copy input to output
162         {
163             std::tuple<IT, IT, NT>* mergeTups = static_cast<std::tuple<IT, IT, NT>*>
164             (::operator new (sizeof(std::tuple<IT, IT, NT>[ArrSpTups[0]->getnnz()])));
165 #ifdef THREADED
166 #pragma omp parallel for
167 #endif
168             for(int i=0; i<ArrSpTups[0]->getnnz(); i++)
169                 mergeTups[i] = ArrSpTups[0]->tuples[i];
170 
171             return new SpTuples<IT,NT> (ArrSpTups[0]->getnnz(), mdim, ndim, mergeTups, true);
172         }
173     }
174 
175     // ---- check correctness of input dimensions ------
176     for(int i=0; i< nlists; ++i)
177     {
178         if((mdim != ArrSpTups[i]->getnrow()) || ndim != ArrSpTups[i]->getncol())
179         {
180             std::cerr << "Dimensions of SpTuples do not match on multiwayMerge()" << std::endl;
181             return new SpTuples<IT,NT>(0,0,0);
182         }
183     }
184 
185     int nthreads = 1;
186 #ifdef THREADED
187 #pragma omp parallel
188     {
189         nthreads = omp_get_num_threads();
190     }
191 #endif
192     int nsplits = 4*nthreads; // oversplit for load balance
193     nsplits = std::min(nsplits, (int)ndim); // we cannot split a column
194     std::vector< std::vector<IT> > colPtrs;
195     for(int i=0; i< nlists; i++)
196     {
197         colPtrs.push_back(findColSplitters<IT>(ArrSpTups[i], nsplits)); // in parallel
198     }
199 
200     std::vector<IT> mergedNnzPerSplit(nsplits);
201     std::vector<IT> inputNnzPerSplit(nsplits);
202 // ------ estimate memory requirement after merge in each split ------
203 #ifdef THREADED
204 #pragma omp parallel for schedule(dynamic)
205 #endif
206     for(int i=0; i< nsplits; i++) // for each part
207     {
208         std::vector<SpTuples<IT,NT> *> listSplitTups(nlists);
209         IT t = static_cast<IT>(0);
210         for(int j=0; j< nlists; ++j)
211         {
212             IT curnnz= colPtrs[j][i+1] - colPtrs[j][i];
213             listSplitTups[j] = new SpTuples<IT, NT> (curnnz, mdim, ndim, ArrSpTups[j]->tuples + colPtrs[j][i], true);
214             t += colPtrs[j][i+1] - colPtrs[j][i];
215         }
216         mergedNnzPerSplit[i] = SerialMergeNNZ(listSplitTups);
217         inputNnzPerSplit[i] = t;
218     }
219 
220     std::vector<IT> mdisp(nsplits+1,0);
221     for(int i=0; i<nsplits; ++i)
222         mdisp[i+1] = mdisp[i] + mergedNnzPerSplit[i];
223     IT mergedNnzAll = mdisp[nsplits];
224 
225 
226 #ifdef COMBBLAS_DEBUG
227     IT inputNnzAll = std::accumulate(inputNnzPerSplit.begin(), inputNnzPerSplit.end(), static_cast<IT>(0));
228     double ratio = inputNnzAll / (double) mergedNnzAll;
229     std::ostringstream outs;
230     outs << "Multiwaymerge: inputNnz/mergedNnz = " << ratio << std::endl;
231     SpParHelper::Print(outs.str());
232 #endif
233 
234 
235     // ------ allocate memory outside of the parallel region ------
236    std::tuple<IT, IT, NT> * mergeBuf = static_cast<std::tuple<IT, IT, NT>*> (::operator new (sizeof(std::tuple<IT, IT, NT>[mergedNnzAll])));
237 
238     // ------ perform merge in parallel ------
239 #ifdef THREADED
240 #pragma omp parallel for schedule(dynamic)
241 #endif
242     for(int i=0; i< nsplits; i++) // serially merge part by part
243     {
244         std::vector<SpTuples<IT,NT> *> listSplitTups(nlists);
245         for(int j=0; j< nlists; ++j)
246         {
247             IT curnnz= colPtrs[j][i+1] - colPtrs[j][i];
248             listSplitTups[j] = new SpTuples<IT, NT> (curnnz, mdim, ndim, ArrSpTups[j]->tuples + colPtrs[j][i], true);
249         }
250         SerialMerge<SR>(listSplitTups, mergeBuf + mdisp[i]);
251     }
252 
253     for(int i=0; i< nlists; i++)
254     {
255         if(delarrs)
256             delete ArrSpTups[i]; // May be expensive for large local matrices
257     }
258     return new SpTuples<IT, NT> (mergedNnzAll, mdim, ndim, mergeBuf, true, true);
259 }
260 
261 }
262