/*
Copyright (c) 2009, David Cheng, Viral B. Shah.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

#ifndef PSORT_SAMPLE_H
#define PSORT_SAMPLE_H

#include "psort_util.h"

#define MPI_PSORT_TAG 31337

namespace vpsort {
  using namespace std;

  // sample n_out elements from array in _without_ replacement
  // done with two sweeps of size n_out
  // in is touched, but restored
  template<typename _RandomAccessIter, typename _ValueType, typename _Distance>
  static void sample_splitters(_RandomAccessIter in, _Distance n_in,
                               _ValueType *out, long n_out) {
    if (n_out >= n_in) {
      // TODO
    } else {
      _Distance *picks = new _Distance[n_out];

      // first sweep: record the picked positions and copy the samples out
      for (int i=0; i<n_out; ++i) {
        picks[i] = static_cast<_Distance> ((n_in - i) * drand48());
        out[i] = *(in + picks[i]);
        *(in + picks[i]) = *(in + (n_out - i - 1));
      }

      // second sweep: undo the swaps so that in is left as it was found
      for (int i=n_out-1; i>=0; --i) {
        *(in + (n_out - i - 1)) = *(in + picks[i]);
        *(in + picks[i]) = out[i];
      }

      delete [] picks;
    }
  }

  // choose nproc-1 partition boundaries so that each processor's share of the
  // global partition sizes is close to its target size in dist
  static inline void set_splitters(long *part_gsize, long n_parts, int nproc,
                                   long *dist, long *out) {
    // impl 1: get all but last about correct size; last might pick up slack
    long ind = 0, cum = 0;
    for (long i=0; i<n_parts; ++i) {
      cum += part_gsize[i];
      if (cum > dist[ind]) {
        out[ind] = i;
        if (cum - dist[ind] > dist[ind] - cum + part_gsize[i]) {
          --out[ind];
          cum = part_gsize[i];
        } else {
          cum = 0;
        }
        if (++ind >= nproc - 1) break;
      }
    }
  }

  // move data from the current distribution (from_dist) to the target
  // distribution (to_dist); starch[i][j] holds how many elements rank i
  // sends to rank j
  template<typename _ValueType>
  static void redist (int *from_dist, _ValueType *current,
                      int *to_dist, _ValueType *final,
                      int rank, int nproc,
                      MPI_Datatype &MPI_valueType, MPI_Comm comm)
  {
    int ** starch = new int*[nproc];

    for (int i = 0; i < nproc; ++i) {
      starch[i] = new int[nproc];
      for (int j = 0; j < nproc; ++j) {
        starch[i][j] = 0;
      }
    }

    // walk the two distributions in lockstep to fill the transfer matrix
    int i = 0, j = 0;
    int from_delta = from_dist[0], to_delta = to_dist[0];

    while ((i < nproc) && (j < nproc)) {

      int diff = from_delta - to_delta;

      if (diff == 0) {
        starch[i][j] = from_delta;
        ++i;
        ++j;
        from_delta = from_dist[i];
        to_delta = to_dist[j];
      } else if (diff > 0) {
        starch[i][j] = to_delta;
        ++j;
        from_delta -= to_delta;
        to_delta = to_dist[j];
      } else {
        starch[i][j] = from_delta;
        ++i;
        to_delta -= from_delta;
        from_delta = from_dist[i];
      }
    }

    MPI_Request send_req[nproc], recv_req[nproc];
    MPI_Status status;

    // exchange with processes of rank <= ours; received data fills the front of final
    int sendl = 0, recvl = 0;

    for (int p = 0; p <= rank; ++p) {

      int torecv = starch[p][rank];
      if (torecv) {
        MPI_Irecv (&final[recvl], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
        recvl += torecv;
      }

      int tosend = starch[rank][p];
      if (tosend) {
        MPI_Isend (&current[sendl], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
        sendl += tosend;
      }

    }

    // exchange with processes of rank > ours; received data fills the back of final
    int sendr = 0, recvr = 0;

    for (int p = nproc-1; p > rank; --p) {

      int torecv = starch[p][rank];
      if (torecv) {
        recvr += torecv;
        MPI_Irecv (&final[to_dist[rank]-recvr], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
      }

      int tosend = starch[rank][p];
      if (tosend) {
        sendr += tosend;
        MPI_Isend (&current[from_dist[rank]-sendr], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
      }
    }

    for (int p = 0; p < nproc; ++p) {
      if ( starch[rank][p] )
        MPI_Wait(&send_req[p], &status);
      if ( starch[p][rank] )
        MPI_Wait(&recv_req[p], &status);
    }

    for (int i = 0; i < nproc; ++i)
      delete [] starch[i];
    delete [] starch;

  }


  template<typename _RandomAccessIter, typename _Compare,
           typename _SeqSortType>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           _Compare comp, long *dist,
                           SeqSort<_SeqSortType> &mysort,
                           long s, long k,
                           MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;
    typedef typename iterator_traits<_RandomAccessIter>::difference_type _Distance;

    int nproc, rank;
    MPI_Comm_size(comm, &nproc);
    MPI_Comm_rank(comm, &rank);

    // raw-byte MPI datatypes for the element and index types
    MPI_Datatype MPI_valueType, MPI_distanceType;
    MPI_Type_contiguous (sizeof(_ValueType), MPI_CHAR, &MPI_valueType);
    MPI_Type_commit (&MPI_valueType);
    MPI_Type_contiguous (sizeof(_Distance), MPI_CHAR, &MPI_distanceType);
    MPI_Type_commit (&MPI_distanceType);

    _Distance n_loc = last - first;

    progress (rank, 0, "Sample splitters", comm);

    // randomly pick s*k sample splitters
    long n_samp = s*k;
    _ValueType *s_splitters = new _ValueType[n_samp];
    sample_splitters(first, n_loc, s_splitters, n_samp);

    // send the sample splitters to the root
    long n_parts = nproc * k - 1;
    _ValueType *p_splitters = new _ValueType[n_parts];
    if (rank != 0) {
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 NULL, 0, MPI_valueType, 0, comm);
    } else {
      _ValueType *gath_splitters = new _ValueType[nproc * n_samp];
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 gath_splitters, n_samp, MPI_valueType,
                 0, comm);

      // root sorts sample splitters serially
      sort(gath_splitters, gath_splitters + (nproc * n_samp), comp);

      // root picks pk-1 splitters, broadcasts
      double stride = (double) (nproc * n_samp) / (nproc * k);
      for (int i=0; i<n_parts; ++i) {
        p_splitters[i] = gath_splitters[(int) (stride * (double) (i + 1))];
      }
      delete [] gath_splitters;
    }
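    // Worked example (illustrative only): with nproc = 4 and the default
    // s = k = nproc used by the convenience overload below, each rank
    // contributes n_samp = 16 samples, the root gathers 4*16 = 64 of them,
    // sorts them, and with stride = 64/16 = 4 picks gath_splitters[4],
    // gath_splitters[8], ..., gath_splitters[60] as the n_parts = 15
    // splitters that are broadcast next.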
    // everyone receives the final splitters from the root
    MPI_Bcast(p_splitters, n_parts, MPI_valueType, 0, comm);

    delete [] s_splitters;

    progress (rank, 1, "Partition", comm);

    // apply the splitters
    // impl1: binary search each element over the splitters
    // TODO: impl2: recurse, partitioning dfs order over splitters
    _Distance *part_ind = new _Distance[n_loc];
    long *part_lsize = new long[n_parts + 1];
    for (long i=0; i<=n_parts; ++i) part_lsize[i] = 0;

    for (long i=0; i<n_loc; ++i) {
      // TODO: load balance when splitters have same value
      part_ind[i] = lower_bound(p_splitters, p_splitters + n_parts, *(first + i))
        - p_splitters;
      ++part_lsize[part_ind[i]];
    }

    delete [] p_splitters;

    long *boundaries = new long[nproc - 1];
    if (rank != 0) {
      MPI_Reduce(part_lsize, NULL, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
    } else {
      // determine global partition sizes, starch-minimizing boundaries
      long *part_gsize = new long[n_parts + 1];
      MPI_Reduce(part_lsize, part_gsize, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
      set_splitters (part_gsize, n_parts + 1, nproc, dist, boundaries);
      delete [] part_gsize;
    }
    MPI_Bcast(boundaries, nproc - 1, MPI_LONG, 0, comm);

    // send data according to boundaries
    // part_proc[i] says which processor partition i goes to
    long *part_proc = new long[n_parts + 1];
    int out_counts[nproc];
    for (int i=0; i<nproc; ++i) out_counts[i] = 0;
    long cur = 0;
    for (long i=0; i<=n_parts; ++i) {
      if (cur < nproc - 1 && i > boundaries[cur]) {
        ++cur;
      }
      part_proc[i] = cur;
      out_counts[cur] += part_lsize[i];
    }

    delete [] boundaries;
    delete [] part_lsize;

    // pack the local elements into one send buffer per destination rank
    long curs[nproc];
    _ValueType **s_bufs = new _ValueType*[nproc];
    for (int i=0; i<nproc; ++i) {
      curs[i] = 0;
      s_bufs[i] = new _ValueType[out_counts[i]];
    }
    for (int i=0; i<n_loc; ++i) {
      int dest = part_proc[part_ind[i]];
      s_bufs[dest][curs[dest]++] = *(first + i);
    }

    delete [] part_ind;
    delete [] part_proc;

    progress (rank, 2, "Alltoall", comm);

    // tell each processor how many elements to expect from every other processor
    int in_counts[nproc];
    MPI_Alltoall(out_counts, 1, MPI_INT, in_counts, 1, MPI_INT, comm);

    int offsets[nproc + 1];
    offsets[0] = 0;
    for (int i=1; i<=nproc; ++i) {
      offsets[i] = offsets[i-1] + in_counts[i-1];
    }
    int n_loct = offsets[nproc];

    MPI_Request send_reqs[nproc];
    MPI_Request recv_reqs[nproc];
    _ValueType *trans_data = new _ValueType[n_loct];

    for (int i=0; i<nproc; ++i) {
      MPI_Irecv(&trans_data[offsets[i]], in_counts[i],
                MPI_valueType, i, MPI_PSORT_TAG, comm, &recv_reqs[i]);
    }
    for (int i=1; i<=nproc; ++i) {
      int dest = (rank + i) % nproc;
      MPI_Isend(s_bufs[dest], out_counts[dest], MPI_valueType,
                dest, MPI_PSORT_TAG, comm, &send_reqs[dest]);
    }

    MPI_Waitall(nproc, send_reqs, MPI_STATUSES_IGNORE);
    for (int i=0; i < nproc; ++i) {
      delete [] s_bufs[i];
    }
    delete [] s_bufs;

    MPI_Waitall (nproc, recv_reqs, MPI_STATUSES_IGNORE);

    progress (rank, 3, "Sequential sort", comm);

    mysort.seqsort (trans_data, trans_data + n_loct, comp);

    progress (rank, 4, "Adjust boundaries", comm);

    // starch: move the sorted data to the requested final distribution in dist
    int trans_dist[nproc], dist_out[nproc];
    MPI_Allgather(&n_loct, 1, MPI_INT, trans_dist, 1, MPI_INT, comm);
    for (int i=0; i<nproc; ++i) dist_out[i] = dist[i];
    redist(trans_dist, trans_data, dist_out, first, rank, nproc, MPI_valueType,
           comm);
    delete [] trans_data;

    progress (rank, 5, "Finish", comm);

    MPI_Type_free (&MPI_valueType);
    MPI_Type_free (&MPI_distanceType);
  }

  // convenience overload: STL sort locally, operator< for comparisons,
  // and s = k = nproc sample parameters
  template<typename _RandomAccessIter>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           long *dist, MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;

    STLSort stl_sort;

    int nproc;
    MPI_Comm_size (comm, &nproc);
    long s = nproc, k = nproc;
    parallel_samplesort (first, last, less<_ValueType>(), dist, stl_sort, s, k, comm);

  }

} /* namespace vpsort */

#endif /* PSORT_SAMPLE_H */
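/*
   Usage sketch (illustrative only, not part of the library).  It assumes this
   header is available as "psort_sample.h" with the accompanying psort headers
   (psort_util.h and the sequential-sort wrappers) on the include path; the
   per-rank element count and the fill values are arbitrary choices.

       #include <vector>
       #include <cstdlib>
       #include <mpi.h>
       #include "psort_sample.h"

       int main(int argc, char **argv) {
         MPI_Init(&argc, &argv);

         int nproc, rank;
         MPI_Comm_size(MPI_COMM_WORLD, &nproc);
         MPI_Comm_rank(MPI_COMM_WORLD, &rank);

         long n_loc = 1000;                     // local elements on this rank
         std::vector<long> dist(nproc, n_loc);  // desired distribution after sorting
         std::vector<double> data(n_loc);
         srand48(rank + 1);
         for (long i = 0; i < n_loc; ++i) data[i] = drand48();

         // globally sort the concatenation of all ranks' data; on return each
         // rank holds dist[rank] elements of the sorted sequence
         vpsort::parallel_samplesort(&data[0], &data[0] + n_loc,
                                     &dist[0], MPI_COMM_WORLD);

         MPI_Finalize();
         return 0;
       }
*/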