/**
  @file parUtils.txx
  @brief Definitions of the templated functions in the par module.
  @author Rahul S. Sampath, rahul.sampath@gmail.com
  @author Hari Sundar, hsundar@gmail.com
  @author Shravan Veerapaneni, shravan@seas.upenn.edu
  @author Santi Swaroop Adavani, santis@gmail.com
 */

#include "binUtils.h"
#include "seqUtils.h"
#include "dtypes.h"

#include <cassert>
#include <iostream>
#include <algorithm>
#include <cstring>
#include <cstdlib>

#include "indexHolder.h"

#ifdef _PROFILE_SORT
#include "sort_profiler.h"
#endif

#include "ompUtils.h"

#include <mpi.h>

#ifdef __DEBUG__
#ifndef __DEBUG_PAR__
#define __DEBUG_PAR__
#endif
#endif

#ifndef KWAY
#define KWAY 64
#endif

namespace par {

  template <typename T>
    inline int Mpi_Isend(T* buf, int count, int dest, int tag,
        MPI_Comm comm, MPI_Request* request) {

      MPI_Isend(buf, count, par::Mpi_datatype<T>::value(),
          dest, tag, comm, request);

      return 1;
    }

  template <typename T>
    inline int Mpi_Issend(T* buf, int count, int dest, int tag,
        MPI_Comm comm, MPI_Request* request) {

      MPI_Issend(buf, count, par::Mpi_datatype<T>::value(),
          dest, tag, comm, request);

      return 1;
    }

  template <typename T>
    inline int Mpi_Recv(T* buf, int count, int source, int tag,
        MPI_Comm comm, MPI_Status* status) {

      MPI_Recv(buf, count, par::Mpi_datatype<T>::value(),
          source, tag, comm, status);

      return 1;
    }

  template <typename T>
    inline int Mpi_Irecv(T* buf, int count, int source, int tag,
        MPI_Comm comm, MPI_Request* request) {

      MPI_Irecv(buf, count, par::Mpi_datatype<T>::value(),
          source, tag, comm, request);

      return 1;
    }

  template <typename T, typename S>
    inline int Mpi_Sendrecv(T* sendBuf, int sendCount, int dest, int sendTag,
        S* recvBuf, int recvCount, int source, int recvTag,
        MPI_Comm comm, MPI_Status* status) {

      return MPI_Sendrecv(sendBuf, sendCount, par::Mpi_datatype<T>::value(), dest, sendTag,
          recvBuf, recvCount, par::Mpi_datatype<S>::value(), source, recvTag, comm, status);
    }

  template <typename T>
    inline int Mpi_Scan(T* sendbuf, T* recvbuf, int count, MPI_Op op, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Scan(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, comm);
    }

  template <typename T>
    inline int Mpi_Allreduce(T* sendbuf, T* recvbuf, int count, MPI_Op op, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Allreduce(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, comm);
    }

  template <typename T>
    inline int Mpi_Alltoall(T* sendbuf, T* recvbuf, int count, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Alltoall(sendbuf, count, par::Mpi_datatype<T>::value(),
          recvbuf, count, par::Mpi_datatype<T>::value(), comm);
    }
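  // Usage note: these wrappers exist so callers never spell out an MPI
  // datatype by hand; par::Mpi_datatype<T>::value() derives it from the C++
  // type (dtypes.h provides the specializations for the built-in types).
  // A minimal, illustrative sketch of a typed exchange built on them
  // (assumes at least two ranks exist):
  //
  //   double x = 3.14, y = 0.0;
  //   MPI_Request req;
  //   MPI_Status stat;
  //   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  //   if (rank == 0) {
  //     par::Mpi_Isend<double>(&x, 1, 1, 7, MPI_COMM_WORLD, &req);
  //     MPI_Wait(&req, &stat);
  //   } else if (rank == 1) {
  //     par::Mpi_Recv<double>(&y, 1, 0, 7, MPI_COMM_WORLD, &stat);
  //   }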
  template <typename T>
    inline int Mpi_Alltoallv(T* sendbuf, int* sendcnts, int* sdispls,
        T* recvbuf, int* recvcnts, int* rdispls, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Alltoallv(
          sendbuf, sendcnts, sdispls, par::Mpi_datatype<T>::value(),
          recvbuf, recvcnts, rdispls, par::Mpi_datatype<T>::value(),
          comm);
    }

  template <typename T>
    inline int Mpi_Gather(T* sendBuffer, T* recvBuffer, int count, int root, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Gather(sendBuffer, count, par::Mpi_datatype<T>::value(),
          recvBuffer, count, par::Mpi_datatype<T>::value(), root, comm);
    }

  template <typename T>
    inline int Mpi_Bcast(T* buffer, int count, int root, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Bcast(buffer, count, par::Mpi_datatype<T>::value(), root, comm);
    }

  template <typename T>
    inline int Mpi_Reduce(T* sendbuf, T* recvbuf, int count, MPI_Op op, int root, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      return MPI_Reduce(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, root, comm);
    }

  template <typename T>
    int Mpi_Allgatherv(T* sendBuf, int sendCount, T* recvBuf,
        int* recvCounts, int* displs, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif

#ifdef __USE_A2A_FOR_MPI_ALLGATHER__

      int maxSendCount;
      int npes, rank;

      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &rank);

      par::Mpi_Allreduce<int>(&sendCount, &maxSendCount, 1, MPI_MAX, comm);

      T* dummySendBuf = new T[maxSendCount*npes];
      assert(dummySendBuf);

#pragma omp parallel for
      for(int i = 0; i < npes; i++) {
        for(int j = 0; j < sendCount; j++) {
          dummySendBuf[(i*maxSendCount) + j] = sendBuf[j];
        }
      }

      T* dummyRecvBuf = new T[maxSendCount*npes];
      assert(dummyRecvBuf);

      par::Mpi_Alltoall<T>(dummySendBuf, dummyRecvBuf, maxSendCount, comm);

#pragma omp parallel for
      for(int i = 0; i < npes; i++) {
        for(int j = 0; j < recvCounts[i]; j++) {
          recvBuf[displs[i] + j] = dummyRecvBuf[(i*maxSendCount) + j];
        }
      }

      delete [] dummySendBuf;
      delete [] dummyRecvBuf;
      return 1;

#else

      return MPI_Allgatherv(sendBuf, sendCount, par::Mpi_datatype<T>::value(),
          recvBuf, recvCounts, displs, par::Mpi_datatype<T>::value(), comm);

#endif

    }
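  // The v-variants take per-rank counts plus exclusive-prefix displacements.
  // A minimal sketch of preparing those arrays before a call (illustrative
  // only; sendCnt is assumed to already hold the number of T's destined for
  // each rank):
  //
  //   std::vector<int> recvCnt(npes), sdisp(npes, 0), rdisp(npes, 0);
  //   par::Mpi_Alltoall<int>(&sendCnt[0], &recvCnt[0], 1, comm);
  //   for (int i = 1; i < npes; i++) {
  //     sdisp[i] = sdisp[i-1] + sendCnt[i-1];
  //     rdisp[i] = rdisp[i-1] + recvCnt[i-1];
  //   }
  //   par::Mpi_Alltoallv<T>(&sbuf[0], &sendCnt[0], &sdisp[0],
  //                         &rbuf[0], &recvCnt[0], &rdisp[0], comm);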
  template <typename T>
    int Mpi_Allgather(T* sendBuf, T* recvBuf, int count, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif

#ifdef __USE_A2A_FOR_MPI_ALLGATHER__

      int npes;
      MPI_Comm_size(comm, &npes);
      T* dummySendBuf = new T[count*npes];
      assert(dummySendBuf);
#pragma omp parallel for
      for(int i = 0; i < npes; i++) {
        for(int j = 0; j < count; j++) {
          dummySendBuf[(i*count) + j] = sendBuf[j];
        }
      }
      par::Mpi_Alltoall<T>(dummySendBuf, recvBuf, count, comm);
      delete [] dummySendBuf;
      return 1;

#else

      return MPI_Allgather(sendBuf, count, par::Mpi_datatype<T>::value(),
          recvBuf, count, par::Mpi_datatype<T>::value(), comm);

#endif

    }

  template <typename T>
    int Mpi_Alltoallv_sparse(T* sendbuf, int* sendcnts, int* sdispls,
        T* recvbuf, int* recvcnts, int* rdispls, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif

#ifndef ALLTOALLV_FIX
      return Mpi_Alltoallv
        (sendbuf, sendcnts, sdispls,
         recvbuf, recvcnts, rdispls, comm);
#else

      int npes, rank;
      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &rank);

      int commCnt = 0;

#pragma omp parallel for reduction(+:commCnt)
      for(int i = 0; i < rank; i++) {
        if(sendcnts[i] > 0) {
          commCnt++;
        }
        if(recvcnts[i] > 0) {
          commCnt++;
        }
      }

#pragma omp parallel for reduction(+:commCnt)
      for(int i = (rank+1); i < npes; i++) {
        if(sendcnts[i] > 0) {
          commCnt++;
        }
        if(recvcnts[i] > 0) {
          commCnt++;
        }
      }

      MPI_Request* requests = new MPI_Request[commCnt];
      assert(requests);

      MPI_Status* statuses = new MPI_Status[commCnt];
      assert(statuses);

      commCnt = 0;

      //First place all recv requests. Do not recv from self.
      for(int i = 0; i < rank; i++) {
        if(recvcnts[i] > 0) {
          par::Mpi_Irecv<T>( &(recvbuf[rdispls[i]]), recvcnts[i], i, 1,
              comm, &(requests[commCnt]) );
          commCnt++;
        }
      }

      for(int i = (rank + 1); i < npes; i++) {
        if(recvcnts[i] > 0) {
          par::Mpi_Irecv<T>( &(recvbuf[rdispls[i]]), recvcnts[i], i, 1,
              comm, &(requests[commCnt]) );
          commCnt++;
        }
      }

      //Next send the messages. Do not send to self.
      for(int i = 0; i < rank; i++) {
        if(sendcnts[i] > 0) {
          par::Mpi_Issend<T>( &(sendbuf[sdispls[i]]), sendcnts[i], i, 1,
              comm, &(requests[commCnt]) );
          commCnt++;
        }
      }

      for(int i = (rank + 1); i < npes; i++) {
        if(sendcnts[i] > 0) {
          par::Mpi_Issend<T>( &(sendbuf[sdispls[i]]), sendcnts[i],
              i, 1, comm, &(requests[commCnt]) );
          commCnt++;
        }
      }

      //Now copy local portion.
#ifdef __DEBUG_PAR__
      assert(sendcnts[rank] == recvcnts[rank]);
#endif

#pragma omp parallel for
      for(int i = 0; i < sendcnts[rank]; i++) {
        recvbuf[rdispls[rank] + i] = sendbuf[sdispls[rank] + i];
      }

      MPI_Waitall(commCnt, requests, statuses);

      delete [] requests;
      delete [] statuses;
      return 1;
#endif
    }
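  // Design note: Mpi_Alltoallv_sparse (with ALLTOALLV_FIX) replaces the dense
  // collective by non-blocking point-to-point messages and skips every
  // zero-count pair, so its cost scales with the number of non-empty partners
  // rather than with p. It is a drop-in replacement; a hedged usage sketch,
  // with counts and displacements built exactly as for Mpi_Alltoallv above:
  //
  //   par::Mpi_Alltoallv_sparse<T>(&sbuf[0], &sendCnt[0], &sdisp[0],
  //                                &rbuf[0], &recvCnt[0], &rdisp[0], comm);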
  //*
  template <typename T>
    int Mpi_Alltoallv_dense(T* sbuff_, int* s_cnt_, int* sdisp_,
        T* rbuff_, int* r_cnt_, int* rdisp_, MPI_Comm c){

#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(c);
#endif

#ifndef ALLTOALLV_FIX
      return Mpi_Alltoallv
        (sbuff_, s_cnt_, sdisp_,
         rbuff_, r_cnt_, rdisp_, c);
#else
      int kway = KWAY;
      int np, pid;
      MPI_Comm_size(c, &np);
      MPI_Comm_rank(c, &pid);
      int range[2]={0,np};

      std::vector<int> s_cnt(np);
#pragma omp parallel for
      for(int i=0;i<np;i++){
        s_cnt[i]=s_cnt_[i]*sizeof(T)+2*sizeof(int);
      }
      std::vector<int> sdisp(np); sdisp[0]=0;
      omp_par::scan(&s_cnt[0],&sdisp[0],np);

      char* sbuff=new char[sdisp[np-1]+s_cnt[np-1]];
#pragma omp parallel for
      for(int i=0;i<np;i++){
        ((int*)&sbuff[sdisp[i]])[0]=s_cnt[i];
        ((int*)&sbuff[sdisp[i]])[1]=pid;
        memcpy(&sbuff[sdisp[i]]+2*sizeof(int),&sbuff_[sdisp_[i]],s_cnt[i]-2*sizeof(int));
      }

      //int t_indx=0;
      int iter_cnt=0;
      while(range[1]-range[0]>1){
        iter_cnt++;
        if(kway>range[1]-range[0])
          kway=range[1]-range[0];

        std::vector<int> new_range(kway+1);
        for(int i=0;i<=kway;i++)
          new_range[i]=(range[0]*(kway-i)+range[1]*i)/kway;
        int p_class=(std::upper_bound(&new_range[0],&new_range[kway],pid)-&new_range[0]-1);
        int new_np=new_range[p_class+1]-new_range[p_class];
        int new_pid=pid-new_range[p_class];

        //Communication.
        {
          std::vector<int> r_cnt    (new_np*kway, 0);
          std::vector<int> r_cnt_ext(new_np*kway, 0);
          //Exchange send sizes.
          for(int i=0;i<kway;i++){
            MPI_Status status;
            int cmp_np=new_range[i+1]-new_range[i];
            int partner=(new_pid<cmp_np? new_range[i]+new_pid: new_range[i+1]-1);
            assert( (new_pid<cmp_np? true: new_range[i]+new_pid==new_range[i+1]) ); //Remove this.
            MPI_Sendrecv(&s_cnt[new_range[i]-new_range[0]], cmp_np, MPI_INT, partner, 0,
                &r_cnt[new_np*i], new_np, MPI_INT, partner, 0, c, &status);

            //Handle extra communication.
            if(new_pid==new_np-1 && cmp_np>new_np){
              int partner=new_range[i+1]-1;
              std::vector<int> s_cnt_ext(cmp_np, 0);
              MPI_Sendrecv(&s_cnt_ext[0], cmp_np, MPI_INT, partner, 0,
                  &r_cnt_ext[new_np*i], new_np, MPI_INT, partner, 0, c, &status);
            }
          }

          //Allocate receive buffer.
          std::vector<int> rdisp    (new_np*kway, 0);
          std::vector<int> rdisp_ext(new_np*kway, 0);
          int rbuff_size, rbuff_size_ext;
          char *rbuff, *rbuff_ext;
          {
            omp_par::scan(&r_cnt    [0], &rdisp    [0], new_np*kway);
            omp_par::scan(&r_cnt_ext[0], &rdisp_ext[0], new_np*kway);
            rbuff_size     = rdisp    [new_np*kway-1] + r_cnt    [new_np*kway-1];
            rbuff_size_ext = rdisp_ext[new_np*kway-1] + r_cnt_ext[new_np*kway-1];
            rbuff     = new char[rbuff_size    ];
            rbuff_ext = new char[rbuff_size_ext];
          }

          //Sendrecv data.
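          // Scheduling note: the loop below pairs blocks symmetrically around
          // this rank's own block (i1 walks forward from my_block, i2 walks
          // backward), so in each round every rank talks to a distinct
          // partner block instead of all ranks contacting block 0 first. The
          // inner j-loop issues one exchange in the self-paired rounds
          // (i_ == 0 and i_ == kway/2) and two exchanges otherwise.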
          //*
          int my_block=kway;
          while(pid<new_range[my_block]) my_block--;
          // MPI_Barrier(c);
          for(int i_=0;i_<=kway/2;i_++){
            int i1=(my_block+i_)%kway;
            int i2=(my_block+kway-i_)%kway;

            for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
              int i=(i_==0?i1:((j+my_block/i_)%2?i1:i2));
              MPI_Status status;
              int cmp_np=new_range[i+1]-new_range[i];
              int partner=(new_pid<cmp_np? new_range[i]+new_pid: new_range[i+1]-1);

              int send_dsp     =sdisp[new_range[i  ]-new_range[0]  ];
              int send_dsp_last=sdisp[new_range[i+1]-new_range[0]-1];
              int send_cnt     =s_cnt[new_range[i+1]-new_range[0]-1]+send_dsp_last-send_dsp;

              // ttt=omp_get_wtime();
              MPI_Sendrecv(&sbuff[send_dsp], send_cnt, MPI_BYTE, partner, 0,
                  &rbuff[rdisp[new_np*i]], r_cnt[new_np*(i+1)-1]+rdisp[new_np*(i+1)-1]-rdisp[new_np*i], MPI_BYTE, partner, 0, c, &status);

              //Handle extra communication.
              if(new_pid==new_np-1 && cmp_np>new_np){ // assumed fix: new_pid (was pid), matching the size exchange above
                int partner=new_range[i+1]-1;
                MPI_Sendrecv( NULL, 0, MPI_BYTE, partner, 0,
                    &rbuff_ext[rdisp_ext[new_np*i]], r_cnt_ext[new_np*(i+1)-1]+rdisp_ext[new_np*(i+1)-1]-rdisp_ext[new_np*i], MPI_BYTE, partner, 0, c, &status); // assumed fix: the extra data must land in rbuff_ext, which the rearrange step below reads
              }
            }
          }

          //Rearrange received data.
          {
            if(sbuff!=NULL) delete[] sbuff;
            sbuff=new char[rbuff_size+rbuff_size_ext];

            std::vector<int> cnt_new (2*new_np*kway, 0);
            std::vector<int> disp_new(2*new_np*kway, 0);
            for(int i=0;i<new_np;i++)
              for(int j=0;j<kway;j++){
                cnt_new[(i*2  )*kway+j]=r_cnt    [j*new_np+i];
                cnt_new[(i*2+1)*kway+j]=r_cnt_ext[j*new_np+i];
              }
            omp_par::scan(&cnt_new[0], &disp_new[0], 2*new_np*kway);

#pragma omp parallel for
            for(int i=0;i<new_np;i++)
              for(int j=0;j<kway;j++){
                memcpy(&sbuff[disp_new[(i*2  )*kway+j]], &rbuff    [rdisp    [j*new_np+i]], r_cnt    [j*new_np+i]);
                memcpy(&sbuff[disp_new[(i*2+1)*kway+j]], &rbuff_ext[rdisp_ext[j*new_np+i]], r_cnt_ext[j*new_np+i]);
              }

            //Free memory.
            if(rbuff    !=NULL) delete[] rbuff    ;
            if(rbuff_ext!=NULL) delete[] rbuff_ext;

            s_cnt.clear();
            s_cnt.resize(new_np,0);
            sdisp.resize(new_np);
            for(int i=0;i<new_np;i++){
              for(int j=0;j<2*kway;j++)
                s_cnt[i]+=cnt_new[i*2*kway+j];
              sdisp[i]=disp_new[i*2*kway];
            }
          }
        }

        range[0]=new_range[p_class  ];
        range[1]=new_range[p_class+1];
      }

      //Copy data to rbuff_.
      std::vector<char*> buff_ptr(np);
      char* tmp_ptr=sbuff;
      for(int i=0;i<np;i++){
        int& blk_size=((int*)tmp_ptr)[0];
        buff_ptr[i]=tmp_ptr;
        tmp_ptr+=blk_size;
      }
#pragma omp parallel for
      for(int i=0;i<np;i++){
        int& blk_size=((int*)buff_ptr[i])[0];
        int& src_pid =((int*)buff_ptr[i])[1];
        assert(blk_size-2*sizeof(int)<=r_cnt_[src_pid]*sizeof(T));
        memcpy(&rbuff_[rdisp_[src_pid]],buff_ptr[i]+2*sizeof(int),blk_size-2*sizeof(int));
      }

      //Free memory.
      if(sbuff!=NULL) delete[] sbuff;
      return 1;
#endif

    }
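  // Worked example of the recursion above (illustrative numbers): with
  // np = 16 and kway = 4, the first pass computes
  //
  //   int range[2] = {0, 16};
  //   std::vector<int> new_range(kway+1);
  //   for (int i = 0; i <= kway; i++)
  //     new_range[i] = (range[0]*(kway-i) + range[1]*i)/kway;  // {0,4,8,12,16}
  //
  // so each rank exchanges one message per block and then recurses inside its
  // own 4-rank block; data reaches its destination in O(log_kway(np)) rounds
  // instead of np-1 point-to-point messages.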
  template<typename T>
    unsigned int defaultWeight(const T* a){
      return 1;
    }

  template<typename T>
    int partitionW(std::vector<T>& nodeList, unsigned int (*getWeight)(const T *), MPI_Comm comm){
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif
      int npes;

      MPI_Comm_size(comm, &npes);

      if(getWeight == NULL) {
        getWeight = par::defaultWeight<T>;
      }

      int rank;

      MPI_Comm_rank(comm, &rank);

      MPI_Request request;
      MPI_Status status;
      const bool nEmpty = nodeList.empty();

      DendroIntL off1= 0, off2= 0, localWt= 0, totalWt = 0;

      DendroIntL* wts = NULL;
      DendroIntL* lscn = NULL;
      DendroIntL nlSize = nodeList.size();
      if(nlSize) {
        wts = new DendroIntL[nlSize];
        assert(wts);

        lscn = new DendroIntL[nlSize];
        assert(lscn);
      }

      // First construct arrays of id and wts.
#pragma omp parallel for reduction(+:localWt)
      for (DendroIntL i = 0; i < nlSize; i++){
        wts[i] = (*getWeight)( &(nodeList[i]) );
        localWt += wts[i];
      }

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Stage-1 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      // compute the total weight of the problem ...
      par::Mpi_Allreduce<DendroIntL>(&localWt, &totalWt, 1, MPI_SUM, comm);

      // perform a local scan on the weights first ...
      DendroIntL zero = 0;
      if(!nEmpty) {
        lscn[0]=wts[0];
        // for (DendroIntL i = 1; i < nlSize; i++) {
        //   lscn[i] = wts[i] + lscn[i-1];
        // }//end for
        omp_par::scan(&wts[1],lscn,nlSize);
        // now scan with the final members of
        par::Mpi_Scan<DendroIntL>(lscn+nlSize-1, &off1, 1, MPI_SUM, comm );
      } else{
        par::Mpi_Scan<DendroIntL>(&zero, &off1, 1, MPI_SUM, comm );
      }

      // communicate the offsets ...
      if (rank < (npes-1)){
        par::Mpi_Issend<DendroIntL>( &off1, 1, rank+1, 0, comm, &request );
      }
      if (rank){
        par::Mpi_Recv<DendroIntL>( &off2, 1, rank-1, 0, comm, &status );
      }
      else{
        off2 = 0;
      }

      // add offset to local array
#pragma omp parallel for
      for (DendroIntL i = 0; i < nlSize; i++) {
        lscn[i] = lscn[i] + off2; // This has the global scan results now ...
      }//end for

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Stage-2 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      int* sendSz = new int[npes];
      assert(sendSz);

      int* recvSz = new int[npes];
      assert(recvSz);

      int* sendOff = new int[npes];
      assert(sendOff);
      sendOff[0] = 0;

      int* recvOff = new int[npes];
      assert(recvOff);
      recvOff[0] = 0;

      // compute the partition offsets and sizes so that All2Allv can be performed.
      // initialize ...

#pragma omp parallel for
      for (int i = 0; i < npes; i++) {
        sendSz[i] = 0;
      }

      // Now determine the average load ...
      DendroIntL npesLong = npes;
      DendroIntL avgLoad = (totalWt/npesLong);

      DendroIntL extra = (totalWt%npesLong);

      //The Heart of the algorithm....
      if(avgLoad > 0) {
        for (DendroIntL i = 0; i < nlSize; i++) {
          if(lscn[i] == 0) {
            sendSz[0]++;
          }else {
            int ind=0;
            if ( lscn[i] <= (extra*(avgLoad + 1)) ) {
              ind = ((lscn[i] - 1)/(avgLoad + 1));
            }else {
              ind = ((lscn[i] - (1 + extra))/avgLoad);
            }
            assert(ind < npes);
            sendSz[ind]++;
          }//end if-else
        }//end for

        /*
        //This is more efficient and parallelizable than the above.
        //This has a bug; trying a simpler approach below.
        int ind_min,ind_max;
        ind_min=(lscn[0]*npesLong)/totalWt-1;
        ind_max=(lscn[nlSize-1]*npesLong)/totalWt+2;
        if(ind_min< 0 )ind_min=0;
        if(ind_max>=npesLong)ind_max=npesLong;
#pragma omp parallel for
        for(int i=ind_min;i<ind_max;i++){
          DendroIntL wt1=(totalWt*i)/npesLong;
          DendroIntL wt2=(totalWt*(i+1))/npesLong;
          int end   = std::upper_bound(&lscn[0], &lscn[nlSize], wt2, std::less<DendroIntL>())-&lscn[0];
          int start = std::upper_bound(&lscn[0], &lscn[nlSize], wt1, std::less<DendroIntL>())-&lscn[0];
          if(i==npesLong-1)end  =nlSize;
          if(i==0         )start=0;
          sendSz[i]=end-start;
        }// */

#ifdef __DEBUG_PAR__
        int tmp_sum=0;
        for(int i=0;i<npes;i++) tmp_sum+=sendSz[i];
        assert(tmp_sum==nlSize);
#endif

      }else {
        sendSz[0]+= nlSize;
      }//end if-else

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Stage-3 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      if(rank < (npes-1)) {
        MPI_Status statusWait;
        MPI_Wait(&request, &statusWait);
      }

      // communicate with other procs how many you shall be sending and get how
      // many to receive from whom.
      par::Mpi_Alltoall<int>(sendSz, recvSz, 1, comm);

#ifdef __DEBUG_PAR__
      DendroIntL totSendToOthers = 0;
      DendroIntL totRecvFromOthers = 0;
      for (int i = 0; i < npes; i++) {
        if(rank != i) {
          totSendToOthers += sendSz[i];
          totRecvFromOthers += recvSz[i];
        }
      }
#endif

      DendroIntL nn=0; // new value of nlSize, ie the local nodes.
#pragma omp parallel for reduction(+:nn)
      for (int i = 0; i < npes; i++) {
        nn += recvSz[i];
      }

      // compute offsets ...
      omp_par::scan(sendSz,sendOff,npes);
      omp_par::scan(recvSz,recvOff,npes);

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Stage-4 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      // allocate memory for the new arrays ...
      std::vector<T> newNodes(nn);

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Final alloc successful."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      // perform All2All ...
      T* nodeListPtr = NULL;
      T* newNodesPtr = NULL;
      if(!nodeList.empty()) {
        nodeListPtr = &(*(nodeList.begin()));
      }
      if(!newNodes.empty()) {
        newNodesPtr = &(*(newNodes.begin()));
      }
      par::Mpi_Alltoallv_sparse<T>(nodeListPtr, sendSz, sendOff,
          newNodesPtr, recvSz, recvOff, comm);

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Stage-5 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      // reset the pointer ...
      swap(nodeList, newNodes);
      newNodes.clear();

      // clean up...
      if(!nEmpty) {
        delete [] lscn;
        delete [] wts;
      }
      delete [] sendSz;
      sendSz = NULL;

      delete [] sendOff;
      sendOff = NULL;

      delete [] recvSz;
      recvSz = NULL;

      delete [] recvOff;
      recvOff = NULL;

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!rank) {
        std::cout<<"Partition: Stage-6 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      return 1;
    }//end function
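  // Hedged usage sketch for partitionW: repartition a distributed vector so
  // every rank ends up with (weight-)balanced work. Passing NULL for the
  // weight function selects defaultWeight, i.e. plain equal-count
  // partitioning ('Node' below is an illustrative element type):
  //
  //   std::vector<Node> nodes = buildLocalNodes();   // hypothetical helper
  //   par::partitionW<Node>(nodes, NULL, MPI_COMM_WORLD);
  //   // nodes.size() is now ~(global count)/p on every rank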
  /* Hari Notes ....
   *
   * Avoid unwanted allocations within Hypersort ...
   *
   * 1. try to sort in place ... no output buffer, user can create a copy if
   *    needed.
   * 2. have a std::vector<T> container for rbuff. the space required can be
   *    reserved before doing MPI_SendRecv.
   * 3. alternatively, keep a send buffer and recv into original buffer.
   *
   */
  template<typename T>
    int HyperQuickSort(std::vector<T>& arr, MPI_Comm comm_){ // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm_);
#endif
#ifdef _PROFILE_SORT
      long bytes_comm=0;
      total_sort.start();
#endif

      // Copy communicator.
      MPI_Comm comm=comm_;

      // Get comm size and rank.
      int npes, myrank, rank_;
      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &myrank);
      rank_ = myrank;

      if(npes==1){
#ifdef _PROFILE_SORT
        seq_sort.start();
#endif
        omp_par::merge_sort(&arr[0],&arr[arr.size()]);
#ifdef _PROFILE_SORT
        seq_sort.stop();
        total_sort.stop();
#endif
        return 1; // single process: the local sort is the whole job
      }

      // buffers ... keeping all allocations together
      std::vector<T> commBuff;
      std::vector<T> mergeBuff;
      std::vector<int> glb_splt_cnts(npes);
      std::vector<int> glb_splt_disp(npes,0);

      int omp_p=omp_get_max_threads();
      srand(myrank);

      // Local and global sizes. O(log p)
      long totSize, nelem = arr.size(); assert(nelem);
      par::Mpi_Allreduce<long>(&nelem, &totSize, 1, MPI_SUM, comm);
      long nelem_ = nelem;

      // Local sort.
#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      omp_par::merge_sort(&arr[0], &arr[arr.size()]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif

      // Binary split and merge in each iteration.
      while(npes>1 && totSize>0){ // O(log p) iterations.

        //Determine splitters. O( log(N/p) + log(p) )
#ifdef _PROFILE_SORT
        hyper_compute_splitters.start();
#endif
        T split_key;
        long totSize_new;
        //while(true)
        {
          // Take random splitters. O( 1 ) -- Let p * splt_count = glb_splt_count = const = 100~1000
          int splt_count = (1000*nelem)/totSize;
          if (npes>1000)
            splt_count = ( ((float)rand()/(float)RAND_MAX)*totSize < (1000*nelem) ? 1 : 0 );

          if ( splt_count > nelem )
            splt_count = nelem;

          std::vector<T> splitters(splt_count);
          for(size_t i=0;i<splt_count;i++)
            splitters[i]=arr[rand()%nelem];

          // Gather all splitters. O( log(p) )
          int glb_splt_count;

          par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
          omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);

          glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];

          std::vector<T> glb_splitters(glb_splt_count);

          MPI_Allgatherv(&splitters[0], splt_count, par::Mpi_datatype<T>::value(),
              &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
              par::Mpi_datatype<T>::value(), comm);

          // Determine split key. O( log(N/p) + log(p) )
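          // The split key is chosen as the gathered candidate whose global
          // rank (the sum over all processes of its local lower_bound
          // position) lands closest to totSize/2, i.e. the best available
          // estimate of the global median; this is what keeps the two halves
          // of the recursion balanced.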
          std::vector<long> disp(glb_splt_count,0);

          if(nelem>0){
#pragma omp parallel for
            for(size_t i=0;i<glb_splt_count;i++){
              disp[i]=std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
            }
          }
          std::vector<long> glb_disp(glb_splt_count,0);
          MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<long>::value(), MPI_SUM, comm);

          long* split_disp = &glb_disp[0];
          for(size_t i=0; i<glb_splt_count; i++)
            if ( labs(glb_disp[i] - totSize/2) < labs(*split_disp - totSize/2) )
              split_disp = &glb_disp[i];
          split_key = glb_splitters[split_disp - &glb_disp[0]];

          totSize_new=(myrank<=(npes-1)/2?*split_disp:totSize-*split_disp);
          //double err=(((double)*split_disp)/(totSize/2))-1.0;
          //if(fabs(err)<0.01 || npes<=16) break;
          //else if(!myrank) std::cout<<err<<'\n';
        }
#ifdef _PROFILE_SORT
        hyper_compute_splitters.stop();
#endif

        // Split problem into two. O( N/p )
        int split_id=(npes-1)/2;
        {
#ifdef _PROFILE_SORT
          hyper_communicate.start();
#endif
          int new_p0 = (myrank<=split_id?0:split_id+1);
          int cmp_p0 = (myrank> split_id?0:split_id+1);
          int new_np = (myrank<=split_id? split_id+1: npes-split_id-1);
          int cmp_np = (myrank> split_id? split_id+1: npes-split_id-1);

          int partner = myrank+cmp_p0-new_p0;
          if(partner>=npes) partner=npes-1;
          assert(partner>=0);

          bool extra_partner=( npes%2==1 && npes-1==myrank );

          // Exchange send sizes.
          char *sbuff, *lbuff;

          int rsize=0, ssize=0, lsize=0;
          int ext_rsize=0, ext_ssize=0;
          size_t split_indx=(nelem>0?std::lower_bound(&arr[0], &arr[nelem], split_key)-&arr[0]:0);
          ssize=       (myrank> split_id? split_indx: nelem-split_indx )*sizeof(T);
          sbuff=(char*)(myrank> split_id? &arr[0]   : &arr[split_indx]);
          lsize=       (myrank<=split_id? split_indx: nelem-split_indx )*sizeof(T);
          lbuff=(char*)(myrank<=split_id? &arr[0]   : &arr[split_indx]);

          MPI_Status status;
          MPI_Sendrecv(&ssize, 1, MPI_INT, partner, 0,
              &rsize, 1, MPI_INT, partner, 0, comm, &status);
          if(extra_partner) MPI_Sendrecv(&ext_ssize, 1, MPI_INT, split_id, 0,
              &ext_rsize, 1, MPI_INT, split_id, 0, comm, &status);

          // Exchange data.
          commBuff.resize(rsize/sizeof(T)); // resize (not reserve): the received bytes must lie inside the vector's size
          char* rbuff = (char *)(&commBuff[0]);
          char* ext_rbuff=(ext_rsize>0? new char[ext_rsize]: NULL);
          MPI_Sendrecv(sbuff, ssize, MPI_BYTE, partner, 0,
              rbuff, rsize, MPI_BYTE, partner, 0, comm, &status);
          if(extra_partner) MPI_Sendrecv( NULL, 0, MPI_BYTE, split_id, 0,
              ext_rbuff, ext_rsize, MPI_BYTE, split_id, 0, comm, &status);
#ifdef _PROFILE_SORT
          bytes_comm += ssize;
          hyper_communicate.stop();
          hyper_merge.start();
#endif

          int nbuff_size=lsize+rsize+ext_rsize;
          mergeBuff.resize(nbuff_size/sizeof(T)); // resize so the merged data is owned by the vector before the swap below
          char* nbuff= (char *)(&mergeBuff[0]); // new char[nbuff_size];
          omp_par::merge<T*>((T*)lbuff, (T*)&lbuff[lsize], (T*)rbuff, (T*)&rbuff[rsize], (T*)nbuff, omp_p, std::less<T>());
          if(ext_rsize>0 && nbuff!=NULL){
            // XXX case not handled
            char* nbuff1= new char[nbuff_size];
            omp_par::merge<T*>((T*)nbuff, (T*)&nbuff[lsize+rsize], (T*)ext_rbuff, (T*)&ext_rbuff[ext_rsize], (T*)nbuff1, omp_p, std::less<T>());
            if(nbuff!=NULL) delete[] nbuff;
            nbuff=nbuff1;
          }

          // Copy new data.
          totSize=totSize_new;
          nelem = nbuff_size/sizeof(T);
          /*
          if(arr_!=NULL) delete[] arr_;
          arr_=(T*) nbuff; nbuff=NULL;
          */
          mergeBuff.swap(arr);

          //Free memory.
          // if( rbuff!=NULL) delete[] rbuff;
          if(ext_rbuff!=NULL) delete[] ext_rbuff;
#ifdef _PROFILE_SORT
          hyper_merge.stop();
#endif
        }

        {// Split comm. O( log(p) ) ??
#ifdef _PROFILE_SORT
          hyper_comm_split.start();
#endif
          MPI_Comm scomm;
          MPI_Comm_split(comm, myrank<=split_id, myrank, &scomm );
          comm=scomm;
          npes  =(myrank<=split_id? split_id+1: npes -split_id-1);
          myrank=(myrank<=split_id? myrank    : myrank-split_id-1);
#ifdef _PROFILE_SORT
          hyper_comm_split.stop();
#endif
        }
      }

      // par::partitionW<T>(SortedElem, NULL , comm_);
      // par::partitionW<T>(arr, NULL , comm_);

#ifdef _PROFILE_SORT
      total_sort.stop();

      par::Mpi_Allreduce<long>(&bytes_comm, &total_bytes, 1, MPI_SUM, comm_);
      // if(!rank_) printf("Total comm is %ld bytes\n", total_comm);
#endif
      return 1;
    }//end function
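  // Hedged usage sketch (in-place variant): every rank contributes a
  // non-empty local vector (note the assert on nelem); on return the data is
  // globally sorted across the communicator, though ranks may hold unequal
  // counts -- follow with par::partitionW to rebalance if required:
  //
  //   std::vector<double> a = loadLocalData();   // hypothetical helper
  //   par::HyperQuickSort(a, MPI_COMM_WORLD);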
  //--------------------------------------------------------------------------------
  template<typename T>
    int HyperQuickSort(std::vector<T>& arr, std::vector<T>& SortedElem, MPI_Comm comm_){ // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm_);
#endif
#ifdef _PROFILE_SORT
      total_sort.start();
#endif

      // Copy communicator.
      MPI_Comm comm=comm_;

      // Get comm size and rank.
      int npes, myrank, myrank_;
      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &myrank); myrank_=myrank;

      if(npes==1){
        // @dhairya isn't this wrong for the !sort-in-place case ...
#ifdef _PROFILE_SORT
        seq_sort.start();
#endif
        omp_par::merge_sort(&arr[0],&arr[arr.size()]);
#ifdef _PROFILE_SORT
        seq_sort.stop();
#endif
        SortedElem = arr;
#ifdef _PROFILE_SORT
        total_sort.stop();
#endif
        return 1; // single process: done after the local sort
      }

      int omp_p=omp_get_max_threads();
      srand(myrank);

      // Local and global sizes. O(log p)
      DendroIntL totSize, nelem = arr.size(); assert(nelem);
      par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);
      DendroIntL nelem_ = nelem;

      // Local sort.
#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      T* arr_=new T[nelem]; memcpy(&arr_[0], &arr[0], nelem*sizeof(T));
      omp_par::merge_sort(&arr_[0], &arr_[arr.size()]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif
      // Binary split and merge in each iteration.
      while(npes>1 && totSize>0){ // O(log p) iterations.

        //Determine splitters. O( log(N/p) + log(p) )
#ifdef _PROFILE_SORT
        hyper_compute_splitters.start();
#endif
        T split_key;
        DendroIntL totSize_new;
        //while(true)
        {
          // Take random splitters. O( 1 ) -- Let p * splt_count = glb_splt_count = const = 100~1000
          int splt_count=(1000*nelem)/totSize;
          if(npes>1000) splt_count=(((float)rand()/(float)RAND_MAX)*totSize<(1000*nelem)?1:0);
          if(splt_count>nelem) splt_count=nelem;
          std::vector<T> splitters(splt_count);
          for(size_t i=0;i<splt_count;i++)
            splitters[i]=arr_[rand()%nelem];

          // Gather all splitters. O( log(p) )
          int glb_splt_count;
          std::vector<int> glb_splt_cnts(npes);
          std::vector<int> glb_splt_disp(npes,0);
          par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
          omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
          glb_splt_count=glb_splt_cnts[npes-1]+glb_splt_disp[npes-1];
          std::vector<T> glb_splitters(glb_splt_count);
          MPI_Allgatherv(&splitters[0], splt_count, par::Mpi_datatype<T>::value(),
              &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
              par::Mpi_datatype<T>::value(), comm);

          // Determine split key. O( log(N/p) + log(p) )
          std::vector<DendroIntL> disp(glb_splt_count,0);
          if(nelem>0){
#pragma omp parallel for
            for(size_t i=0;i<glb_splt_count;i++){
              disp[i]=std::lower_bound(&arr_[0], &arr_[nelem], glb_splitters[i])-&arr_[0];
            }
          }
          std::vector<DendroIntL> glb_disp(glb_splt_count,0);
          MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);

          DendroIntL* split_disp=&glb_disp[0];
          for(size_t i=0;i<glb_splt_count;i++)
            if( labs(glb_disp[i]-totSize/2) < labs(*split_disp-totSize/2)) split_disp=&glb_disp[i];
          split_key=glb_splitters[split_disp-&glb_disp[0]];

          totSize_new=(myrank<=(npes-1)/2?*split_disp:totSize-*split_disp);
          //double err=(((double)*split_disp)/(totSize/2))-1.0;
          //if(fabs(err)<0.01 || npes<=16) break;
          //else if(!myrank) std::cout<<err<<'\n';
        }
#ifdef _PROFILE_SORT
        hyper_compute_splitters.stop();
#endif

        // Split problem into two. O( N/p )
        int split_id=(npes-1)/2;
        {
#ifdef _PROFILE_SORT
          hyper_communicate.start();
#endif

          int new_p0=(myrank<=split_id?0:split_id+1);
          int cmp_p0=(myrank> split_id?0:split_id+1);
          int new_np=(myrank<=split_id? split_id+1: npes-split_id-1);
          int cmp_np=(myrank> split_id? split_id+1: npes-split_id-1);

          int partner = myrank+cmp_p0-new_p0;
          if(partner>=npes) partner=npes-1;
          assert(partner>=0);

          bool extra_partner=( npes%2==1 && npes-1==myrank );

          // Exchange send sizes.
          char *sbuff, *lbuff;
          int rsize=0, ssize=0, lsize=0;
          int ext_rsize=0, ext_ssize=0;
          size_t split_indx=(nelem>0?std::lower_bound(&arr_[0], &arr_[nelem], split_key)-&arr_[0]:0);
          ssize=       (myrank> split_id? split_indx: nelem-split_indx )*sizeof(T);
          sbuff=(char*)(myrank> split_id? &arr_[0]  : &arr_[split_indx]);
          lsize=       (myrank<=split_id? split_indx: nelem-split_indx )*sizeof(T);
          lbuff=(char*)(myrank<=split_id? &arr_[0]  : &arr_[split_indx]);

          MPI_Status status;
          MPI_Sendrecv(&ssize, 1, MPI_INT, partner, 0,
              &rsize, 1, MPI_INT, partner, 0, comm, &status);
          if(extra_partner) MPI_Sendrecv(&ext_ssize, 1, MPI_INT, split_id, 0,
              &ext_rsize, 1, MPI_INT, split_id, 0, comm, &status);

          // Exchange data.
          char* rbuff    = new char[rsize];
          char* ext_rbuff=(ext_rsize>0? new char[ext_rsize]: NULL);
          MPI_Sendrecv(sbuff, ssize, MPI_BYTE, partner, 0,
              rbuff, rsize, MPI_BYTE, partner, 0, comm, &status);
          if(extra_partner) MPI_Sendrecv( NULL, 0, MPI_BYTE, split_id, 0,
              ext_rbuff, ext_rsize, MPI_BYTE, split_id, 0, comm, &status);
#ifdef _PROFILE_SORT
          hyper_communicate.stop();
          hyper_merge.start();
#endif
          int nbuff_size=lsize+rsize+ext_rsize;
          char* nbuff= new char[nbuff_size];
          omp_par::merge<T*>((T*)lbuff, (T*)&lbuff[lsize], (T*)rbuff, (T*)&rbuff[rsize], (T*)nbuff, omp_p, std::less<T>());
          if(ext_rsize>0 && nbuff!=NULL){
            char* nbuff1= new char[nbuff_size];
            omp_par::merge<T*>((T*)nbuff, (T*)&nbuff[lsize+rsize], (T*)ext_rbuff, (T*)&ext_rbuff[ext_rsize], (T*)nbuff1, omp_p, std::less<T>());
            if(nbuff!=NULL) delete[] nbuff;
            nbuff=nbuff1;
          }

          // Copy new data.
          totSize=totSize_new;
          nelem = nbuff_size/sizeof(T);
          if(arr_!=NULL) delete[] arr_;
          arr_=(T*) nbuff; nbuff=NULL;

          //Free memory.
          if( rbuff   !=NULL) delete[] rbuff   ;
          if(ext_rbuff!=NULL) delete[] ext_rbuff;
#ifdef _PROFILE_SORT
          hyper_merge.stop();
#endif
        }

#ifdef _PROFILE_SORT
        hyper_comm_split.start();
#endif
        {// Split comm. O( log(p) ) ??
          MPI_Comm scomm;
          MPI_Comm_split(comm, myrank<=split_id, myrank, &scomm );
          comm=scomm;
          npes  =(myrank<=split_id? split_id+1: npes -split_id-1);
          myrank=(myrank<=split_id? myrank    : myrank-split_id-1);
        }
#ifdef _PROFILE_SORT
        hyper_comm_split.stop();
#endif
      }

      SortedElem.resize(nelem);
      SortedElem.assign(arr_, &arr_[nelem]);
      if(arr_!=NULL) delete[] arr_;

#ifdef _PROFILE_SORT
      sort_partitionw.start();
#endif
      // par::partitionW<T>(SortedElem, NULL , comm_);
#ifdef _PROFILE_SORT
      sort_partitionw.stop();
#endif

#ifdef _PROFILE_SORT
      total_sort.stop();
#endif
      return 1;
    }//end function
  // */

  template<typename T>
    int HyperQuickSort_kway(std::vector<T>& arr, std::vector<T>& SortedElem, MPI_Comm comm_) {
#ifdef _PROFILE_SORT
      total_sort.clear();
      seq_sort.clear();
      hyper_compute_splitters.clear();
      hyper_communicate.clear();
      hyper_merge.clear();
      hyper_comm_split.clear();
      sort_partitionw.clear();
      MPI_Barrier(comm_);

      total_sort.start();
#endif
      unsigned int kway = KWAY;
      int omp_p=omp_get_max_threads();

      // Copy communicator.
      MPI_Comm comm=comm_;

      // Get comm size and rank.
      int npes, myrank;
      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &myrank);
      srand(myrank);

      // Local and global sizes. O(log p)
      size_t totSize, nelem = arr.size(); assert(nelem);
      par::Mpi_Allreduce<size_t>(&nelem, &totSize, 1, MPI_SUM, comm);
      std::vector<T> arr_ (nelem*2); //Extra buffer.
      std::vector<T> arr__(nelem*2); //Extra buffer.

      // Local sort.
#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      omp_par::merge_sort(&arr[0], &arr[arr.size()]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif

      while(npes>1 && totSize>0){
        if(kway>npes) kway = npes;
        int blk_size=npes/kway; assert(blk_size*kway==npes);
        int blk_id=myrank/blk_size, new_pid=myrank%blk_size;

        // Determine splitters.
#ifdef _PROFILE_SORT
        hyper_compute_splitters.start();
#endif
        std::vector<T> split_key = par::Sorted_approx_Select(arr, kway-1, comm);
#ifdef _PROFILE_SORT
        hyper_compute_splitters.stop();
#endif

        {// Communication
#ifdef _PROFILE_SORT
          hyper_communicate.start();
#endif
          // Determine send_size.
          std::vector<int> send_size(kway), send_disp(kway+1); send_disp[0]=0; send_disp[kway]=arr.size();
          for(int i=1;i<kway;i++) send_disp[i]=std::lower_bound(&arr[0], &arr[arr.size()], split_key[i-1])-&arr[0];
          for(int i=0;i<kway;i++) send_size[i]=send_disp[i+1]-send_disp[i];

          // Get recv_size.
          int recv_iter=0;
          std::vector<T*> recv_ptr(kway);
          std::vector<size_t> recv_cnt(kway);
          std::vector<int> recv_size(kway), recv_disp(kway+1,0);
          for(int i_=0;i_<=kway/2;i_++){
            int i1=(blk_id+i_)%kway;
            int i2=(blk_id+kway-i_)%kway;
            MPI_Status status;
            for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
              int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
              int partner=blk_size*i+new_pid;
              MPI_Sendrecv(&send_size[i], 1, MPI_INT, partner, 0,
                  &recv_size[recv_iter], 1, MPI_INT, partner, 0, comm, &status);
              recv_disp[recv_iter+1]=recv_disp[recv_iter]+recv_size[recv_iter];
              recv_ptr[recv_iter]=&arr_[recv_disp[recv_iter]];
              recv_cnt[recv_iter]=recv_size[recv_iter];
              recv_iter++;
            }
          }

          // Communicate data.
          int asynch_count=2;
          recv_iter=0;
          int merg_indx=2;
          std::vector<MPI_Request> reqst(kway*2);
          std::vector<MPI_Status> status(kway*2);
          arr_ .resize(recv_disp[kway]);
          arr__.resize(recv_disp[kway]);
          for(int i_=0;i_<=kway/2;i_++){
            int i1=(blk_id+i_)%kway;
            int i2=(blk_id+kway-i_)%kway;
            for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
              int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
              int partner=blk_size*i+new_pid;

              if(recv_iter-asynch_count-1>=0) MPI_Waitall(2, &reqst[(recv_iter-asynch_count-1)*2], &status[(recv_iter-asynch_count-1)*2]);
              par::Mpi_Irecv <T>(&arr_[recv_disp[recv_iter]], recv_size[recv_iter], partner, 1, comm, &reqst[recv_iter*2+0]);
              par::Mpi_Issend<T>(&arr [send_disp[i]],         send_size[i],         partner, 1, comm, &reqst[recv_iter*2+1]);
              recv_iter++;

              int flag[2]={0,0};
              if(recv_iter>merg_indx) MPI_Test(&reqst[(merg_indx-1)*2],&flag[0],&status[(merg_indx-1)*2]);
              if(recv_iter>merg_indx) MPI_Test(&reqst[(merg_indx-2)*2],&flag[1],&status[(merg_indx-2)*2]);
              if(flag[0] && flag[1]){
                T* A=&arr_[0]; T* B=&arr__[0];
                for(int s=2;merg_indx%s==0;s*=2){
                  //std ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                  //    &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
                  omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                      &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]],omp_p,std::less<T>());
                  T* C=A; A=B; B=C; // Swap
                }
                merg_indx+=2;
              }
            }
          }
#ifdef _PROFILE_SORT
          hyper_communicate.stop();
          hyper_merge.start();
#endif
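          // Merge-cascade note: received runs are folded pairwise as soon as
          // both of their requests complete. Whenever merg_indx is divisible
          // by 2, 4, 8, ..., the first merg_indx runs are merged one more
          // level up, so the k-way merge degenerates into log2(kway) binary
          // passes that overlap with the communication still in flight.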
          // Merge remaining parts.
          while(merg_indx<=(int)kway){
            MPI_Waitall(1, &reqst[(merg_indx-1)*2], &status[(merg_indx-1)*2]);
            MPI_Waitall(1, &reqst[(merg_indx-2)*2], &status[(merg_indx-2)*2]);
            {
              T* A=&arr_[0]; T* B=&arr__[0];
              for(int s=2;merg_indx%s==0;s*=2){
                //std ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                //    &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
                omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                    &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]],omp_p,std::less<T>());
                T* C=A; A=B; B=C; // Swap
              }
              merg_indx+=2;
            }
          }
          {// Swap buffers.
            int swap_cond=0;
            for(int s=2;kway%s==0;s*=2) swap_cond++;
            if(swap_cond%2==0) swap(arr,arr_);
            else swap(arr,arr__);
          }
        }

#ifdef _PROFILE_SORT
        hyper_merge.stop();
        hyper_comm_split.start();
#endif
        {// Split comm. kway O( log(p) ) ??
          MPI_Comm scomm;
          MPI_Comm_split(comm, blk_id, myrank, &scomm );
          if(comm!=comm_) MPI_Comm_free(&comm);
          comm = scomm;

          MPI_Comm_size(comm, &npes);
          MPI_Comm_rank(comm, &myrank);
        }
#ifdef _PROFILE_SORT
        hyper_comm_split.stop();
#endif
      }
#ifdef _PROFILE_SORT
      total_sort.stop();
#endif
      SortedElem=arr;
      return 1;
    }

  //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  // for sc13 -- mem efficient HykSort
  template<typename T>
    int HyperQuickSort_kway(std::vector<T>& arr, MPI_Comm comm_) {
#ifdef _PROFILE_SORT
      total_sort.clear();
      seq_sort.clear();
      hyper_compute_splitters.clear();
      hyper_communicate.clear();
      hyper_merge.clear();
      hyper_comm_split.clear();
      sort_partitionw.clear();
      MPI_Barrier(comm_);

      total_sort.start();
#endif
      unsigned int kway = KWAY;
      int omp_p = omp_get_max_threads();

      // Copy communicator.
      MPI_Comm comm = comm_;

      // Get comm size and rank.
      int npes, myrank;
      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &myrank);

      srand(myrank);

      // Local and global sizes. O(log p)
      size_t totSize, nelem = arr.size(); assert(nelem);
      par::Mpi_Allreduce<size_t>(&nelem, &totSize, 1, MPI_SUM, comm);

      // dummy array for now ...
      std::vector<T> arr_(128); // (nelem*2); //Extra buffer.
      std::vector<T> arr__;     // (nelem*2); //Extra buffer.

      // Local sort.
#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      omp_par::merge_sort(&arr[0], &arr[arr.size()]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif

      while(npes>1 && totSize>0){
        if(kway>npes) kway = npes;
        int blk_size=npes/kway; assert(blk_size*kway==npes);
        int blk_id=myrank/blk_size, new_pid=myrank%blk_size;

        // Determine splitters.
#ifdef _PROFILE_SORT
        hyper_compute_splitters.start();
#endif
        std::vector<T> split_key = par::Sorted_approx_Select(arr, kway-1, comm);
#ifdef _PROFILE_SORT
        hyper_compute_splitters.stop();
#endif

        {// Communication
#ifdef _PROFILE_SORT
          hyper_communicate.start();
#endif
          // Determine send_size.
          std::vector<int> send_size(kway), send_disp(kway+1); send_disp[0] = 0; send_disp[kway] = arr.size();
          for(int i=1;i<kway;i++) send_disp[i] = std::lower_bound(&arr[0], &arr[arr.size()], split_key[i-1]) - &arr[0];
          for(int i=0;i<kway;i++) send_size[i] = send_disp[i+1] - send_disp[i];

          // Get recv_size.
          int recv_iter=0;
          std::vector<T*> recv_ptr(kway);
          std::vector<size_t> recv_cnt(kway);
          std::vector<int> recv_size(kway), recv_disp(kway+1,0);
          for(int i_=0;i_<=kway/2;i_++){
            int i1=(blk_id+i_)%kway;
            int i2=(blk_id+kway-i_)%kway;
            MPI_Status status;
            for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
              int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
              int partner=blk_size*i+new_pid;
              MPI_Sendrecv(&send_size[i], 1, MPI_INT, partner, 0,
                  &recv_size[recv_iter], 1, MPI_INT, partner, 0, comm, &status);
              recv_disp[recv_iter+1] = recv_disp[recv_iter]+recv_size[recv_iter];
              recv_ptr[recv_iter]=&arr_[0] + recv_disp[recv_iter]; //! @hari - only setting the address; it doesn't need to be allocated yet (except for 0)
              recv_cnt[recv_iter]=recv_size[recv_iter];
              recv_iter++;
            }
          }

          // Communicate data.
          int asynch_count=2;
          recv_iter=0;
          int merg_indx=2;
          std::vector<MPI_Request> reqst(kway*2);
          std::vector<MPI_Status> status(kway*2);
          arr_ .resize(recv_disp[kway]);
          arr__.resize(recv_disp[kway]);
          for(int i_=0;i_<=kway/2;i_++){
            int i1=(blk_id+i_)%kway;
            int i2=(blk_id+kway-i_)%kway;
            for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
              int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
              int partner=blk_size*i+new_pid;

              if(recv_iter-asynch_count-1>=0) MPI_Waitall(2, &reqst[(recv_iter-asynch_count-1)*2], &status[(recv_iter-asynch_count-1)*2]);

              par::Mpi_Irecv <T>(&arr_[recv_disp[recv_iter]], recv_size[recv_iter], partner, 1, comm, &reqst[recv_iter*2+0]);
              par::Mpi_Issend<T>(&arr [send_disp[i]],         send_size[i],         partner, 1, comm, &reqst[recv_iter*2+1]);

              recv_iter++;

              int flag[2]={0,0};
              if ( recv_iter > merg_indx ) MPI_Test(&reqst[(merg_indx-1)*2], &flag[0], &status[(merg_indx-1)*2]);
              if ( recv_iter > merg_indx ) MPI_Test(&reqst[(merg_indx-2)*2], &flag[1], &status[(merg_indx-2)*2]);
              if (flag[0] && flag[1]){
                T* A=&arr_[0]; T* B=&arr__[0];
                for(int s=2; merg_indx%s==0; s*=2){
                  //std ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                  //    &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
                  omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                      &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]], omp_p,std::less<T>());
                  T* C=A; A=B; B=C; // Swap
                }
                merg_indx+=2;
              }
            }
          }
#ifdef _PROFILE_SORT
          hyper_communicate.stop();
          hyper_merge.start();
#endif
          // Merge remaining parts.
          while(merg_indx<=(int)kway){
            MPI_Waitall(1, &reqst[(merg_indx-1)*2], &status[(merg_indx-1)*2]);
            MPI_Waitall(1, &reqst[(merg_indx-2)*2], &status[(merg_indx-2)*2]);
            {
              T* A=&arr_[0]; T* B=&arr__[0];
              for(int s=2;merg_indx%s==0;s*=2){
                //std ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                //    &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
                omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
                    &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]],omp_p,std::less<T>());
                T* C=A; A=B; B=C; // Swap
              }
              merg_indx+=2;
            }
          }
          {// Swap buffers.
            int swap_cond=0;
            for(int s=2;kway%s==0;s*=2) swap_cond++;
            if(swap_cond%2==0) swap(arr,arr_);
            else swap(arr,arr__);
          }
        }

#ifdef _PROFILE_SORT
        hyper_merge.stop();
        hyper_comm_split.start();
#endif
        {// Split comm. kway O( log(p) ) ??
          MPI_Comm scomm;
          MPI_Comm_split(comm, blk_id, myrank, &scomm );
          if(comm!=comm_) MPI_Comm_free(&comm);
          comm = scomm;

          MPI_Comm_size(comm, &npes);
          MPI_Comm_rank(comm, &myrank);
        }
#ifdef _PROFILE_SORT
        hyper_comm_split.stop();
#endif
      }
#ifdef _PROFILE_SORT
      total_sort.stop();
#endif
      return 1;
    } // mem efficient HykSort

  /// ----------- low mem version - sc13 -----------------------------------
  template<typename T>
    int sampleSort(std::vector<T>& arr, MPI_Comm comm){
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif

#ifdef _PROFILE_SORT
      total_sort.start();
#endif

      int npes;

      MPI_Comm_size(comm, &npes);

      assert(arr.size());

      if (npes == 1) {
#ifdef _PROFILE_SORT
        seq_sort.start();
#endif
        omp_par::merge_sort(&arr[0],&arr[arr.size()]);
#ifdef _PROFILE_SORT
        seq_sort.stop();
        total_sort.stop();
#endif
        return 1; // single process: done after the local sort
      }

      int myrank;
      MPI_Comm_rank(comm, &myrank);

      DendroIntL nelem = arr.size();
      DendroIntL nelemCopy = nelem;
      DendroIntL totSize;
      par::Mpi_Allreduce<DendroIntL>(&nelemCopy, &totSize, 1, MPI_SUM, comm);

      DendroIntL npesLong = npes;
      const DendroIntL FIVE = 5;

      if(totSize < (FIVE*npesLong*npesLong)) {
#ifdef __DEBUG_PAR__
        if(!myrank) {
          std::cout<<" Using bitonic sort since totSize < (5*(npes^2)). totSize: "
            <<totSize<<" npes: "<<npes<<std::endl;
        }
#endif
#ifdef __DEBUG_PAR__
        MPI_Barrier(comm);
        if(!myrank) {
          std::cout<<"SampleSort (small n): Stage-1 passed."<<std::endl;
        }
        MPI_Barrier(comm);
#endif

        MPI_Comm new_comm;
        if(totSize < npesLong) {
          if(!myrank) {
            std::cout<<" Input to sort is small. splittingComm: "
              <<npes<<" -> "<<totSize<<std::endl;
          }
          par::splitCommUsingSplittingRank(static_cast<int>(totSize), &new_comm, comm);
        } else {
          new_comm = comm;
        }

#ifdef __DEBUG_PAR__
        MPI_Barrier(comm);
        if(!myrank) {
          std::cout<<"SampleSort (small n): Stage-2 passed."<<std::endl;
        }
        MPI_Barrier(comm);
#endif

        if(!arr.empty()) {
          par::bitonicSort<T>(arr, new_comm);
        }

        if(comm!=new_comm) MPI_Comm_free(&new_comm);

#ifdef __DEBUG_PAR__
        MPI_Barrier(comm);
        if(!myrank) {
          std::cout<<"SampleSort (small n): Stage-3 passed."<<std::endl;
        }
        MPI_Barrier(comm);
#endif

        return 1; // assumed intent: arr is fully sorted by the bitonic path, so skip the sample-sort path below
      }// end if

#ifdef __DEBUG_PAR__
      if(!myrank) {
        std::cout<<"Using sample sort to sort nodes. n/p^2 is fine."<<std::endl;
      }
#endif

      //Re-part arr so that each proc. has at least p elements.
#ifdef _PROFILE_SORT
      sort_partitionw.start();
#endif
      par::partitionW<T>(arr, NULL, comm);
#ifdef _PROFILE_SORT
      sort_partitionw.stop();
#endif
      nelem = arr.size();

#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      omp_par::merge_sort(&arr[0],&arr[arr.size()]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif
#ifdef _PROFILE_SORT
      sample_prepare_scatter.start(); // assumed: pairs with the sample_prepare_scatter.stop() below
#endif
      unsigned long idx;
      // std::vector<IndexHolder<T> > sendSplits(npes-1);
      std::vector< IndexHolder<T> > splitters;
      std::vector< std::pair<T, DendroIntL> > splitters_pair = par::Sorted_approx_Select_skewed( arr, npes-1, comm);

      for (int i=0; i<splitters_pair.size(); ++i) {
        splitters.push_back ( IndexHolder<T> ( splitters_pair[i].first, splitters_pair[i].second ) );
      }

      T key_last;
      DendroIntL zero = 0;
      if (myrank == npes-1) { key_last = arr[nelem-1]; } // assumed intent: the broadcast root must initialize key_last
      MPI_Bcast (&key_last, 1, par::Mpi_datatype<T>::value(), npes-1, comm);

      splitters.push_back( IndexHolder<T>(key_last, zero) );

      omp_par::merge_sort(&splitters[0], &splitters[splitters.size()]);

      IndexHolder<T>* splittersPtr = NULL;
      if(!splitters.empty()) {
        splittersPtr = &(*(splitters.begin()));
      }

      int* sendcnts = new int[npes];
      assert(sendcnts);

      int* recvcnts = new int[npes];
      assert(recvcnts);

      int* sdispls = new int[npes];
      assert(sdispls);

      int* rdispls = new int[npes];
      assert(rdispls);

#pragma omp parallel for
      for(int k = 0; k < npes; k++){
        sendcnts[k] = 0;
      }

      {
        int omp_p=omp_get_max_threads();
        int* proc_split = new int[omp_p+1];
        DendroIntL* lst_split_indx = new DendroIntL[omp_p+1];
        proc_split[0]=0;
        lst_split_indx[0]=0;
        lst_split_indx[omp_p]=nelem;
#pragma omp parallel for
        for(int i=1;i<omp_p;i++){
          //proc_split[i] = seq::BinSearch(&splittersPtr[0],&splittersPtr[npes-1],arr[i*nelem/omp_p],std::less<T>());
          idx = 2*myrank*nelem/npes + i*(size_t)nelem/omp_p;
          IndexHolder<T> key( arr[i*(size_t)nelem/omp_p], idx);
          proc_split[i] = std::upper_bound( &splittersPtr[0], &splittersPtr[npes-1], key, std::less<IndexHolder<T> >()) - &splittersPtr[0];
          if(proc_split[i]<npes-1){
            //lst_split_indx[i]=seq::BinSearch(&arr[0],&arr[nelem],splittersPtr[proc_split[i]],std::less<T>());
            lst_split_indx[i] = std::upper_bound(&arr[0], &arr[nelem], splittersPtr[proc_split[i]].value, std::less<T>()) - &arr[0];
          }else{
            proc_split[i] = npes-1;
            lst_split_indx[i] = nelem;
          }
        }
        idx = 2*myrank*nelem/npes;
#pragma omp parallel for
        for (int i=0;i<omp_p;i++){
          int sendcnts_=0;
          int k=proc_split[i];
          for (DendroIntL j = lst_split_indx[i]; j < lst_split_indx[i+1]; j++) {
            if ( IndexHolder<T>(arr[j],idx+j) <= splitters[k]) {
              sendcnts_++;
            } else{
              if(sendcnts_>0)
                sendcnts[k]=sendcnts_;
              sendcnts_=0;
              k = seq::UpperBound< IndexHolder<T> >(npes-1, splittersPtr, k+1, IndexHolder<T>(arr[j],idx+j) );
              if (k == (npes-1) ){
                //could not find any splitter >= arr[j]
                sendcnts_ = (nelem - j);
                break;
              } else {
                assert(k < (npes-1));
                assert(splitters[k].value >= arr[j]);
                sendcnts_++;
              }
            }//end if-else
          }//end for j
          if(sendcnts_>0)
            sendcnts[k]=sendcnts_;
        }
        delete [] lst_split_indx;
        delete [] proc_split;
      }

      par::Mpi_Alltoall<int>(sendcnts, recvcnts, 1, comm);

      sdispls[0] = 0; rdispls[0] = 0;

      omp_par::scan(sendcnts,sdispls,npes);
      omp_par::scan(recvcnts,rdispls,npes);

      DendroIntL nsorted = rdispls[npes-1] + recvcnts[npes-1];
      std::vector<T> SortedElem(nsorted);

      T* arrPtr = NULL;
      T* SortedElemPtr = NULL;
      if(!arr.empty()) {
        arrPtr = &(*(arr.begin()));
      }
      if(!SortedElem.empty()) {
        SortedElemPtr = &(*(SortedElem.begin()));
      }
#ifdef _PROFILE_SORT
      sample_prepare_scatter.stop();
#endif

#ifdef _PROFILE_SORT
      sample_do_all2all.start();
#endif
      // par::Mpi_Alltoallv_dense<T>(arrPtr, sendcnts, sdispls, SortedElemPtr, recvcnts, rdispls, comm);
      Mpi_Alltoallv(arrPtr, sendcnts, sdispls, SortedElemPtr, recvcnts, rdispls, comm);
#ifdef _PROFILE_SORT
      sample_do_all2all.stop();
#endif
      arr.swap(SortedElem);
      SortedElem.clear();
      delete [] sendcnts;
      sendcnts = NULL;

      delete [] recvcnts;
      recvcnts = NULL;

      delete [] sdispls;
      sdispls = NULL;

      delete [] rdispls;
      rdispls = NULL;

#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      omp_par::merge_sort(&arr[0], &arr[nsorted]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif

#ifdef _PROFILE_SORT
      total_sort.stop();
#endif
      return 1;
    }//end function

  //------------------------------------------------------------------------

  template<typename T>
    int sampleSort(std::vector<T>& arr, std::vector<T>& SortedElem, MPI_Comm comm){
#ifdef __PROFILE_WITH_BARRIER__
      MPI_Barrier(comm);
#endif

#ifdef _PROFILE_SORT
      total_sort.start();
#endif

      int npes;

      MPI_Comm_size(comm, &npes);

      assert(arr.size());

      if (npes == 1) {
#ifdef _PROFILE_SORT
        seq_sort.start();
#endif
        omp_par::merge_sort(&arr[0],&arr[arr.size()]);
#ifdef _PROFILE_SORT
        seq_sort.stop();
#endif
        SortedElem = arr;
#ifdef _PROFILE_SORT
        total_sort.stop();
#endif
        return 1; // single process: done after the local sort
      }

      std::vector<T> splitters;
      std::vector<T> allsplitters;

      int myrank;
      MPI_Comm_rank(comm, &myrank);

      DendroIntL nelem = arr.size();
      DendroIntL nelemCopy = nelem;
      DendroIntL totSize;
      par::Mpi_Allreduce<DendroIntL>(&nelemCopy, &totSize, 1, MPI_SUM, comm);

      DendroIntL npesLong = npes;
      const DendroIntL FIVE = 5;

      if(totSize < (FIVE*npesLong*npesLong)) {
        if(!myrank) {
          std::cout<<" Using bitonic sort since totSize < (5*(npes^2)). totSize: "
            <<totSize<<" npes: "<<npes<<std::endl;
        }
  template<typename T>
  int sampleSort(std::vector<T>& arr, std::vector<T>& SortedElem, MPI_Comm comm) {
#ifdef __PROFILE_WITH_BARRIER__
    MPI_Barrier(comm);
#endif

#ifdef _PROFILE_SORT
    total_sort.start();
#endif

    int npes;

    MPI_Comm_size(comm, &npes);

    assert(arr.size());

    if (npes == 1) {
#ifdef _PROFILE_SORT
      seq_sort.start();
#endif
      omp_par::merge_sort(&arr[0], &arr[arr.size()]);
#ifdef _PROFILE_SORT
      seq_sort.stop();
#endif
      SortedElem = arr;
#ifdef _PROFILE_SORT
      total_sort.stop();
#endif
      //Nothing left to do on a single process.
      return 1;
    }

    std::vector<T> splitters;
    std::vector<T> allsplitters;

    int myrank;
    MPI_Comm_rank(comm, &myrank);

    DendroIntL nelem = arr.size();
    DendroIntL nelemCopy = nelem;
    DendroIntL totSize;
    par::Mpi_Allreduce<DendroIntL>(&nelemCopy, &totSize, 1, MPI_SUM, comm);

    DendroIntL npesLong = npes;
    const DendroIntL FIVE = 5;

    if(totSize < (FIVE*npesLong*npesLong)) {
      if(!myrank) {
        std::cout<<" Using bitonic sort since totSize < (5*(npes^2)). totSize: "
          <<totSize<<" npes: "<<npes<<std::endl;
      }
      // par::partitionW<T>(arr, NULL, comm);

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!myrank) {
        std::cout<<"SampleSort (small n): Stage-1 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      SortedElem = arr;
      MPI_Comm new_comm;
      if(totSize < npesLong) {
        if(!myrank) {
          std::cout<<" Input to sort is small. splittingComm: "
            <<npes<<" -> "<< totSize<<std::endl;
        }
        par::splitCommUsingSplittingRank(static_cast<int>(totSize), &new_comm, comm);
      } else {
        new_comm = comm;
      }

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!myrank) {
        std::cout<<"SampleSort (small n): Stage-2 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

      if(!SortedElem.empty()) {
        par::bitonicSort<T>(SortedElem, new_comm);
      }

      if(comm != new_comm) {
        MPI_Comm_free(&new_comm);
      }

#ifdef __DEBUG_PAR__
      MPI_Barrier(comm);
      if(!myrank) {
        std::cout<<"SampleSort (small n): Stage-3 passed."<<std::endl;
      }
      MPI_Barrier(comm);
#endif

#ifdef _PROFILE_SORT
      total_sort.stop();
#endif
      //The bitonic path has produced SortedElem; skip the sample-sort path.
      return 1;
    }// end if

#ifdef __DEBUG_PAR__
    if(!myrank) {
      std::cout<<"Using sample sort to sort nodes. n/p^2 is fine."<<std::endl;
    }
#endif

    //Re-partition arr so that each proc. has at least p elements.
    //(The repartitioning call is currently disabled.)
#ifdef _PROFILE_SORT
    sort_partitionw.start();
#endif
    // par::partitionW<T>(arr, NULL, comm);
#ifdef _PROFILE_SORT
    sort_partitionw.stop();
#endif
    nelem = arr.size();

#ifdef _PROFILE_SORT
    seq_sort.start();
#endif
    omp_par::merge_sort(&arr[0], &arr[arr.size()]);
#ifdef _PROFILE_SORT
    seq_sort.stop();
#endif

    std::vector<T> sendSplits(npes-1);
    splitters.resize(npes);

    #pragma omp parallel for
    for(int i = 1; i < npes; i++) {
      sendSplits[i-1] = arr[i*nelem/npes];
    }//end for i

#ifdef _PROFILE_SORT
    sample_sort_splitters.start();
#endif
    // sort sendSplits using bitonic ...
    par::bitonicSort<T>(sendSplits, comm);
#ifdef _PROFILE_SORT
    sample_sort_splitters.stop();
#endif

#ifdef _PROFILE_SORT
    sample_prepare_scatter.start();
#endif
    // All-gather the last element of the sorted sendSplits on each rank;
    // these npes keys become the bucket boundaries.
    T* sendSplitsPtr = NULL;
    T* splittersPtr = NULL;
    if(sendSplits.size() > static_cast<unsigned int>(npes-2)) {
      sendSplitsPtr = &(*(sendSplits.begin() + (npes-2)));
    }
    if(!splitters.empty()) {
      splittersPtr = &(*(splitters.begin()));
    }
    par::Mpi_Allgather<T>(sendSplitsPtr, splittersPtr, 1, comm);

    sendSplits.clear();

    int* sendcnts = new int[npes];
    assert(sendcnts);

    int* recvcnts = new int[npes];
    assert(recvcnts);

    int* sdispls = new int[npes];
    assert(sdispls);

    int* rdispls = new int[npes];
    assert(rdispls);

    #pragma omp parallel for
    for(int k = 0; k < npes; k++) {
      sendcnts[k] = 0;
    }

    {
      int omp_p = omp_get_max_threads();
      int* proc_split = new int[omp_p+1];
      DendroIntL* lst_split_indx = new DendroIntL[omp_p+1];
      proc_split[0] = 0;
      lst_split_indx[0] = 0;
      lst_split_indx[omp_p] = nelem;
      #pragma omp parallel for
      for(int i = 1; i < omp_p; i++) {
        proc_split[i] = std::upper_bound( &splittersPtr[0], &splittersPtr[npes-1],
                            arr[i*(size_t)nelem/omp_p], std::less<T>() ) - &splittersPtr[0];
        if(proc_split[i] < npes-1) {
          lst_split_indx[i] = std::upper_bound( &arr[0], &arr[nelem],
                                  splittersPtr[proc_split[i]], std::less<T>() ) - &arr[0];
        } else {
          proc_split[i] = npes-1;
          lst_split_indx[i] = nelem;
        }
      }
      #pragma omp parallel for
      for(int i = 0; i < omp_p; i++) {
        int sendcnts_ = 0;
        int k = proc_split[i];
        for(DendroIntL j = lst_split_indx[i]; j < lst_split_indx[i+1]; j++) {
          if(arr[j] <= splitters[k]) {
            sendcnts_++;
          } else {
            if(sendcnts_ > 0) {
              sendcnts[k] = sendcnts_;
            }
            sendcnts_ = 0;
            k = seq::UpperBound<T>(npes-1, splittersPtr, k+1, arr[j]);
            if(k == (npes-1)) {
              //could not find any splitter >= arr[j]
              sendcnts_ = (nelem - j);
              break;
            } else {
              assert(k < (npes-1));
              assert(splitters[k] >= arr[j]);
              sendcnts_++;
            }
          }//end if-else
        }//end for j
        if(sendcnts_ > 0) {
          sendcnts[k] = sendcnts_;
        }
      }
      delete [] lst_split_indx;
      delete [] proc_split;
    }

    par::Mpi_Alltoall<int>(sendcnts, recvcnts, 1, comm);

    sdispls[0] = 0; rdispls[0] = 0;

    omp_par::scan(sendcnts, sdispls, npes);
    omp_par::scan(recvcnts, rdispls, npes);

    DendroIntL nsorted = rdispls[npes-1] + recvcnts[npes-1];
    SortedElem.resize(nsorted);

    T* arrPtr = NULL;
    T* SortedElemPtr = NULL;
    if(!arr.empty()) {
      arrPtr = &(*(arr.begin()));
    }
    if(!SortedElem.empty()) {
      SortedElemPtr = &(*(SortedElem.begin()));
    }
#ifdef _PROFILE_SORT
    sample_prepare_scatter.stop();
#endif

#ifdef _PROFILE_SORT
    sample_do_all2all.start();
#endif
    par::Mpi_Alltoallv_dense<T>(arrPtr, sendcnts, sdispls,
        SortedElemPtr, recvcnts, rdispls, comm);
#ifdef _PROFILE_SORT
    sample_do_all2all.stop();
#endif
    arr.clear();

    delete [] sendcnts;
    sendcnts = NULL;

    delete [] recvcnts;
    recvcnts = NULL;

    delete [] sdispls;
    sdispls = NULL;

    delete [] rdispls;
    rdispls = NULL;

#ifdef _PROFILE_SORT
    seq_sort.start();
#endif
    omp_par::merge_sort(&SortedElem[0], &SortedElem[nsorted]);
#ifdef _PROFILE_SORT
    seq_sort.stop();
#endif

#ifdef _PROFILE_SORT
    total_sort.stop();
#endif
    return 1;
  }//end function
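  /*
    Usage sketch (illustrative): the two-vector overload above returns the
    sorted keys in its second argument; note that on the sample-sort path
    it clears the input vector to save memory.

      std::vector<int> in = ...;                // this rank's local chunk
      std::vector<int> out;
      par::sampleSort<int>(in, out, MPI_COMM_WORLD);
      // out holds this rank's slice of the globally sorted sequence.
  */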
  /********************************************************************/
  /*
   * which_keys is one of KEEP_HIGH or KEEP_LOW.
   * partner is the processor with which to merge and split.
   */
  template <typename T>
  void MergeSplit( std::vector<T>& local_list, int which_keys, int partner, MPI_Comm comm) {

    MPI_Status status;
    int send_size = local_list.size();
    int recv_size = 0;

    // first communicate how many elements will be sent and received ...

    int my_rank;
    MPI_Comm_rank(comm, &my_rank);

    par::Mpi_Sendrecv<int, int>( &send_size, 1, partner, 0,
        &recv_size, 1, partner, 0, comm, &status);

    //local_list is assumed non-empty here (bitonicSort asserts this);
    //local_list[0] is only a fill value to avoid default construction.
    std::vector<T> temp_list( recv_size, local_list[0] );

    T* local_listPtr = NULL;
    T* temp_listPtr = NULL;
    if(!local_list.empty()) {
      local_listPtr = &(*(local_list.begin()));
    }
    if(!temp_list.empty()) {
      temp_listPtr = &(*(temp_list.begin()));
    }

    par::Mpi_Sendrecv<T, T>( local_listPtr, send_size, partner, 1,
        temp_listPtr, recv_size, partner, 1, comm, &status);

    MergeLists<T>(local_list, temp_list, which_keys);

    temp_list.clear();
  } // Merge_split

  template <typename T>
  void Par_bitonic_sort_incr( std::vector<T>& local_list, int proc_set_size, MPI_Comm comm ) {
    int eor_bit;
    int proc_set_dim;
    int stage;
    int partner;
    int my_rank;

    MPI_Comm_rank(comm, &my_rank);

    proc_set_dim = 0;
    int x = proc_set_size;
    while (x > 1) {
      x = x >> 1;
      proc_set_dim++;
    }

    eor_bit = (1 << (proc_set_dim - 1));

    for (stage = 0; stage < proc_set_dim; stage++) {
      partner = (my_rank ^ eor_bit);

      if (my_rank < partner) {
        MergeSplit<T>( local_list, KEEP_LOW, partner, comm);
      } else {
        MergeSplit<T>( local_list, KEEP_HIGH, partner, comm);
      }

      eor_bit = (eor_bit >> 1);
    }
  } // Par_bitonic_sort_incr

  template <typename T>
  void Par_bitonic_sort_decr( std::vector<T>& local_list, int proc_set_size, MPI_Comm comm) {
    int eor_bit;
    int proc_set_dim;
    int stage;
    int partner;
    int my_rank;

    MPI_Comm_rank(comm, &my_rank);

    proc_set_dim = 0;
    int x = proc_set_size;
    while (x > 1) {
      x = x >> 1;
      proc_set_dim++;
    }

    eor_bit = (1 << (proc_set_dim - 1));

    for (stage = 0; stage < proc_set_dim; stage++) {
      partner = (my_rank ^ eor_bit);

      if (my_rank > partner) {
        MergeSplit<T>( local_list, KEEP_LOW, partner, comm);
      } else {
        MergeSplit<T>( local_list, KEEP_HIGH, partner, comm);
      }

      eor_bit = (eor_bit >> 1);
    }
  } // Par_bitonic_sort_decr
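  /*
    Partner schedule sketch (illustrative): with proc_set_size = 8
    (proc_set_dim = 3), each rank pairs with rank ^ eor_bit as the bit
    walks down from the top:

      stage 0: eor_bit = 4  ->  0<->4, 1<->5, 2<->6, 3<->7
      stage 1: eor_bit = 2  ->  0<->2, 1<->3, 4<->6, 5<->7
      stage 2: eor_bit = 1  ->  0<->1, 2<->3, 4<->5, 6<->7

    The "incr" variant keeps the low keys on the lower rank of each pair;
    the "decr" variant keeps them on the higher rank. Choosing one or the
    other based on a rank bit is what builds the bitonic sequences that
    the later merge stages need.
  */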
  template <typename T>
  void Par_bitonic_merge_incr( std::vector<T>& local_list, int proc_set_size, MPI_Comm comm ) {
    int partner;
    int rank, npes;

    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &npes);

    unsigned int num_left = binOp::getPrevHighestPowerOfTwo(npes);
    unsigned int num_right = npes - num_left;

    // 1. Do a merge between the num_right right procs and the highest
    //    num_right of the left procs.
    if ( (static_cast<unsigned int>(rank) < num_left) &&
        (static_cast<unsigned int>(rank) >= (num_left - num_right)) ) {
      partner = static_cast<unsigned int>(rank) + num_right;
      MergeSplit<T>( local_list, KEEP_LOW, partner, comm);
    } else if (static_cast<unsigned int>(rank) >= num_left) {
      partner = static_cast<unsigned int>(rank) - num_right;
      MergeSplit<T>( local_list, KEEP_HIGH, partner, comm);
    }
  }

  template <typename T>
  void bitonicSort_binary(std::vector<T>& in, MPI_Comm comm) {
    int proc_set_size;
    unsigned int and_bit;
    int rank;
    int npes;

    MPI_Comm_size(comm, &npes);

#ifdef __DEBUG_PAR__
    assert(npes > 1);
    assert(!(npes & (npes-1)));
    assert(!(in.empty()));
#endif

    MPI_Comm_rank(comm, &rank);

    for (proc_set_size = 2, and_bit = 2;
        proc_set_size <= npes;
        proc_set_size = proc_set_size*2,
        and_bit = and_bit << 1) {

      if ((rank & and_bit) == 0) {
        Par_bitonic_sort_incr<T>( in, proc_set_size, comm);
      } else {
        Par_bitonic_sort_decr<T>( in, proc_set_size, comm);
      }
    }//end for
  }

  template <typename T>
  void bitonicSort(std::vector<T>& in, MPI_Comm comm) {
    int rank;
    int npes;

    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(comm, &rank);

    assert(!(in.empty()));

    omp_par::merge_sort(&in[0], &in[in.size()]);
    MPI_Barrier(comm);

    if(npes > 1) {

      // check if npes is a power of two ...
      bool isPower = (!(npes & (npes - 1)));

      if ( isPower ) {
        bitonicSort_binary<T>(in, comm);
      } else {
        MPI_Comm new_comm;

        // Since npes is not a power of two, we split the problem in two:
        //
        // 1. Create 2 comm groups: one for the 2^d portion and one for
        //    the remainder.
        unsigned int splitter = splitCommBinary(comm, &new_comm);

        // 2. Sort each group (the remainder recurses).
        if ( static_cast<unsigned int>(rank) < splitter) {
          bitonicSort_binary<T>(in, new_comm);
        } else {
          bitonicSort<T>(in, new_comm);
        }
        MPI_Comm_free(&new_comm);

        // 3. Do a special merge of the two segments (original comm).
        Par_bitonic_merge_incr( in, binOp::getNextHighestPowerOfTwo(npes), comm );

        splitter = splitCommBinaryNoFlip(comm, &new_comm);

        // 4. Now a final sort on the segments.
        if (static_cast<unsigned int>(rank) < splitter) {
          bitonicSort_binary<T>(in, new_comm);
        } else {
          bitonicSort<T>(in, new_comm);
        }
        MPI_Comm_free(&new_comm);
      }//end if isPower of 2
    }//end if single processor
  }//end function
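  /*
    Usage sketch (illustrative): bitonicSort works for any communicator
    size; the non-power-of-two remainder is handled by the recursive
    split above. Every rank must contribute a non-empty vector.

      std::vector<double> keys = ...;           // non-empty on every rank
      par::bitonicSort<double>(keys, MPI_COMM_WORLD);
  */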
  template <typename T>
  void MergeLists( std::vector<T>& listA, std::vector<T>& listB, int KEEP_WHAT) {

    T _low, _high;

    assert(!(listA.empty()));
    assert(!(listB.empty()));

    _low  = ( (listA[0] > listB[0]) ? listA[0] : listB[0]);
    _high = ( (listA[listA.size()-1] < listB[listB.size()-1]) ?
        listA[listA.size()-1] : listB[listB.size()-1]);

    // We will do a full merge first ...
    size_t list_size = listA.size() + listB.size();

    std::vector<T> scratch_list(list_size);

    size_t index1 = 0;
    size_t index2 = 0;

    for (size_t i = 0; i < list_size; i++) {
      //The order of (A || B) is important here,
      //so that index2 remains within bounds.
      if ( (index1 < listA.size()) &&
          ( (index2 >= listB.size()) || (listA[index1] <= listB[index2]) ) ) {
        scratch_list[i] = listA[index1];
        index1++;
      } else {
        scratch_list[i] = listB[index2];
        index2++;
      }
    }

    //scratch_list is sorted at this point.

    listA.clear();
    listB.clear();
    if ( KEEP_WHAT == KEEP_LOW ) {
      //Keep the lower half (and anything below _low), but nothing above _high.
      size_t ii = 0;
      while ( ( (scratch_list[ii] < _low) || (ii < (list_size/2)) )
          && (scratch_list[ii] <= _high) ) {
        ii++;
      }
      if(ii) {
        listA.insert(listA.end(), scratch_list.begin(),
            (scratch_list.begin() + ii));
      }
    } else {
      //Keep the upper half (and anything above _high), but nothing below _low.
      //ii must stay signed: it can move below list_size/2 before the loop exits.
      long ii = static_cast<long>(list_size - 1);
      while ( ( (ii >= static_cast<long>(list_size/2))
            && (scratch_list[ii] >= _low) )
          || (scratch_list[ii] > _high) ) {
        ii--;
      }
      if(ii < static_cast<long>(list_size - 1) ) {
        listA.insert(listA.begin(), (scratch_list.begin() + (ii + 1)),
            (scratch_list.begin() + list_size));
      }
    }
    scratch_list.clear();
  }//end function
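  /*
    Worked example (illustrative): with listA = {1, 4, 9} and
    listB = {2, 3, 7}, the full merge is {1, 2, 3, 4, 7, 9}. The KEEP_LOW
    caller ends up with {1, 2, 3} and the KEEP_HIGH caller with {4, 7, 9},
    so after MergeSplit the pair of ranks jointly holds a sorted split of
    their combined keys.
  */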
  template<typename T>
  std::vector<T> Sorted_Sample_Select(std::vector<T>& arr, unsigned int kway,
      std::vector<unsigned int>& min_idx, std::vector<unsigned int>& max_idx,
      std::vector<DendroIntL>& splitter_ranks, MPI_Comm comm) {
    int rank, npes;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(comm, &rank);

    //-------------------------------------------
    DendroIntL totSize, nelem = arr.size();
    par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);

    //Determine splitters. O( log(N/p) + log(p) )
    int splt_count = (1000*kway*nelem)/totSize;
    if (npes > 1000*kway) {
      splt_count = ((((float)rand())/RAND_MAX)*totSize < (1000*kway*nelem) ? 1 : 0);
    }
    if (splt_count > nelem) splt_count = nelem;
    std::vector<T> splitters(splt_count);
    for(int i = 0; i < splt_count; i++) {
      splitters[i] = arr[rand()%nelem];
    }

    // Gather all splitters. O( log(p) )
    int glb_splt_count;
    std::vector<int> glb_splt_cnts(npes);
    std::vector<int> glb_splt_disp(npes, 0);
    par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
    omp_par::scan(&glb_splt_cnts[0], &glb_splt_disp[0], npes);
    glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
    std::vector<T> glb_splitters(glb_splt_count);
    MPI_Allgatherv(&splitters[0], splt_count, par::Mpi_datatype<T>::value(),
        &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
        par::Mpi_datatype<T>::value(), comm);

    // Rank the splitters. O( log(N/p) + log(p) )
    std::vector<DendroIntL> disp(glb_splt_count, 0);
    if(nelem > 0) {
      #pragma omp parallel for
      for(int i = 0; i < glb_splt_count; i++) {
        disp[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
      }
    }
    std::vector<DendroIntL> glb_disp(glb_splt_count, 0);
    MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count,
        par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);

    splitter_ranks.clear(); splitter_ranks.resize(kway);
    min_idx.clear(); min_idx.resize(kway);
    max_idx.clear(); max_idx.resize(kway);
    std::vector<T> split_keys(kway);
    #pragma omp parallel for
    for (unsigned int qq = 0; qq < kway; qq++) {
      DendroIntL* _disp = &glb_disp[0];
      DendroIntL* _mind = &glb_disp[0];
      DendroIntL* _maxd = &glb_disp[0];
      DendroIntL optSplitter = ((qq+1)*totSize)/(kway+1);
      for(int i = 0; i < glb_splt_count; i++) {
        if(labs(glb_disp[i] - optSplitter) < labs(*_disp - optSplitter)) {
          _disp = &glb_disp[i];
        }
        if( (glb_disp[i] > optSplitter) &&
            (labs(glb_disp[i] - optSplitter) < labs(*_maxd - optSplitter)) ) {
          _maxd = &glb_disp[i];
        }
        if( (glb_disp[i] < optSplitter) &&
            (labs(optSplitter - glb_disp[i]) < labs(optSplitter - *_mind)) ) {
          _mind = &glb_disp[i];
        }
      }
      split_keys[qq] = glb_splitters[_disp - &glb_disp[0]];
      min_idx[qq] = std::lower_bound(&arr[0], &arr[nelem],
          glb_splitters[_mind - &glb_disp[0]]) - &arr[0];
      max_idx[qq] = std::upper_bound(&arr[0], &arr[nelem],
          glb_splitters[_maxd - &glb_disp[0]]) - &arr[0];
      splitter_ranks[qq] = optSplitter - *_mind;
    }

    return split_keys;
  }
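  /*
    Target-rank sketch (illustrative): for kway = 3 splitters over
    totSize = 100 keys, the optimal global ranks are

      optSplitter(q) = ((q+1)*totSize)/(kway+1)  ->  25, 50, 75

    and the loop above returns, for each target, the sampled key whose
    global rank is closest, plus local index bounds bracketing it.
  */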
  template<typename T>
  void Sorted_approx_Select_helper(std::vector<T>& arr, std::vector<size_t>& exp_rank,
      std::vector<T>& splt_key, int beta, std::vector<size_t>& start,
      std::vector<size_t>& end, size_t& max_err, MPI_Comm comm) {

    int rank, npes;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(comm, &rank);

    size_t nelem = arr.size();
    int kway = exp_rank.size();
    std::vector<size_t> locSize(kway), totSize(kway);
    for(int i = 0; i < kway; i++) locSize[i] = end[i] - start[i];
    par::Mpi_Allreduce<size_t>(&locSize[0], &totSize[0], kway, MPI_SUM, comm);

    //-------------------------------------------
    std::vector<T> loc_splt;
    for(int i = 0; i < kway; i++) {
      int splt_count = (totSize[i] == 0 ? 1 : (beta*(end[i]-start[i]))/totSize[i]);
      if (npes > beta) {
        splt_count = ((((float)rand())/RAND_MAX)*totSize[i] < (beta*locSize[i]) ? 1 : 0);
      }
      //Sample within [start[i], end[i]). The modulus must be locSize[i]
      //(guarded against zero), not locSize[i]+1, or the index can step
      //one past the segment and read out of bounds when end[i] == nelem.
      for(int j = 0; j < splt_count; j++) {
        size_t off = (locSize[i] > 0 ? (rand() % locSize[i]) : 0);
        loc_splt.push_back(arr[start[i] + off]);
      }
      std::sort(&loc_splt[loc_splt.size()-splt_count], &loc_splt[loc_splt.size()]);
    }

    int splt_count = loc_splt.size();

    // Gather all splitters. O( log(p) )
    int glb_splt_count;
    std::vector<int> glb_splt_cnts(npes);
    std::vector<int> glb_splt_disp(npes, 0);
    par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
    omp_par::scan(&glb_splt_cnts[0], &glb_splt_disp[0], npes);
    glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
    std::vector<T> glb_splt(glb_splt_count);
    MPI_Allgatherv(&loc_splt[0], splt_count, par::Mpi_datatype<T>::value(),
        &glb_splt[0], &glb_splt_cnts[0], &glb_splt_disp[0],
        par::Mpi_datatype<T>::value(), comm);
    std::sort(&glb_splt[0], &glb_splt[glb_splt_count]);

    // Rank the splitters. O( log(N/p) + log(p) )
    std::vector<size_t> loc_rank(glb_splt_count, 0);
    if(nelem > 0) {
      #pragma omp parallel for
      for(int i = 0; i < glb_splt_count; i++) {
        loc_rank[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splt[i]) - &arr[0];
      }
    }
    std::vector<size_t> glb_rank(glb_splt_count, 0);
    MPI_Allreduce(&loc_rank[0], &glb_rank[0], glb_splt_count,
        par::Mpi_datatype<size_t>::value(), MPI_SUM, comm);

    size_t new_max_err = 0;
    // #pragma omp parallel for
    for (int i = 0; i < kway; i++) {
      int ub_indx = std::upper_bound(&glb_rank[0], &glb_rank[glb_splt_count],
                        exp_rank[i]) - &glb_rank[0];
      int lb_indx = ub_indx - 1; if(lb_indx < 0) lb_indx = 0;
      size_t err = labs(glb_rank[lb_indx] - exp_rank[i]);

      if(err < max_err) {
        //Found a better bracket for this splitter: shrink its search range.
        if(glb_rank[lb_indx] > exp_rank[i]) start[i] = 0;
        else start[i] = loc_rank[lb_indx];

        if(ub_indx == glb_splt_count) end[i] = nelem;
        else end[i] = loc_rank[ub_indx];

        splt_key[i] = glb_splt[lb_indx];
        if(new_max_err < err) new_max_err = err;
      }
    }
    max_err = new_max_err;
  }

  template<typename T>
  std::vector<T> Sorted_approx_Select_recursive(std::vector<T>& arr, unsigned int kway, MPI_Comm comm) {
    int rank, npes;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(comm, &rank);

    //-------------------------------------------
    DendroIntL totSize, nelem = arr.size();
    par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);

    double tol = 1e-2/kway;
    int beta = pow(1.0/tol, 1.0/3.0)*3.0;
    std::vector<T> splt_key(kway);
    std::vector<size_t> start(kway, 0);
    std::vector<size_t> end(kway, nelem);
    std::vector<size_t> exp_rank(kway);
    for(unsigned int i = 0; i < kway; i++) exp_rank[i] = ((i+1)*totSize)/(kway+1);

    size_t max_error = totSize;
    while(max_error > totSize*tol) {
      Sorted_approx_Select_helper(arr, exp_rank, splt_key, beta, start, end, max_error, comm);
    }

    return splt_key;
  }
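  /*
    Usage sketch (illustrative): selecting npes-1 approximate splitters
    from a locally sorted array; the helper above is iterated until the
    worst splitter-rank error drops below totSize*tol with tol = 0.01/kway.
    (npes and comm are assumed to come from the caller's context.)

      std::vector<long> keys = ...;             // locally sorted
      std::vector<long> splt =
          par::Sorted_approx_Select_recursive<long>(keys, npes-1, comm);
  */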
  template<typename T>
  std::vector<T> Sorted_approx_Select(std::vector<T>& arr, unsigned int kway, MPI_Comm comm) {
    int rank, npes;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(comm, &rank);

    //-------------------------------------------
    DendroIntL totSize, nelem = arr.size();
    par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);

    //Determine splitters. O( log(N/p) + log(p) )
    int splt_count = (1000*kway*nelem)/totSize;
    if (npes > 1000*kway) {
      splt_count = ((((float)rand())/RAND_MAX)*totSize < (1000*kway*nelem) ? 1 : 0);
    }
    if (splt_count > nelem) splt_count = nelem;
    std::vector<T> splitters(splt_count);
    for(int i = 0; i < splt_count; i++) {
      splitters[i] = arr[rand()%nelem];
    }

    // Gather all splitters. O( log(p) )
    int glb_splt_count;
    std::vector<int> glb_splt_cnts(npes);
    std::vector<int> glb_splt_disp(npes, 0);
    par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
    omp_par::scan(&glb_splt_cnts[0], &glb_splt_disp[0], npes);
    glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
    std::vector<T> glb_splitters(glb_splt_count);
    MPI_Allgatherv(&splitters[0], splt_count, par::Mpi_datatype<T>::value(),
        &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
        par::Mpi_datatype<T>::value(), comm);

    // Rank the splitters. O( log(N/p) + log(p) )
    std::vector<DendroIntL> disp(glb_splt_count, 0);
    if(nelem > 0) {
      #pragma omp parallel for
      for(int i = 0; i < glb_splt_count; i++) {
        disp[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
      }
    }
    std::vector<DendroIntL> glb_disp(glb_splt_count, 0);
    MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count,
        par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);

    std::vector<T> split_keys(kway);
    #pragma omp parallel for
    for (unsigned int qq = 0; qq < kway; qq++) {
      DendroIntL* _disp = &glb_disp[0];
      DendroIntL optSplitter = ((qq+1)*totSize)/(kway+1);
      for(int i = 0; i < glb_splt_count; i++) {
        if(labs(glb_disp[i] - optSplitter) < labs(*_disp - optSplitter)) {
          _disp = &glb_disp[i];
        }
      }
      split_keys[qq] = glb_splitters[_disp - &glb_disp[0]];
    }

    return split_keys;
  }
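  /*
    Note (illustrative): Sorted_approx_Select is the single-pass variant
    of Sorted_approx_Select_recursive above. It samples roughly 1000*kway
    keys across the communicator and keeps the sampled key whose global
    rank is closest to each target, with no refinement loop:

      std::vector<long> splt =
          par::Sorted_approx_Select<long>(keys, npes-1, comm);
  */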
  template<typename T>
  std::vector< std::pair<T, DendroIntL> > Sorted_approx_Select_skewed(std::vector<T>& arr,
      unsigned int kway, MPI_Comm comm) {
    int rank, npes;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(comm, &rank);

    //-------------------------------------------
    DendroIntL totSize, nelem = arr.size();
    par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);

    //Determine splitters. O( log(N/p) + log(p) )
    int splt_count = (1000*kway*nelem)/totSize;
    if (npes > 1000*kway) {
      splt_count = ((((float)rand())/RAND_MAX)*totSize < (1000*kway*nelem) ? 1 : 0);
    }
    if (splt_count > nelem) splt_count = nelem;

    //Unlike the plain variant, each splitter carries a global duplicate
    //rank so that runs of equal keys can be split across processes.
    //DendroIntL (64-bit) is sufficient for these ranks: 1<<63 is about
    //9.2 quintillion.
    std::vector<T> splitters(splt_count);
    std::vector<DendroIntL> dup_ranks(splt_count);
    for(int i = 0; i < splt_count; i++) {
      dup_ranks[i] = (2*i*totSize/npes) + rand()%nelem;
      splitters[i] = arr[rand()%nelem];
    }

    // Gather all splitters. O( log(p) )
    int glb_splt_count;
    std::vector<int> glb_splt_cnts(npes);
    std::vector<int> glb_splt_disp(npes, 0);

    par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
    omp_par::scan(&glb_splt_cnts[0], &glb_splt_disp[0], npes);
    glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];

    std::vector<T> glb_splitters(glb_splt_count);
    std::vector<DendroIntL> glb_dup_ranks(glb_splt_count);
    MPI_Allgatherv(&dup_ranks[0], splt_count, par::Mpi_datatype<DendroIntL>::value(),
        &glb_dup_ranks[0], &glb_splt_cnts[0], &glb_splt_disp[0],
        par::Mpi_datatype<DendroIntL>::value(), comm);
    MPI_Allgatherv(&splitters[0], splt_count, par::Mpi_datatype<T>::value(),
        &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
        par::Mpi_datatype<T>::value(), comm);

    // Rank the splitters. O( log(N/p) + log(p) )
    std::vector<DendroIntL> disp(glb_splt_count, 0);
    if(nelem > 0) {
      #pragma omp parallel for
      for(int i = 0; i < glb_splt_count; i++) {
        //dLow and dHigh must be loop-local: shared temporaries here would
        //race between the OpenMP threads.
        DendroIntL dLow  = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
        DendroIntL dHigh = std::upper_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
        if ( (dHigh - dLow) > 1 ) {
          //For a run of duplicated keys, decide which side of the run
          //this rank's copies fall on.
          DendroIntL sRank = glb_dup_ranks[i]*npes/2/totSize;
          if (sRank < rank) {
            disp[i] = dLow;
          } else if (sRank > rank) {
            disp[i] = dHigh;
          } else {
            disp[i] = glb_dup_ranks[i] - (2*rank*totSize/npes);
          }
        } else {
          disp[i] = dLow;
        }
      }
    }
    std::vector<DendroIntL> glb_disp(glb_splt_count, 0);

    MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count,
        par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);

    std::vector< std::pair<T, DendroIntL> > split_keys(kway);
    #pragma omp parallel for
    for (unsigned int qq = 0; qq < kway; qq++) {
      DendroIntL* _disp = &glb_disp[0];
      DendroIntL optSplitter = ((qq+1)*totSize)/(kway+1);
      for(int i = 0; i < glb_splt_count; i++) {
        if(labs(glb_disp[i] - optSplitter) < labs(*_disp - optSplitter)) {
          _disp = &glb_disp[i];
        }
      }
      //key_pair is loop-local for the same thread-safety reason as above.
      std::pair<T, DendroIntL> key_pair;
      key_pair.first  = glb_splitters[_disp - &glb_disp[0]];
      key_pair.second = glb_dup_ranks[_disp - &glb_disp[0]];
      split_keys[qq] = key_pair;
    }

    return split_keys;
  }

}//end namespace
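/*
  Usage sketch (illustrative): the skewed variant above returns
  (key, duplicate-rank) pairs so that long runs of equal keys can be
  divided between processes; it is what the one-vector sampleSort feeds
  into its IndexHolder splitters.

    std::vector< std::pair<long, DendroIntL> > splt =
        par::Sorted_approx_Select_skewed<long>(keys, npes-1, comm);
*/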