/*
Copyright (c) 2009, David Cheng, Viral B. Shah.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

#ifndef PSORT_SAMPLE_H

#define PSORT_SAMPLE_H

#include "psort_util.h"

#define MPI_PSORT_TAG 31337

namespace vpsort {
  using namespace std;

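  // Usage sketch (illustrative only, not part of the original interface docs;
  // assumes MPI is already initialized and each rank holds its block of the
  // data in contiguous memory):
  //
  //   int nproc;
  //   MPI_Comm_size(MPI_COMM_WORLD, &nproc);
  //   long n_loc = ...;                       // number of elements on this rank
  //   double *data = ...;                     // local, unsorted elements
  //   std::vector<long> dist(nproc, n_loc);   // desired element count per rank
  //   vpsort::parallel_samplesort(data, data + n_loc, &dist[0], MPI_COMM_WORLD);
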
  // sample n_out elements from array in _without_ replacement
  // done with two sweeps of size n_out
  // in is touched, but restored
  template<typename _RandomAccessIter, typename _ValueType, typename _Distance>
  static void sample_splitters(_RandomAccessIter in, _Distance n_in,
                               _ValueType *out, long n_out) {
    if (n_out >= n_in) {
      // Assumed fallback (the original left this branch as a TODO): more
      // samples were requested than elements are available, so repeat the
      // input cyclically.  Assumes n_in > 0.
      for (long i = 0; i < n_out; ++i) {
        out[i] = *(in + (i % n_in));
      }
    } else {
      _Distance *picks = new _Distance[n_out];

      for (long i = 0; i < n_out; ++i) {
        picks[i] = static_cast<_Distance> ((n_in - i) * drand48());
        out[i] = *(in + picks[i]);
        *(in + picks[i]) = *(in + (n_in - i - 1));
      }

      for (long i = n_out - 1; i >= 0; --i) {
        *(in + (n_in - i - 1)) = *(in + picks[i]);
        *(in + picks[i]) = out[i];
      }

      delete [] picks;
    }
  }

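  // Given the global size of every partition (part_gsize, n_parts entries),
  // choose for each of the first nproc-1 processors the index of the last
  // partition it should receive (out), keeping each processor's total as
  // close as possible to its target count in dist.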
  static inline void set_splitters(long *part_gsize, long n_parts, int nproc, long *dist, long *out) {
    // impl 1: get all but last about correct size; last might pick up slack
    long ind = 0, cum = 0;
    for (long i=0; i<n_parts; ++i) {
      cum += part_gsize[i];
      if (cum > dist[ind]) {
        out[ind] = i;
        if (cum - dist[ind] > dist[ind] - cum + part_gsize[i]) {
          --out[ind];
          cum = part_gsize[i];
        } else {
          cum = 0;
        }
        if (++ind >= nproc - 1) break;
      }
    }
  }

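  // Redistribute elements between ranks so that the number held by each rank
  // changes from from_dist to to_dist while the global order is preserved.
  // starch[i][j] is the number of elements rank i sends to rank j, so when
  // the two distributions are already close, little data has to move.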
  template<typename _ValueType>
  static void redist (int *from_dist, _ValueType *current,
                      int *to_dist, _ValueType *final,
                      int rank, int nproc,
                      MPI_Datatype &MPI_valueType, MPI_Comm comm)
  {
    int **starch = new int*[nproc];

    for (int i = 0; i < nproc; ++i) {
      starch[i] = new int[nproc];
      for (int j = 0; j < nproc; ++j) {
        starch[i][j] = 0;
      }
    }

    int i = 0, j = 0;
    int from_delta = from_dist[0], to_delta = to_dist[0];

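    // Walk the two distributions in lockstep: at each step the smaller of the
    // remaining send and receive counts becomes starch[i][j].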
    while ((i < nproc) && (j < nproc)) {

      int diff = from_delta - to_delta;

      if (diff == 0) {
        starch[i][j] = from_delta;
        ++i;
        ++j;
        // guard the reads: on the final step i and j reach nproc, and
        // from_dist[i] / to_dist[j] would be out of bounds
        if (i < nproc) from_delta = from_dist[i];
        if (j < nproc) to_delta = to_dist[j];
      } else if (diff > 0) {
        starch[i][j] = to_delta;
        ++j;
        from_delta -= to_delta;
        if (j < nproc) to_delta = to_dist[j];
      } else {
        starch[i][j] = from_delta;
        ++i;
        to_delta -= from_delta;
        if (i < nproc) from_delta = from_dist[i];
      }
    }

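    // The two loops below exchange with ranks at or below us from the front
    // of the buffers, and with higher ranks from the back, so each received
    // block lands directly at its final offset in final[].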
    MPI_Request send_req[nproc], recv_req[nproc];
    MPI_Status status;

    int sendl = 0, recvl = 0;

    for (int p = 0; p <= rank; ++p) {

      int torecv = starch[p][rank];
      if (torecv) {
        MPI_Irecv (&final[recvl], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
        recvl += torecv;
      }

      int tosend = starch[rank][p];
      if (tosend) {
        MPI_Isend (&current[sendl], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
        sendl += tosend;
      }

    }

    int sendr = 0, recvr = 0;

    for (int p = nproc-1; p > rank; --p) {

      int torecv = starch[p][rank];
      if (torecv) {
        recvr += torecv;
        MPI_Irecv (&final[to_dist[rank]-recvr], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
      }

      int tosend = starch[rank][p];
      if (tosend) {
        sendr += tosend;
        MPI_Isend (&current[from_dist[rank]-sendr], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
      }
    }

    for (int p = 0; p < nproc; ++p) {
      if ( starch[rank][p] )
        MPI_Wait(&send_req[p], &status);
      if ( starch[p][rank] )
        MPI_Wait(&recv_req[p], &status);
    }

    for (int i = 0; i < nproc; ++i)
      delete [] starch[i];
    delete [] starch;

  }

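  // Parallel sample sort over [first, last) on every rank:
  //   1. each rank draws s*k random local samples (sample_splitters)
  //   2. the root gathers and sorts all samples, picks nproc*k - 1 global
  //      splitters, and broadcasts them
  //   3. each rank bins its elements against the splitters by binary search
  //   4. bins are assigned to ranks so the result approximates dist
  //      (set_splitters) and exchanged with non-blocking point-to-point calls
  //   5. each rank sorts what it received with mysort
  //   6. redist() shifts elements between ranks so the final distribution
  //      matches dist exactly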
  template<typename _RandomAccessIter, typename _Compare,
           typename _SeqSortType>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           _Compare comp, long *dist,
                           SeqSort<_SeqSortType> &mysort,
                           long s, long k,
                           MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;
    typedef typename iterator_traits<_RandomAccessIter>::difference_type _Distance;

    int nproc, rank;
    MPI_Comm_size(comm, &nproc);
    MPI_Comm_rank(comm, &rank);

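    // Elements are shipped as raw bytes; this assumes _ValueType is
    // trivially copyable (no pointers or non-trivial copy semantics).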
    MPI_Datatype MPI_valueType, MPI_distanceType;
    MPI_Type_contiguous (sizeof(_ValueType), MPI_CHAR, &MPI_valueType);
    MPI_Type_commit (&MPI_valueType);
    MPI_Type_contiguous (sizeof(_Distance), MPI_CHAR, &MPI_distanceType);
    MPI_Type_commit (&MPI_distanceType);

    _Distance n_loc = last - first;

    progress (rank, 0, "Sample splitters", comm);

    // randomly pick s*k sample splitters
    long n_samp = s*k;
    _ValueType *s_splitters = new _ValueType[n_samp];
    sample_splitters(first, n_loc, s_splitters, n_samp);

    // send the sample splitters to the root
    long n_parts = nproc * k - 1;
    _ValueType *p_splitters = new _ValueType[n_parts];
    if (rank != 0) {
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 NULL, 0, MPI_valueType, 0, comm);
    } else {
      _ValueType *gath_splitters = new _ValueType[nproc * n_samp];
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 gath_splitters, n_samp, MPI_valueType,
                 0, comm);

      // root sorts sample splitters serially
      sort(gath_splitters, gath_splitters + (nproc * n_samp), comp);

      // root picks nproc*k - 1 splitters and broadcasts them
      double stride = (double) (nproc * n_samp) / (nproc * k);
      for (int i=0; i<n_parts; ++i) {
        p_splitters[i] = gath_splitters[(int) (stride * (double) (i + 1))];
      }
      delete [] gath_splitters;
    }

    MPI_Bcast(p_splitters, n_parts, MPI_valueType, 0, comm);

    delete [] s_splitters;

    progress (rank, 1, "Partition", comm);

    // apply the splitters
    //  impl1: binary search each element over the splitters
    //  TODO: impl2: recurse, partitioning dfs order over splitters
    _Distance *part_ind = new _Distance[n_loc];
    long *part_lsize = new long[n_parts + 1];
    for (long i=0; i<=n_parts; ++i) part_lsize[i] = 0;

    for (long i=0; i<n_loc; ++i) {
      // TODO: load balance when splitters have same value
      part_ind[i] = lower_bound(p_splitters, p_splitters + n_parts,
                                *(first + i), comp)
        - p_splitters;
      ++part_lsize[part_ind[i]];
    }

    delete [] p_splitters;

    long *boundaries = new long[nproc - 1];
    if (rank != 0) {
      MPI_Reduce(part_lsize, NULL, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
    } else {
      // determine global partition sizes, starch-minimizing boundaries
      long *part_gsize = new long[n_parts + 1];
      MPI_Reduce(part_lsize, part_gsize, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
      set_splitters (part_gsize, n_parts + 1, nproc, dist, boundaries);
      delete [] part_gsize;
    }
    MPI_Bcast(boundaries, nproc - 1, MPI_LONG, 0, comm);

    // send data according to boundaries
    // part_proc[i] says which processor partition i goes to
    long *part_proc = new long[n_parts + 1];
    int out_counts[nproc];
    for (int i=0; i<nproc; ++i) out_counts[i] = 0;
    long cur = 0;
    for (long i=0; i<=n_parts; ++i) {
      if (cur < nproc - 1 && i > boundaries[cur]) {
        ++cur;
      }
      part_proc[i] = cur;
      out_counts[cur] += part_lsize[i];
    }

    delete [] boundaries;
    delete [] part_lsize;

    long curs[nproc];
    _ValueType **s_bufs = new _ValueType*[nproc];
    for (int i=0; i<nproc; ++i) {
      curs[i] = 0;
      s_bufs[i] = new _ValueType[out_counts[i]];
    }
    for (_Distance i=0; i<n_loc; ++i) {
      int dest = part_proc[part_ind[i]];
      s_bufs[dest][curs[dest]++] = *(first + i);
    }

    delete [] part_ind;
    delete [] part_proc;

    progress (rank, 2, "Alltoall", comm);

    // tell each processor how many elements it will receive from every other processor
    int in_counts[nproc];
    MPI_Alltoall(out_counts, 1, MPI_INT, in_counts, 1, MPI_INT, comm);

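    // Prefix sums of the incoming counts give each sender's offset into the
    // receive buffer trans_data.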
    int offsets[nproc + 1];
    offsets[0] = 0;
    for (int i=1; i<=nproc; ++i) {
      offsets[i] = offsets[i-1] + in_counts[i-1];
    }
    int n_loct = offsets[nproc];

    MPI_Request send_reqs[nproc];
    MPI_Request recv_reqs[nproc];
    _ValueType *trans_data = new _ValueType[n_loct];

    for (int i=0; i<nproc; ++i) {
      MPI_Irecv(&trans_data[offsets[i]], in_counts[i],
                MPI_valueType, i, MPI_PSORT_TAG, comm, &recv_reqs[i]);
    }
    for (int i=1; i<=nproc; ++i) {
      int dest = (rank + i) % nproc;
      MPI_Isend(s_bufs[dest], out_counts[dest], MPI_valueType,
                dest, MPI_PSORT_TAG, comm, &send_reqs[dest]);
    }

    MPI_Waitall(nproc, send_reqs, MPI_STATUSES_IGNORE);
    for (int i=0; i < nproc; ++i) {
      delete [] s_bufs[i];
    }
    delete [] s_bufs;

    MPI_Waitall (nproc, recv_reqs, MPI_STATUSES_IGNORE);

    progress (rank, 3, "Sequential sort", comm);

    mysort.seqsort (trans_data, trans_data + n_loct, comp);

    progress (rank, 4, "Adjust boundaries", comm);

    // starch: shift elements between ranks so the final counts match dist
    int trans_dist[nproc], dist_out[nproc];
    MPI_Allgather(&n_loct, 1, MPI_INT, trans_dist, 1, MPI_INT, comm);
    for (int i=0; i<nproc; ++i) dist_out[i] = dist[i];
    redist(trans_dist, trans_data, dist_out, first, rank, nproc, MPI_valueType, comm);
    delete [] trans_data;

    progress (rank, 5, "Finish", comm);

    MPI_Type_free (&MPI_valueType);
    MPI_Type_free (&MPI_distanceType);
  }

  template<typename _RandomAccessIter>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           long *dist, MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;

    STLSort stl_sort;

    int nproc;
    MPI_Comm_size (comm, &nproc);
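    // default oversampling: s = k = nproc, i.e. nproc*nproc samples per rank
    // and nproc*k - 1 = nproc*nproc - 1 candidate partitions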
    long s = nproc, k = nproc;
    parallel_samplesort (first, last, less<_ValueType>(), dist, stl_sort, s, k, comm);

  }

} /* namespace vpsort */

#endif /* PSORT_SAMPLE_H */