1 
2 /**
3   @file parUtils.txx
4   @brief Definitions of the templated functions in the par module.
5   @author Rahul S. Sampath, rahul.sampath@gmail.com
6   @author Hari Sundar, hsundar@gmail.com
7   @author Shravan Veerapaneni, shravan@seas.upenn.edu
8   @author Santi Swaroop Adavani, santis@gmail.com
9  */
10 
11 #include "binUtils.h"
12 #include "seqUtils.h"
13 #include "dtypes.h"
14 
15 #include <cassert>
16 #include <iostream>
17 #include <algorithm>
18 #include <cstring>
19 
20 
21 #include "indexHolder.h"
22 
23 #ifdef _PROFILE_SORT
24   #include "sort_profiler.h"
25 #endif
26 
27 #include "ompUtils.h"
28 
29 
30 #include <mpi.h>
31 
32 #ifdef __DEBUG__
33 #ifndef __DEBUG_PAR__
34 #define __DEBUG_PAR__
35 #endif
36 #endif
37 
38 #ifndef KWAY
39 		#define KWAY 64
40 #endif
41 
42 namespace par {
43 
44   template <typename T>
45     inline int Mpi_Isend(T* buf, int count, int dest, int tag,
46         MPI_Comm comm, MPI_Request* request) {
47 
48       MPI_Isend(buf, count, par::Mpi_datatype<T>::value(),
49           dest, tag, comm, request);
50 
51       return 1;
52 
53     }
54 
55   template <typename T>
56     inline int Mpi_Issend(T* buf, int count, int dest, int tag,
57         MPI_Comm comm, MPI_Request* request) {
58 
59       MPI_Issend(buf, count, par::Mpi_datatype<T>::value(),
60           dest, tag, comm, request);
61 
62       return 1;
63 
64     }
65 
66   template <typename T>
67     inline int Mpi_Recv(T* buf, int count, int source, int tag,
68         MPI_Comm comm, MPI_Status* status) {
69 
70       MPI_Recv(buf, count, par::Mpi_datatype<T>::value(),
71           source, tag, comm, status);
72 
73       return 1;
74 
75     }
76 
77   template <typename T>
78     inline int Mpi_Irecv(T* buf, int count, int source, int tag,
79         MPI_Comm comm, MPI_Request* request) {
80 
81       MPI_Irecv(buf, count, par::Mpi_datatype<T>::value(),
82           source, tag, comm, request);
83 
84       return 1;
85 
86     }
87 
88   template <typename T, typename S>
89     inline int Mpi_Sendrecv( T* sendBuf, int sendCount, int dest, int sendTag,
90         S* recvBuf, int recvCount, int source, int recvTag,
91         MPI_Comm comm, MPI_Status* status) {
92 
93         return MPI_Sendrecv(sendBuf, sendCount, par::Mpi_datatype<T>::value(), dest, sendTag,
94             recvBuf, recvCount, par::Mpi_datatype<S>::value(), source, recvTag, comm, status);
95 
96     }
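  /* Usage sketch (not part of the library): the typed point-to-point
   * wrappers above pick the MPI datatype via par::Mpi_datatype<T>::value(),
   * so no explicit MPI_DOUBLE is needed. The snippet below is illustrative
   * only and assumes an initialized MPI environment with at least two ranks.
   *
   *   int rank;
   *   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   *   double payload = 3.14, incoming = 0.0;
   *   MPI_Request req;
   *   MPI_Status  stat;
   *   if(rank == 0) {
   *     par::Mpi_Isend<double>(&payload, 1, 1, 0, MPI_COMM_WORLD, &req);
   *     MPI_Wait(&req, &stat);
   *   } else if(rank == 1) {
   *     par::Mpi_Recv<double>(&incoming, 1, 0, 0, MPI_COMM_WORLD, &stat);
   *   }
   */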
97 
98   template <typename T>
99     inline int Mpi_Scan( T* sendbuf, T* recvbuf, int count, MPI_Op op, MPI_Comm comm) {
100 #ifdef __PROFILE_WITH_BARRIER__
101       MPI_Barrier(comm);
102 #endif
103         return MPI_Scan(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, comm);
104     }
105 
106   template <typename T>
107     inline int Mpi_Allreduce(T* sendbuf, T* recvbuf, int count, MPI_Op op, MPI_Comm comm) {
108 #ifdef __PROFILE_WITH_BARRIER__
109       MPI_Barrier(comm);
110 #endif
111 
112         return MPI_Allreduce(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, comm);
113     }
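  /* Usage sketch (illustrative): summing one value per rank and computing
   * this rank's prefix offset with the typed collectives above. Assumes
   * `comm` is a valid communicator; `computeLocalCount()` is a hypothetical
   * helper standing in for application code.
   *
   *   long localCount = computeLocalCount();
   *   long globalCount = 0, myScan = 0;
   *   par::Mpi_Allreduce<long>(&localCount, &globalCount, 1, MPI_SUM, comm);
   *   par::Mpi_Scan<long>(&localCount, &myScan, 1, MPI_SUM, comm);
   *   // myScan is the inclusive prefix sum; myScan - localCount is the
   *   // exclusive offset of this rank's first element.
   */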
114 
115   template <typename T>
116     inline int Mpi_Alltoall(T* sendbuf, T* recvbuf, int count, MPI_Comm comm) {
117 #ifdef __PROFILE_WITH_BARRIER__
118       MPI_Barrier(comm);
119 #endif
120 
121         return MPI_Alltoall(sendbuf, count, par::Mpi_datatype<T>::value(),
122             recvbuf, count, par::Mpi_datatype<T>::value(), comm);
123 
124     }
125 
126   template <typename T>
127     inline int Mpi_Alltoallv
128     (T* sendbuf, int* sendcnts, int* sdispls,
129      T* recvbuf, int* recvcnts, int* rdispls, MPI_Comm comm) {
130 #ifdef __PROFILE_WITH_BARRIER__
131       MPI_Barrier(comm);
132 #endif
133 
134         return MPI_Alltoallv(
135             sendbuf, sendcnts, sdispls, par::Mpi_datatype<T>::value(),
136             recvbuf, recvcnts, rdispls, par::Mpi_datatype<T>::value(),
137             comm);
139     }
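  /* Usage sketch (illustrative): building the count/displacement arrays that
   * Mpi_Alltoallv expects. The send counts here are arbitrary; the point is
   * that displacements are exclusive prefix sums of the counts and that the
   * receive counts come from a preceding Mpi_Alltoall.
   *
   *   int npes; MPI_Comm_size(comm, &npes);
   *   std::vector<int> scnt(npes, 1), sdsp(npes, 0);
   *   std::vector<int> rcnt(npes, 0), rdsp(npes, 0);
   *   par::Mpi_Alltoall<int>(&scnt[0], &rcnt[0], 1, comm);  // exchange counts
   *   for(int i = 1; i < npes; i++) {
   *     sdsp[i] = sdsp[i-1] + scnt[i-1];
   *     rdsp[i] = rdsp[i-1] + rcnt[i-1];
   *   }
   *   std::vector<double> sbuf(sdsp[npes-1] + scnt[npes-1]);
   *   std::vector<double> rbuf(rdsp[npes-1] + rcnt[npes-1]);
   *   par::Mpi_Alltoallv<double>(&sbuf[0], &scnt[0], &sdsp[0],
   *                              &rbuf[0], &rcnt[0], &rdsp[0], comm);
   */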
140 
141   template <typename T>
142     inline int Mpi_Gather( T* sendBuffer, T* recvBuffer, int count, int root, MPI_Comm comm) {
143 #ifdef __PROFILE_WITH_BARRIER__
144       MPI_Barrier(comm);
145 #endif
146 
147         return MPI_Gather(sendBuffer, count, par::Mpi_datatype<T>::value(),
148             recvBuffer, count, par::Mpi_datatype<T>::value(), root, comm);
149 
150     }
151 
152   template <typename T>
153     inline int Mpi_Bcast(T* buffer, int count, int root, MPI_Comm comm) {
154 #ifdef __PROFILE_WITH_BARRIER__
155       MPI_Barrier(comm);
156 #endif
157 
158         return MPI_Bcast(buffer, count, par::Mpi_datatype<T>::value(), root, comm);
159 
160     }
161 
162   template <typename T>
163     inline int Mpi_Reduce(T* sendbuf, T* recvbuf, int count, MPI_Op op, int root, MPI_Comm comm) {
164 #ifdef __PROFILE_WITH_BARRIER__
165       MPI_Barrier(comm);
166 #endif
167 
168         return MPI_Reduce(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, root, comm);
169 
170     }
171 
172   template <typename T>
173     int Mpi_Allgatherv(T* sendBuf, int sendCount, T* recvBuf,
174         int* recvCounts, int* displs, MPI_Comm comm) {
175 #ifdef __PROFILE_WITH_BARRIER__
176       MPI_Barrier(comm);
177 #endif
178 
179 #ifdef __USE_A2A_FOR_MPI_ALLGATHER__
180 
181       int maxSendCount;
182       int npes, rank;
183 
184       MPI_Comm_size(comm, &npes);
185       MPI_Comm_rank(comm, &rank);
186 
187       par::Mpi_Allreduce<int>(&sendCount, &maxSendCount, 1, MPI_MAX, comm);
188 
189       T* dummySendBuf = new T[maxSendCount*npes];
190       assert(dummySendBuf);
191 
192       #pragma omp parallel for
193       for(int i = 0; i < npes; i++) {
194         for(int j = 0; j < sendCount; j++) {
195           dummySendBuf[(i*maxSendCount) + j] = sendBuf[j];
196         }
197       }
198 
199       T* dummyRecvBuf = new T[maxSendCount*npes];
200       assert(dummyRecvBuf);
201 
202       par::Mpi_Alltoall<T>(dummySendBuf, dummyRecvBuf, maxSendCount, comm);
203 
204       #pragma omp parallel for
205       for(int i = 0; i < npes; i++) {
206         for(int j = 0; j < recvCounts[i]; j++) {
207           recvBuf[displs[i] + j] = dummyRecvBuf[(i*maxSendCount) + j];
208         }
209       }
210 
211       delete [] dummySendBuf;
212       delete [] dummyRecvBuf;
213       return 1;
214 
215 #else
216 
217       return MPI_Allgatherv(sendBuf, sendCount, par::Mpi_datatype<T>::value(),
218           recvBuf, recvCounts, displs, par::Mpi_datatype<T>::value(), comm);
219 
220 #endif
221 
222     }
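  /* Usage sketch (illustrative): gathering variable-sized contributions.
   * Each rank first publishes its local count, then the displacements are
   * an exclusive prefix sum of the gathered counts. `myData` stands in for
   * application data; Mpi_datatype<double> is provided by dtypes.h.
   *
   *   int npes; MPI_Comm_size(comm, &npes);
   *   int localSz = (int)myData.size();
   *   std::vector<int> counts(npes), displs(npes, 0);
   *   par::Mpi_Allgather<int>(&localSz, &counts[0], 1, comm);
   *   for(int i = 1; i < npes; i++) displs[i] = displs[i-1] + counts[i-1];
   *   std::vector<double> all(displs[npes-1] + counts[npes-1]);
   *   par::Mpi_Allgatherv<double>(&myData[0], localSz, &all[0],
   *                               &counts[0], &displs[0], comm);
   */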
223 
224   template <typename T>
225     int Mpi_Allgather(T* sendBuf, T* recvBuf, int count, MPI_Comm comm) {
226 #ifdef __PROFILE_WITH_BARRIER__
227       MPI_Barrier(comm);
228 #endif
229 
230 #ifdef __USE_A2A_FOR_MPI_ALLGATHER__
231 
232       int npes;
233       MPI_Comm_size(comm, &npes);
234       T* dummySendBuf = new T[count*npes];
235       assert(dummySendBuf);
236       #pragma omp parallel for
237       for(int i = 0; i < npes; i++) {
238         for(int j = 0; j < count; j++) {
239           dummySendBuf[(i*count) + j] = sendBuf[j];
240         }
241       }
242       par::Mpi_Alltoall<T>(dummySendBuf, recvBuf, count, comm);
243       delete [] dummySendBuf;
244       return 1;
245 
246 #else
247 
248       return MPI_Allgather(sendBuf, count, par::Mpi_datatype<T>::value(),
249           recvBuf, count, par::Mpi_datatype<T>::value(), comm);
250 
251 #endif
252 
253     }
254 
255   template <typename T>
256     int Mpi_Alltoallv_sparse(T* sendbuf, int* sendcnts, int* sdispls,
257         T* recvbuf, int* recvcnts, int* rdispls, MPI_Comm comm) {
258 #ifdef __PROFILE_WITH_BARRIER__
259       MPI_Barrier(comm);
260 #endif
261 
262 #ifndef ALLTOALLV_FIX
263       return Mpi_Alltoallv
264         (sendbuf, sendcnts, sdispls,
265          recvbuf, recvcnts, rdispls, comm);
266 #else
267 
268       int npes, rank;
269       MPI_Comm_size(comm, &npes);
270       MPI_Comm_rank(comm, &rank);
271 
272       int commCnt = 0;
273 
274       #pragma omp parallel for reduction(+:commCnt)
275       for(int i = 0; i < rank; i++) {
276         if(sendcnts[i] > 0) {
277           commCnt++;
278         }
279         if(recvcnts[i] > 0) {
280           commCnt++;
281         }
282       }
283 
284       #pragma omp parallel for reduction(+:commCnt)
285       for(int i = (rank+1); i < npes; i++) {
286         if(sendcnts[i] > 0) {
287           commCnt++;
288         }
289         if(recvcnts[i] > 0) {
290           commCnt++;
291         }
292       }
293 
294       MPI_Request* requests = new MPI_Request[commCnt];
295       assert(requests);
296 
297       MPI_Status* statuses = new MPI_Status[commCnt];
298       assert(statuses);
299 
300       commCnt = 0;
301 
302       //First place all recv requests. Do not recv from self.
303       for(int i = 0; i < rank; i++) {
304         if(recvcnts[i] > 0) {
305           par::Mpi_Irecv<T>( &(recvbuf[rdispls[i]]) , recvcnts[i], i, 1,
306               comm, &(requests[commCnt]) );
307           commCnt++;
308         }
309       }
310 
311       for(int i = (rank + 1); i < npes; i++) {
312         if(recvcnts[i] > 0) {
313           par::Mpi_Irecv<T>( &(recvbuf[rdispls[i]]) , recvcnts[i], i, 1,
314               comm, &(requests[commCnt]) );
315           commCnt++;
316         }
317       }
318 
319       //Next send the messages. Do not send to self.
320       for(int i = 0; i < rank; i++) {
321         if(sendcnts[i] > 0) {
322           par::Mpi_Issend<T>( &(sendbuf[sdispls[i]]), sendcnts[i], i, 1,
323               comm, &(requests[commCnt]) );
324           commCnt++;
325         }
326       }
327 
328       for(int i = (rank + 1); i < npes; i++) {
329         if(sendcnts[i] > 0) {
330           par::Mpi_Issend<T>( &(sendbuf[sdispls[i]]), sendcnts[i],
331               i, 1, comm, &(requests[commCnt]) );
332           commCnt++;
333         }
334       }
335 
336       //Now copy local portion.
337 #ifdef __DEBUG_PAR__
338       assert(sendcnts[rank] == recvcnts[rank]);
339 #endif
340 
341       #pragma omp parallel for
342       for(int i = 0; i < sendcnts[rank]; i++) {
343         recvbuf[rdispls[rank] + i] = sendbuf[sdispls[rank] + i];
344       }
345 
346       MPI_Waitall(commCnt, requests, statuses);
347 
348       delete [] requests;
349       delete [] statuses;
350       return 1;
351 #endif
352     }
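  /* Note: Mpi_Alltoallv_sparse is call-compatible with Mpi_Alltoallv but
   * posts point-to-point messages only for the nonzero send/recv pairs, so
   * it is usually the better choice when most counts are zero. Illustrative
   * call (same arrays as in the Mpi_Alltoallv sketch above):
   *
   *   par::Mpi_Alltoallv_sparse<double>(&sbuf[0], &scnt[0], &sdsp[0],
   *                                     &rbuf[0], &rcnt[0], &rdsp[0], comm);
   */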
353 
354 //*
355   template <typename T>
356     int Mpi_Alltoallv_dense(T* sbuff_, int* s_cnt_, int* sdisp_,
357         T* rbuff_, int* r_cnt_, int* rdisp_, MPI_Comm c){
358 
359 #ifdef __PROFILE_WITH_BARRIER__
360       MPI_Barrier(c);
361 #endif
362 
363 #ifndef ALLTOALLV_FIX
364       return Mpi_Alltoallv
365         (sbuff_, s_cnt_, sdisp_,
366          rbuff_, r_cnt_, rdisp_, c);
367 #else
368   int kway = KWAY;
369   int np, pid;
370   MPI_Comm_size(c, &np);
371   MPI_Comm_rank(c, &pid);
372   int range[2]={0,np};
373   int split_id, partner;
374 
375   std::vector<int> s_cnt(np);
376   #pragma omp parallel for
377   for(int i=0;i<np;i++){
378     s_cnt[i]=s_cnt_[i]*sizeof(T)+2*sizeof(int);
379   }
380   std::vector<int> sdisp(np); sdisp[0]=0;
381   omp_par::scan(&s_cnt[0],&sdisp[0],np);
382 
383   char* sbuff=new char[sdisp[np-1]+s_cnt[np-1]];
384   #pragma omp parallel for
385   for(int i=0;i<np;i++){
386     ((int*)&sbuff[sdisp[i]])[0]=s_cnt[i];
387     ((int*)&sbuff[sdisp[i]])[1]=pid;
388     memcpy(&sbuff[sdisp[i]]+2*sizeof(int),&sbuff_[sdisp_[i]],s_cnt[i]-2*sizeof(int));
389   }
390 
391   //int t_indx=0;
392   int iter_cnt=0;
393   while(range[1]-range[0]>1){
394     iter_cnt++;
395     if(kway>range[1]-range[0])
396       kway=range[1]-range[0];
397 
398     std::vector<int> new_range(kway+1);
399     for(int i=0;i<=kway;i++)
400       new_range[i]=(range[0]*(kway-i)+range[1]*i)/kway;
401     int p_class=(std::upper_bound(&new_range[0],&new_range[kway],pid)-&new_range[0]-1);
402     int new_np=new_range[p_class+1]-new_range[p_class];
403     int new_pid=pid-new_range[p_class];
404 
405     //Communication.
406     {
407       std::vector<int> r_cnt    (new_np*kway, 0);
408       std::vector<int> r_cnt_ext(new_np*kway, 0);
409       //Exchange send sizes.
410       for(int i=0;i<kway;i++){
411         MPI_Status status;
412         int cmp_np=new_range[i+1]-new_range[i];
413         int partner=(new_pid<cmp_np?       new_range[i]+new_pid: new_range[i+1]-1) ;
414         assert(     (new_pid<cmp_np? true: new_range[i]+new_pid==new_range[i+1]  )); //Remove this.
415         MPI_Sendrecv(&s_cnt[new_range[i]-new_range[0]], cmp_np, MPI_INT, partner, 0,
416                      &r_cnt[new_np   *i ], new_np, MPI_INT, partner, 0, c, &status);
417 
418         //Handle extra communication.
419         if(new_pid==new_np-1 && cmp_np>new_np){
420           int partner=new_range[i+1]-1;
421           std::vector<int> s_cnt_ext(cmp_np, 0);
422           MPI_Sendrecv(&s_cnt_ext[       0], cmp_np, MPI_INT, partner, 0,
423                        &r_cnt_ext[new_np*i], new_np, MPI_INT, partner, 0, c, &status);
424         }
425       }
426 
427       //Allocate receive buffer.
428       std::vector<int> rdisp    (new_np*kway, 0);
429       std::vector<int> rdisp_ext(new_np*kway, 0);
430       int rbuff_size, rbuff_size_ext;
431       char *rbuff, *rbuff_ext;
432       {
433         omp_par::scan(&r_cnt    [0], &rdisp    [0],new_np*kway);
434         omp_par::scan(&r_cnt_ext[0], &rdisp_ext[0],new_np*kway);
435         rbuff_size     = rdisp    [new_np*kway-1] + r_cnt    [new_np*kway-1];
436         rbuff_size_ext = rdisp_ext[new_np*kway-1] + r_cnt_ext[new_np*kway-1];
437         rbuff     = new char[rbuff_size    ];
438         rbuff_ext = new char[rbuff_size_ext];
439       }
440 
441       //Sendrecv data.
442       //*
443       int my_block=kway;
444       while(pid<new_range[my_block]) my_block--;
445 //      MPI_Barrier(c);
446       for(int i_=0;i_<=kway/2;i_++){
447         int i1=(my_block+i_)%kway;
448         int i2=(my_block+kway-i_)%kway;
449 
450         for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
451           int i=(i_==0?i1:((j+my_block/i_)%2?i1:i2));
452           MPI_Status status;
453           int cmp_np=new_range[i+1]-new_range[i];
454           int partner=(new_pid<cmp_np?       new_range[i]+new_pid: new_range[i+1]-1) ;
455 
456           int send_dsp     =sdisp[new_range[i  ]-new_range[0]  ];
457           int send_dsp_last=sdisp[new_range[i+1]-new_range[0]-1];
458           int send_cnt     =s_cnt[new_range[i+1]-new_range[0]-1]+send_dsp_last-send_dsp;
459 
460 //          ttt=omp_get_wtime();
461           MPI_Sendrecv(&sbuff[send_dsp], send_cnt, MPI_BYTE, partner, 0,
462                        &rbuff[rdisp[new_np  * i ]], r_cnt[new_np  *(i+1)-1]+rdisp[new_np  *(i+1)-1]-rdisp[new_np  * i ], MPI_BYTE, partner, 0, c, &status);
463 
464           //Handle extra communication.
465           if(new_pid==new_np-1 && cmp_np>new_np){
466             int partner=new_range[i+1]-1;
467             std::vector<int> s_cnt_ext(cmp_np, 0);
468             MPI_Sendrecv(                       NULL,                                                                       0, MPI_BYTE, partner, 0,
469                          &rbuff_ext[rdisp_ext[new_np*i]], r_cnt_ext[new_np*(i+1)-1]+rdisp_ext[new_np*(i+1)-1]-rdisp_ext[new_np*i], MPI_BYTE, partner, 0, c, &status);
470           }
471         }
472       }
473 
474       //Rearrange received data.
475       {
476         if(sbuff!=NULL) delete[] sbuff;
477         sbuff=new char[rbuff_size+rbuff_size_ext];
478 
479         std::vector<int>  cnt_new(2*new_np*kway, 0);
480         std::vector<int> disp_new(2*new_np*kway, 0);
481         for(int i=0;i<new_np;i++)
482         for(int j=0;j<kway;j++){
483           cnt_new[(i*2  )*kway+j]=r_cnt    [j*new_np+i];
484           cnt_new[(i*2+1)*kway+j]=r_cnt_ext[j*new_np+i];
485         }
486         omp_par::scan(&cnt_new[0], &disp_new[0],2*new_np*kway);
487 
488         #pragma omp parallel for
489         for(int i=0;i<new_np;i++)
490         for(int j=0;j<kway;j++){
491           memcpy(&sbuff[disp_new[(i*2  )*kway+j]], &rbuff    [rdisp    [j*new_np+i]], r_cnt    [j*new_np+i]);
492           memcpy(&sbuff[disp_new[(i*2+1)*kway+j]], &rbuff_ext[rdisp_ext[j*new_np+i]], r_cnt_ext[j*new_np+i]);
493         }
494 
495         //Free memory.
496         if(rbuff    !=NULL) delete[] rbuff    ;
497         if(rbuff_ext!=NULL) delete[] rbuff_ext;
498 
499         s_cnt.clear();
500         s_cnt.resize(new_np,0);
501         sdisp.resize(new_np);
502         for(int i=0;i<new_np;i++){
503           for(int j=0;j<2*kway;j++)
504             s_cnt[i]+=cnt_new[i*2*kway+j];
505           sdisp[i]=disp_new[i*2*kway];
506         }
507       }
508     }
509 
510     range[0]=new_range[p_class  ];
511     range[1]=new_range[p_class+1];
512   }
513 
514   //Copy data to rbuff_.
515   std::vector<char*> buff_ptr(np);
516   char* tmp_ptr=sbuff;
517   for(int i=0;i<np;i++){
518     int& blk_size=((int*)tmp_ptr)[0];
519     buff_ptr[i]=tmp_ptr;
520     tmp_ptr+=blk_size;
521   }
522   #pragma omp parallel for
523   for(int i=0;i<np;i++){
524     int& blk_size=((int*)buff_ptr[i])[0];
525     int& src_pid=((int*)buff_ptr[i])[1];
526     assert(blk_size-2*sizeof(int)<=r_cnt_[src_pid]*sizeof(T));
527     memcpy(&rbuff_[rdisp_[src_pid]],buff_ptr[i]+2*sizeof(int),blk_size-2*sizeof(int));
528   }
529 
530   //Free memory.
531   if(sbuff   !=NULL) delete[] sbuff;
532 return 1;
533 #endif
534 
535     }
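  /* The dense variant above exchanges data in O(log_k(p)) rounds: each round
   * splits the current rank interval into kway sub-intervals, every rank
   * talks to one partner per sub-interval, and then recurses into its own
   * sub-interval. A minimal sketch of just the range bookkeeping (no
   * communication; `np`, `pid` and `kway` are illustrative):
   *
   *   int range[2] = {0, np};
   *   while(range[1] - range[0] > 1) {
   *     int k = std::min(kway, range[1] - range[0]);
   *     std::vector<int> new_range(k + 1);
   *     for(int i = 0; i <= k; i++)
   *       new_range[i] = (range[0]*(k - i) + range[1]*i)/k;
   *     // p_class: which sub-interval this rank falls into.
   *     int p_class = (std::upper_bound(&new_range[0], &new_range[k], pid)
   *                    - &new_range[0]) - 1;
   *     range[0] = new_range[p_class];
   *     range[1] = new_range[p_class+1];
   *   }
   */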
536 
537 
538 	template<typename T>
539     unsigned int defaultWeight(const T *a){
540       return 1;
541     }
542 
543   template<typename T>
544     int partitionW(std::vector<T>& nodeList, unsigned int (*getWeight)(const T *), MPI_Comm comm){
545 #ifdef __PROFILE_WITH_BARRIER__
546       MPI_Barrier(comm);
547 #endif
548       int npes;
549 
550       MPI_Comm_size(comm, &npes);
551 
552       if(getWeight == NULL) {
553         getWeight = par::defaultWeight<T>;
554       }
555 
556       int rank;
557 
558       MPI_Comm_rank(comm, &rank);
559 
560       MPI_Request request;
561       MPI_Status status;
562       const bool nEmpty = nodeList.empty();
563 
564       DendroIntL  off1= 0, off2= 0, localWt= 0, totalWt = 0;
565 
566       DendroIntL* wts = NULL;
567       DendroIntL* lscn = NULL;
568       DendroIntL nlSize = nodeList.size();
569       if(nlSize) {
570         wts = new DendroIntL[nlSize];
571         assert(wts);
572 
573         lscn= new DendroIntL[nlSize];
574         assert(lscn);
575       }
576 
577       // First construct arrays of id and wts.
578       #pragma omp parallel for reduction(+:localWt)
579       for (DendroIntL i = 0; i < nlSize; i++){
580         wts[i] = (*getWeight)( &(nodeList[i]) );
581         localWt+=wts[i];
582       }
583 
584 #ifdef __DEBUG_PAR__
585       MPI_Barrier(comm);
586       if(!rank) {
587         std::cout<<"Partition: Stage-1 passed."<<std::endl;
588       }
589       MPI_Barrier(comm);
590 #endif
591 
592       // compute the total weight of the problem ...
593       par::Mpi_Allreduce<DendroIntL>(&localWt, &totalWt, 1, MPI_SUM, comm);
594 
595       // perform a local scan on the weights first ...
596       DendroIntL zero = 0;
597       if(!nEmpty) {
598         lscn[0]=wts[0];
599 //        for (DendroIntL i = 1; i < nlSize; i++) {
600 //          lscn[i] = wts[i] + lscn[i-1];
601 //        }//end for
602         omp_par::scan(&wts[1],lscn,nlSize);
603         // now compute the global offset by scanning the last entry of each local scan
604         par::Mpi_Scan<DendroIntL>(lscn+nlSize-1, &off1, 1, MPI_SUM, comm );
605       } else{
606         par::Mpi_Scan<DendroIntL>(&zero, &off1, 1, MPI_SUM, comm );
607       }
608 
609       // communicate the offsets ...
610       if (rank < (npes-1)){
611         par::Mpi_Issend<DendroIntL>( &off1, 1, rank+1, 0, comm, &request );
612       }
613       if (rank){
614         par::Mpi_Recv<DendroIntL>( &off2, 1, rank-1, 0, comm, &status );
615       }
616       else{
617         off2 = 0;
618       }
619 
620       // add offset to local array
621       #pragma omp parallel for
622       for (DendroIntL i = 0; i < nlSize; i++) {
623         lscn[i] = lscn[i] + off2;       // This has the global scan results now ...
624       }//end for
625 
626 #ifdef __DEBUG_PAR__
627       MPI_Barrier(comm);
628       if(!rank) {
629         std::cout<<"Partition: Stage-2 passed."<<std::endl;
630       }
631       MPI_Barrier(comm);
632 #endif
633 
634       int * sendSz = new int [npes];
635       assert(sendSz);
636 
637       int * recvSz = new int [npes];
638       assert(recvSz);
639 
640       int * sendOff = new int [npes];
641       assert(sendOff);
642       sendOff[0] = 0;
643 
644       int * recvOff = new int [npes];
645       assert(recvOff);
646       recvOff[0] = 0;
647 
648       // compute the partition offsets and sizes so that All2Allv can be performed.
649       // initialize ...
650 
651       #pragma omp parallel for
652       for (int i = 0; i < npes; i++) {
653         sendSz[i] = 0;
654       }
655 
656       // Now determine the average load ...
657       DendroIntL npesLong = npes;
658       DendroIntL avgLoad = (totalWt/npesLong);
659 
660       DendroIntL extra = (totalWt%npesLong);
661 
662       //The Heart of the algorithm....
663       if(avgLoad > 0) {
664         for (DendroIntL i = 0; i < nlSize; i++) {
665           if(lscn[i] == 0) {
666             sendSz[0]++;
667           }else {
668             int ind=0;
669             if ( lscn[i] <= (extra*(avgLoad + 1)) ) {
670               ind = ((lscn[i] - 1)/(avgLoad + 1));
671             }else {
672               ind = ((lscn[i] - (1 + extra))/avgLoad);
673             }
674             assert(ind < npes);
675             sendSz[ind]++;
676           }//end if-else
677         }//end for */
678 /*
679         //This is more efficient and parallelizable than the above.
680         //This has a bug, so the simpler approach above is used instead.
681         int ind_min,ind_max;
682         ind_min=(lscn[0]*npesLong)/totalWt-1;
683         ind_max=(lscn[nlSize-1]*npesLong)/totalWt+2;
684         if(ind_min< 0       )ind_min=0;
685         if(ind_max>=npesLong)ind_max=npesLong;
686         #pragma omp parallel for
687         for(int i=ind_min;i<ind_max;i++){
688           DendroIntL wt1=(totalWt*i)/npesLong;
689           DendroIntL wt2=(totalWt*(i+1))/npesLong;
690           int end = std::upper_bound(&lscn[0], &lscn[nlSize], wt2, std::less<DendroIntL>())-&lscn[0];
691           int start = std::upper_bound(&lscn[0], &lscn[nlSize], wt1, std::less<DendroIntL>())-&lscn[0];
692           if(i==npesLong-1)end  =nlSize;
693           if(i==         0)start=0     ;
694           sendSz[i]=end-start;
695         }// */
696 
697 #ifdef __DEBUG_PAR__
698         int tmp_sum=0;
699         for(int i=0;i<npes;i++) tmp_sum+=sendSz[i];
700         assert(tmp_sum==nlSize);
701 #endif
702 
703       }else {
704         sendSz[0]+= nlSize;
705       }//end if-else
706 
707 #ifdef __DEBUG_PAR__
708       MPI_Barrier(comm);
709       if(!rank) {
710         std::cout<<"Partition: Stage-3 passed."<<std::endl;
711       }
712       MPI_Barrier(comm);
713 #endif
714 
715       if(rank < (npes-1)) {
716         MPI_Status statusWait;
717         MPI_Wait(&request, &statusWait);
718       }
719 
720       // communicate with the other processes how many elements you will be sending
721       // and find out how many you will receive from whom.
722       par::Mpi_Alltoall<int>(sendSz, recvSz, 1, comm);
723 
724 #ifdef __DEBUG_PAR__
725       DendroIntL totSendToOthers = 0;
726       DendroIntL totRecvFromOthers = 0;
727       for (int i = 0; i < npes; i++) {
728         if(rank != i) {
729           totSendToOthers += sendSz[i];
730           totRecvFromOthers += recvSz[i];
731         }
732       }
733 #endif
734 
735       DendroIntL nn=0; // new value of nlSize, ie the local nodes.
736       #pragma omp parallel for reduction(+:nn)
737       for (int i = 0; i < npes; i++) {
738         nn += recvSz[i];
739       }
740 
741       // compute offsets ...
742       omp_par::scan(sendSz,sendOff,npes);
743       omp_par::scan(recvSz,recvOff,npes);
744 
745 #ifdef __DEBUG_PAR__
746       MPI_Barrier(comm);
747       if(!rank) {
748         std::cout<<"Partition: Stage-4 passed."<<std::endl;
749       }
750 			MPI_Barrier(comm);
751 #endif
752 
753       // allocate memory for the new arrays ...
754       std::vector<T > newNodes(nn);
755 
756 #ifdef __DEBUG_PAR__
757       MPI_Barrier(comm);
758       if(!rank) {
759         std::cout<<"Partition: Final alloc successful."<<std::endl;
760       }
761       MPI_Barrier(comm);
762 #endif
763 
764       // perform All2All  ...
765       T* nodeListPtr = NULL;
766       T* newNodesPtr = NULL;
767       if(!nodeList.empty()) {
768         nodeListPtr = &(*(nodeList.begin()));
769       }
770       if(!newNodes.empty()) {
771         newNodesPtr = &(*(newNodes.begin()));
772       }
773       par::Mpi_Alltoallv_sparse<T>(nodeListPtr, sendSz, sendOff,
774           newNodesPtr, recvSz, recvOff, comm);
775 
776 #ifdef __DEBUG_PAR__
777       MPI_Barrier(comm);
778       if(!rank) {
779         std::cout<<"Partition: Stage-5 passed."<<std::endl;
780       }
781       MPI_Barrier(comm);
782 #endif
783 
784       // reset the pointer ...
785       swap(nodeList, newNodes);
786       newNodes.clear();
787 
788       // clean up...
789       if(!nEmpty) {
790         delete [] lscn;
791         delete [] wts;
792       }
793       delete [] sendSz;
794       sendSz = NULL;
795 
796       delete [] sendOff;
797       sendOff = NULL;
798 
799       delete [] recvSz;
800       recvSz = NULL;
801 
802       delete [] recvOff;
803       recvOff = NULL;
804 
805 #ifdef __DEBUG_PAR__
806       MPI_Barrier(comm);
807       if(!rank) {
808         std::cout<<"Partition: Stage-6 passed."<<std::endl;
809       }
810       MPI_Barrier(comm);
811 #endif
812 
      return 1;
813     }//end function
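  /* Usage sketch for partitionW (illustrative). A user-supplied weight
   * function makes heavier elements count for more when balancing; passing
   * NULL falls back to defaultWeight (every element weighs 1). The type and
   * helpers below are hypothetical, and Mpi_datatype<Cell> would have to be
   * specialized (see dtypes.h) for the underlying Alltoallv to work.
   *
   *   struct Cell { int level; };
   *   unsigned int cellWeight(const Cell* c) { return 1u << c->level; }
   *   ...
   *   std::vector<Cell> cells = buildLocalCells();
   *   par::partitionW<Cell>(cells, cellWeight, MPI_COMM_WORLD);
   *   // On return, each rank holds a contiguous, weight-balanced slice.
   */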
814 
815   /* Hari Notes ....
816    *
817    * Avoid unwanted allocations within Hypersort ...
818    *
819    * 1. try to sort in place ... no output buffer, user can create a copy if
820    *    needed.
821    * 2. have a std::vector<T> container for rbuff. the space required can be
822    *    reserved before doing MPI_SendRecv
823    * 3. alternatively, keep a send buffer and recv into original buffer.
824    *
825    */
826   template<typename T>
827     int HyperQuickSort(std::vector<T>& arr, MPI_Comm comm_){ // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
828 #ifdef __PROFILE_WITH_BARRIER__
829       MPI_Barrier(comm_);
830 #endif
831 #ifdef _PROFILE_SORT
832     long bytes_comm=0;
833     total_sort.start();
834 #endif
835 
836 
837       // Copy communicator.
838       MPI_Comm comm=comm_;
839 
840       // Get comm size and rank.
841       int npes, myrank, rank_;
842       MPI_Comm_size(comm, &npes);
843       MPI_Comm_rank(comm, &myrank);
844       rank_ = myrank;
845 
846       if(npes==1){
847 #ifdef _PROFILE_SORT
848 				seq_sort.start();
849 #endif
850 				omp_par::merge_sort(&arr[0],&arr[arr.size()]);
851 #ifdef _PROFILE_SORT
852 				seq_sort.stop();
853 				total_sort.stop();
854 #endif
855       }
856       // buffers ... keeping all allocations together
857       std::vector<T>  commBuff;
858       std::vector<T>  mergeBuff;
859       std::vector<int> glb_splt_cnts(npes);
860       std::vector<int> glb_splt_disp(npes,0);
861 
862 
863       int omp_p=omp_get_max_threads();
864       srand(myrank);
865 
866       // Local and global sizes. O(log p)
867       long totSize, nelem = arr.size(); assert(nelem);
868       par::Mpi_Allreduce<long>(&nelem, &totSize, 1, MPI_SUM, comm);
869       long nelem_ = nelem;
870 
871       // Local sort.
872 #ifdef _PROFILE_SORT
873 			seq_sort.start();
874 #endif
875       omp_par::merge_sort(&arr[0], &arr[arr.size()]);
876 #ifdef _PROFILE_SORT
877 			seq_sort.stop();
878 #endif
879 
880       // Binary split and merge in each iteration.
881       while(npes>1 && totSize>0){ // O(log p) iterations.
882 
883         //Determine splitters. O( log(N/p) + log(p) )
884 #ifdef _PROFILE_SORT
885     hyper_compute_splitters.start();
886 #endif
887         T split_key;
888         long totSize_new;
889         //while(true)
890         {
891           // Take random splitters. O( 1 ) -- Let p * splt_count = glb_splt_count = const = 100~1000
892           int splt_count = (1000*nelem)/totSize;
893           if (npes>1000)
894             splt_count = ( ((float)rand()/(float)RAND_MAX)*totSize < (1000*nelem) ? 1 : 0 );
895 
896           if ( splt_count > nelem )
897 						splt_count = nelem;
898 
899           std::vector<T> splitters(splt_count);
900           for(size_t i=0;i<splt_count;i++)
901             splitters[i]=arr[rand()%nelem];
902 
903           // Gather all splitters. O( log(p) )
904           int glb_splt_count;
905 
906           par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
907           omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
908 
909           glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
910 
911           std::vector<T> glb_splitters(glb_splt_count);
912 
913           MPI_Allgatherv(&splitters[0], splt_count, par::Mpi_datatype<T>::value(),
914                          &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
915                          par::Mpi_datatype<T>::value(), comm);
916 
917           // Determine split key. O( log(N/p) + log(p) )
918           std::vector<long> disp(glb_splt_count,0);
919 
920           if(nelem>0){
921             #pragma omp parallel for
922             for(size_t i=0;i<glb_splt_count;i++){
923               disp[i]=std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
924             }
925           }
926           std::vector<long> glb_disp(glb_splt_count,0);
927           MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<long>::value(), MPI_SUM, comm);
928 
929           long* split_disp = &glb_disp[0];
930           for(size_t i=0; i<glb_splt_count; i++)
931             if ( abs(glb_disp[i] - totSize/2) < abs(*split_disp - totSize/2) )
932 							split_disp = &glb_disp[i];
933           split_key = glb_splitters[split_disp - &glb_disp[0]];
934 
935           totSize_new=(myrank<=(npes-1)/2?*split_disp:totSize-*split_disp);
936           //double err=(((double)*split_disp)/(totSize/2))-1.0;
937           //if(fabs(err)<0.01 || npes<=16) break;
938           //else if(!myrank) std::cout<<err<<'\n';
939         }
940 #ifdef _PROFILE_SORT
941     hyper_compute_splitters.stop();
942 #endif
943 
944         // Split problem into two. O( N/p )
945         int split_id=(npes-1)/2;
946         {
947 #ifdef _PROFILE_SORT
948       hyper_communicate.start();
949 #endif
950           int new_p0 = (myrank<=split_id?0:split_id+1);
951           int cmp_p0 = (myrank> split_id?0:split_id+1);
952           int new_np = (myrank<=split_id? split_id+1: npes-split_id-1);
953           int cmp_np = (myrank> split_id? split_id+1: npes-split_id-1);
954 
955           int partner = myrank+cmp_p0-new_p0;
956           if(partner>=npes) partner=npes-1;
957           assert(partner>=0);
958 
959           bool extra_partner=( npes%2==1  && npes-1==myrank );
960 
961           // Exchange send sizes.
962           char *sbuff, *lbuff;
963 
964           int     rsize=0,     ssize=0, lsize=0;
965           int ext_rsize=0, ext_ssize=0;
966           size_t split_indx=(nelem>0?std::lower_bound(&arr[0], &arr[nelem], split_key)-&arr[0]:0);
967           ssize=       (myrank> split_id? split_indx: nelem-split_indx )*sizeof(T);
968           sbuff=(char*)(myrank> split_id? &arr[0]   :  &arr[split_indx]);
969           lsize=       (myrank<=split_id? split_indx: nelem-split_indx )*sizeof(T);
970           lbuff=(char*)(myrank<=split_id? &arr[0]   :  &arr[split_indx]);
971 
972           MPI_Status status;
973           MPI_Sendrecv                  (&    ssize,1,MPI_INT, partner,0,   &    rsize,1,MPI_INT, partner,   0,comm,&status);
974           if(extra_partner) MPI_Sendrecv(&ext_ssize,1,MPI_INT,split_id,0,   &ext_rsize,1,MPI_INT,split_id,   0,comm,&status);
975 
976           // Exchange data.
977           commBuff.reserve(rsize/sizeof(T));
978           char*     rbuff = (char *)(&commBuff[0]);
979           char* ext_rbuff=(ext_rsize>0? new char[ext_rsize]: NULL);
980           MPI_Sendrecv                  (sbuff,ssize,MPI_BYTE, partner,0,       rbuff,    rsize,MPI_BYTE, partner,   0,comm,&status);
981           if(extra_partner) MPI_Sendrecv( NULL,    0,MPI_BYTE,split_id,0,   ext_rbuff,ext_rsize,MPI_BYTE,split_id,   0,comm,&status);
982 #ifdef _PROFILE_SORT
983           bytes_comm += ssize;
984           hyper_communicate.stop();
985           hyper_merge.start();
986 #endif
987 
988           int nbuff_size=lsize+rsize+ext_rsize;
989           mergeBuff.reserve(nbuff_size/sizeof(T));
990           char* nbuff= (char *)(&mergeBuff[0]);  // new char[nbuff_size];
991           omp_par::merge<T*>((T*)lbuff, (T*)&lbuff[lsize], (T*)rbuff, (T*)&rbuff[rsize], (T*)nbuff, omp_p, std::less<T>());
992           if(ext_rsize>0 && nbuff!=NULL){
993             // XXX case not handled
994             char* nbuff1= new char[nbuff_size];
995             omp_par::merge<T*>((T*)nbuff, (T*)&nbuff[lsize+rsize], (T*)ext_rbuff, (T*)&ext_rbuff[ext_rsize], (T*)nbuff1, omp_p, std::less<T>());
996             if(nbuff!=NULL) delete[] nbuff;
997             nbuff=nbuff1;
998           }
999 
1000           // Copy new data.
1001           totSize=totSize_new;
1002           nelem = nbuff_size/sizeof(T);
1003           /*
1004           if(arr_!=NULL) delete[] arr_;
1005           arr_=(T*) nbuff; nbuff=NULL;
1006           */
1007           mergeBuff.swap(arr);
1008 
1009           //Free memory.
1010           // if(    rbuff!=NULL) delete[]     rbuff;
1011           if(ext_rbuff!=NULL) delete[] ext_rbuff;
1012 #ifdef _PROFILE_SORT
1013       hyper_merge.stop();
1014 #endif
1015         }
1016 
1017         {// Split comm.  O( log(p) ) ??
1018 #ifdef _PROFILE_SORT
1019     hyper_comm_split.start();
1020 #endif
1021           MPI_Comm scomm;
1022           MPI_Comm_split(comm, myrank<=split_id, myrank, &scomm );
1023           comm=scomm;
1024           npes  =(myrank<=split_id? split_id+1: npes  -split_id-1);
1025           myrank=(myrank<=split_id? myrank    : myrank-split_id-1);
1026 #ifdef _PROFILE_SORT
1027     hyper_comm_split.stop();
1028 #endif
1029         }
1030       }
1031 
1032       // par::partitionW<T>(SortedElem, NULL , comm_);
1033 			// par::partitionW<T>(arr, NULL , comm_);
1034 
1035 #ifdef _PROFILE_SORT
1036   total_sort.stop();
1037 
1038       par::Mpi_Allreduce<long>(&bytes_comm, &total_bytes, 1, MPI_SUM, comm_);
1039       // if(!rank_) printf("Total comm is %ld bytes\n", total_comm);
1040 
1041 #endif
1042 return 1;
1043     }//end function
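  /* Usage sketch (illustrative): the in-place HyperQuickSort sorts the local
   * vector and leaves the global order distributed across ranks (lower ranks
   * end up with the smaller keys). Every rank must contribute at least one
   * element (see the assert on nelem above).
   *
   *   std::vector<long> keys = makeLocalKeys();   // hypothetical helper
   *   par::HyperQuickSort<long>(keys, MPI_COMM_WORLD);
   */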
1044 
1045   //--------------------------------------------------------------------------------
1046   template<typename T>
1047     int HyperQuickSort(std::vector<T>& arr, std::vector<T> & SortedElem, MPI_Comm comm_){ // O( ((N/p)+log(p))*(log(N/p)+log(p)) )
1048 #ifdef __PROFILE_WITH_BARRIER__
1049       MPI_Barrier(comm_);
1050 #endif
1051 	#ifdef _PROFILE_SORT
1052 		 		total_sort.start();
1053 	#endif
1054 
1055       // Copy communicator.
1056       MPI_Comm comm=comm_;
1057 
1058       // Get comm size and rank.
1059       int npes, myrank, myrank_;
1060       MPI_Comm_size(comm, &npes);
1061       MPI_Comm_rank(comm, &myrank); myrank_=myrank;
1062       if(npes==1){
1063         // @dhairya isn't this wrong for the !sort-in-place case ...
1064 #ifdef _PROFILE_SORT
1065 		 		seq_sort.start();
1066 #endif
1067 				omp_par::merge_sort(&arr[0],&arr[arr.size()]);
1068 #ifdef _PROFILE_SORT
1069 		 		seq_sort.stop();
1070 #endif
1071 				SortedElem  = arr;
1072 #ifdef _PROFILE_SORT
1073 		 		total_sort.stop();
1074 #endif
1075       }
1076 
1077       int omp_p=omp_get_max_threads();
1078       srand(myrank);
1079 
1080       // Local and global sizes. O(log p)
1081       DendroIntL totSize, nelem = arr.size(); assert(nelem);
1082       par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);
1083       DendroIntL nelem_ = nelem;
1084 
1085       // Local sort.
1086 #ifdef _PROFILE_SORT
1087 		 	seq_sort.start();
1088 #endif
1089       T* arr_=new T[nelem]; memcpy (&arr_[0], &arr[0], nelem*sizeof(T));
1090 			omp_par::merge_sort(&arr_[0], &arr_[arr.size()]);
1091 #ifdef _PROFILE_SORT
1092 		 	seq_sort.stop();
1093 #endif
1094       // Binary split and merge in each iteration.
1095       while(npes>1 && totSize>0){ // O(log p) iterations.
1096 
1097         //Determine splitters. O( log(N/p) + log(p) )
1098 #ifdef _PROFILE_SORT
1099 			 	hyper_compute_splitters.start();
1100 #endif
1101         T split_key;
1102         DendroIntL totSize_new;
1103         //while(true)
1104         {
1105           // Take random splitters. O( 1 ) -- Let p * splt_count = glb_splt_count = const = 100~1000
1106           int splt_count=(1000*nelem)/totSize;
1107           if(npes>1000) splt_count=(((float)rand()/(float)RAND_MAX)*totSize<(1000*nelem)?1:0);
1108           if(splt_count>nelem) splt_count=nelem;
1109           std::vector<T> splitters(splt_count);
1110           for(size_t i=0;i<splt_count;i++)
1111             splitters[i]=arr_[rand()%nelem];
1112 
1113           // Gather all splitters. O( log(p) )
1114           int glb_splt_count;
1115           std::vector<int> glb_splt_cnts(npes);
1116           std::vector<int> glb_splt_disp(npes,0);
1117           par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
1118           omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
1119           glb_splt_count=glb_splt_cnts[npes-1]+glb_splt_disp[npes-1];
1120           std::vector<T> glb_splitters(glb_splt_count);
1121           MPI_Allgatherv(&    splitters[0], splt_count, par::Mpi_datatype<T>::value(),
1122                          &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
1123                          par::Mpi_datatype<T>::value(), comm);
1124 
1125           // Determine split key. O( log(N/p) + log(p) )
1126           std::vector<DendroIntL> disp(glb_splt_count,0);
1127           if(nelem>0){
1128             #pragma omp parallel for
1129             for(size_t i=0;i<glb_splt_count;i++){
1130               disp[i]=std::lower_bound(&arr_[0], &arr_[nelem], glb_splitters[i])-&arr_[0];
1131             }
1132           }
1133           std::vector<DendroIntL> glb_disp(glb_splt_count,0);
1134           MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);
1135 
1136           DendroIntL* split_disp=&glb_disp[0];
1137           for(size_t i=0;i<glb_splt_count;i++)
1138             if( labs(glb_disp[i]-totSize/2) < labs(*split_disp-totSize/2)) split_disp=&glb_disp[i];
1139           split_key=glb_splitters[split_disp-&glb_disp[0]];
1140 
1141           totSize_new=(myrank<=(npes-1)/2?*split_disp:totSize-*split_disp);
1142           //double err=(((double)*split_disp)/(totSize/2))-1.0;
1143           //if(fabs(err)<0.01 || npes<=16) break;
1144           //else if(!myrank) std::cout<<err<<'\n';
1145         }
1146 #ifdef _PROFILE_SORT
1147 			 	hyper_compute_splitters.stop();
1148 #endif
1149 
1150         // Split problem into two. O( N/p )
1151         int split_id=(npes-1)/2;
1152         {
1153 #ifdef _PROFILE_SORT
1154 				 	hyper_communicate.start();
1155 #endif
1156 
1157           int new_p0=(myrank<=split_id?0:split_id+1);
1158           int cmp_p0=(myrank> split_id?0:split_id+1);
1159           int new_np=(myrank<=split_id? split_id+1: npes-split_id-1);
1160           int cmp_np=(myrank> split_id? split_id+1: npes-split_id-1);
1161 
1162           int partner = myrank+cmp_p0-new_p0;
1163           if(partner>=npes) partner=npes-1;
1164           assert(partner>=0);
1165 
1166           bool extra_partner=( npes%2==1  && npes-1==myrank );
1167 
1168           // Exchange send sizes.
1169           char *sbuff, *lbuff;
1170           int     rsize=0,     ssize=0, lsize=0;
1171           int ext_rsize=0, ext_ssize=0;
1172           size_t split_indx=(nelem>0?std::lower_bound(&arr_[0], &arr_[nelem], split_key)-&arr_[0]:0);
1173           ssize=       (myrank> split_id? split_indx: nelem-split_indx )*sizeof(T);
1174           sbuff=(char*)(myrank> split_id? &arr_[0]   :  &arr_[split_indx]);
1175           lsize=       (myrank<=split_id? split_indx: nelem-split_indx )*sizeof(T);
1176           lbuff=(char*)(myrank<=split_id? &arr_[0]   :  &arr_[split_indx]);
1177 
1178           MPI_Status status;
1179           MPI_Sendrecv                  (&    ssize,1,MPI_INT, partner,0,   &    rsize,1,MPI_INT, partner,   0,comm,&status);
1180           if(extra_partner) MPI_Sendrecv(&ext_ssize,1,MPI_INT,split_id,0,   &ext_rsize,1,MPI_INT,split_id,   0,comm,&status);
1181 
1182           // Exchange data.
1183           char*     rbuff=              new char[    rsize]       ;
1184           char* ext_rbuff=(ext_rsize>0? new char[ext_rsize]: NULL);
1185           MPI_Sendrecv                  (sbuff,ssize,MPI_BYTE, partner,0,       rbuff,    rsize,MPI_BYTE, partner,   0,comm,&status);
1186           if(extra_partner) MPI_Sendrecv( NULL,    0,MPI_BYTE,split_id,0,   ext_rbuff,ext_rsize,MPI_BYTE,split_id,   0,comm,&status);
1187 #ifdef _PROFILE_SORT
1188 				 	hyper_communicate.stop();
1189 				 	hyper_merge.start();
1190 #endif
1191           int nbuff_size=lsize+rsize+ext_rsize;
1192           char* nbuff= new char[nbuff_size];
1193           omp_par::merge<T*>((T*)lbuff, (T*)&lbuff[lsize], (T*)rbuff, (T*)&rbuff[rsize], (T*)nbuff, omp_p, std::less<T>());
1194           if(ext_rsize>0 && nbuff!=NULL){
1195             char* nbuff1= new char[nbuff_size];
1196             omp_par::merge<T*>((T*)nbuff, (T*)&nbuff[lsize+rsize], (T*)ext_rbuff, (T*)&ext_rbuff[ext_rsize], (T*)nbuff1, omp_p, std::less<T>());
1197             if(nbuff!=NULL) delete[] nbuff;
1198             nbuff=nbuff1;
1199           }
1200 
1201           // Copy new data.
1202           totSize=totSize_new;
1203           nelem = nbuff_size/sizeof(T);
1204           if(arr_!=NULL) delete[] arr_;
1205           arr_=(T*) nbuff; nbuff=NULL;
1206 
1207           //Free memory.
1208           if(    rbuff!=NULL) delete[]     rbuff;
1209           if(ext_rbuff!=NULL) delete[] ext_rbuff;
1210 #ifdef _PROFILE_SORT
1211 				 	hyper_merge.stop();
1212 #endif
1213         }
1214 
1215 #ifdef _PROFILE_SORT
1216 					hyper_comm_split.start();
1217 #endif
1218         {// Split comm.  O( log(p) ) ??
1219           MPI_Comm scomm;
1220           MPI_Comm_split(comm, myrank<=split_id, myrank, &scomm );
1221           comm=scomm;
1222           npes  =(myrank<=split_id? split_id+1: npes  -split_id-1);
1223           myrank=(myrank<=split_id? myrank    : myrank-split_id-1);
1224         }
1225 #ifdef _PROFILE_SORT
1226 				hyper_comm_split.stop();
1227 #endif
1228       }
1229 
1230       SortedElem.resize(nelem);
1231       SortedElem.assign(arr_, &arr_[nelem]);
1232       if(arr_!=NULL) delete[] arr_;
1233 
1234 #ifdef _PROFILE_SORT
1235 		 	sort_partitionw.start();
1236 #endif
1237 //      par::partitionW<T>(SortedElem, NULL , comm_);
1238 #ifdef _PROFILE_SORT
1239 		 	sort_partitionw.stop();
1240 #endif
1241 
1242 #ifdef _PROFILE_SORT
1243 		 	total_sort.stop();
1244 #endif
1245 return 1;
1246     }//end function
1247 // */
1248 
1249   template<typename T>
1250     int HyperQuickSort_kway(std::vector<T>& arr, std::vector<T> & SortedElem, MPI_Comm comm_) {
1251 #ifdef _PROFILE_SORT
1252 		total_sort.clear();
1253       seq_sort.clear();
1254       hyper_compute_splitters.clear();
1255       hyper_communicate.clear();
1256       hyper_merge.clear();
1257       hyper_comm_split.clear();
1258       sort_partitionw.clear();
1259       MPI_Barrier(comm_);
1260 
1261       total_sort.start();
1262 #endif
1263       unsigned int kway = KWAY;
1264       int omp_p=omp_get_max_threads();
1265 
1266       // Copy communicator.
1267       MPI_Comm comm=comm_;
1268 
1269       // Get comm size and rank.
1270       int npes, myrank;
1271       MPI_Comm_size(comm, &npes);
1272       MPI_Comm_rank(comm, &myrank);
1273       srand(myrank);
1274 
1275       // Local and global sizes. O(log p)
1276       size_t totSize, nelem = arr.size(); assert(nelem);
1277       par::Mpi_Allreduce<size_t>(&nelem, &totSize, 1, MPI_SUM, comm);
1278       std::vector<T> arr_(nelem*2); //Extra buffer.
1279       std::vector<T> arr__(nelem*2); //Extra buffer.
1280 
1281       // Local sort.
1282 #ifdef _PROFILE_SORT
1283       seq_sort.start();
1284 #endif
1285       omp_par::merge_sort(&arr[0], &arr[arr.size()]);
1286 #ifdef _PROFILE_SORT
1287       seq_sort.stop();
1288 #endif
1289 
1290       while(npes>1 && totSize>0){
1291         if(kway>npes) kway = npes;
1292         int blk_size=npes/kway; assert(blk_size*kway==npes);
1293         int blk_id=myrank/blk_size, new_pid=myrank%blk_size;
1294 
1295         // Determine splitters.
1296 #ifdef _PROFILE_SORT
1297         hyper_compute_splitters.start();
1298 #endif
1299         std::vector<T> split_key = par::Sorted_approx_Select(arr, kway-1, comm);
1300 #ifdef _PROFILE_SORT
1301         hyper_compute_splitters.stop();
1302 #endif
1303 
1304         {// Communication
1305 #ifdef _PROFILE_SORT
1306           hyper_communicate.start();
1307 #endif
1308           // Determine send_size.
1309           std::vector<int> send_size(kway), send_disp(kway+1); send_disp[0]=0; send_disp[kway]=arr.size();
1310           for(int i=1;i<kway;i++) send_disp[i]=std::lower_bound(&arr[0], &arr[arr.size()], split_key[i-1])-&arr[0];
1311           for(int i=0;i<kway;i++) send_size[i]=send_disp[i+1]-send_disp[i];
1312 
1313           // Get recv_size.
1314           int recv_iter=0;
1315           std::vector<T*> recv_ptr(kway);
1316           std::vector<size_t> recv_cnt(kway);
1317           std::vector<int> recv_size(kway), recv_disp(kway+1,0);
1318           for(int i_=0;i_<=kway/2;i_++){
1319             int i1=(blk_id+i_)%kway;
1320             int i2=(blk_id+kway-i_)%kway;
1321             MPI_Status status;
1322             for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
1323               int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
1324               int partner=blk_size*i+new_pid;
1325               MPI_Sendrecv(&send_size[     i   ], 1, MPI_INT, partner, 0,
1326                            &recv_size[recv_iter], 1, MPI_INT, partner, 0, comm, &status);
1327               recv_disp[recv_iter+1]=recv_disp[recv_iter]+recv_size[recv_iter];
1328               recv_ptr[recv_iter]=&arr_[recv_disp[recv_iter]];
1329               recv_cnt[recv_iter]=recv_size[recv_iter];
1330               recv_iter++;
1331             }
1332           }
1333 
1334           // Communicate data.
1335           int asynch_count=2;
1336           recv_iter=0;
1337 					int merg_indx=2;
1338           std::vector<MPI_Request> reqst(kway*2);
1339           std::vector<MPI_Status> status(kway*2);
1340           arr_ .resize(recv_disp[kway]);
1341           arr__.resize(recv_disp[kway]);
1342           for(int i_=0;i_<=kway/2;i_++){
1343             int i1=(blk_id+i_)%kway;
1344             int i2=(blk_id+kway-i_)%kway;
1345             for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
1346               int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
1347               int partner=blk_size*i+new_pid;
1348 
1349               if(recv_iter-asynch_count-1>=0) MPI_Waitall(2, &reqst[(recv_iter-asynch_count-1)*2], &status[(recv_iter-asynch_count-1)*2]);
1350               par::Mpi_Irecv <T>(&arr_[recv_disp[recv_iter]], recv_size[recv_iter], partner, 1, comm, &reqst[recv_iter*2+0]);
1351               par::Mpi_Issend<T>(&arr [send_disp[     i   ]], send_size[     i   ], partner, 1, comm, &reqst[recv_iter*2+1]);
1352               recv_iter++;
1353 
1354               int flag[2]={0,0};
1355               if(recv_iter>merg_indx) MPI_Test(&reqst[(merg_indx-1)*2],&flag[0],&status[(merg_indx-1)*2]);
1356               if(recv_iter>merg_indx) MPI_Test(&reqst[(merg_indx-2)*2],&flag[1],&status[(merg_indx-2)*2]);
1357               if(flag[0] && flag[1]){
1358                 T* A=&arr_[0]; T* B=&arr__[0];
1359                 for(int s=2;merg_indx%s==0;s*=2){
1360                   //std    ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1361                   //               &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
1362                   omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1363                                  &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]],omp_p,std::less<T>());
1364                   T* C=A; A=B; B=C; // Swap
1365                 }
1366                 merg_indx+=2;
1367               }
1368             }
1369           }
1370 #ifdef _PROFILE_SORT
1371 				hyper_communicate.stop();
1372 				hyper_merge.start();
1373 #endif
1374 					// Merge remaining parts.
1375           while(merg_indx<=(int)kway){
1376               MPI_Waitall(1, &reqst[(merg_indx-1)*2], &status[(merg_indx-1)*2]);
1377               MPI_Waitall(1, &reqst[(merg_indx-2)*2], &status[(merg_indx-2)*2]);
1378               {
1379                 T* A=&arr_[0]; T* B=&arr__[0];
1380                 for(int s=2;merg_indx%s==0;s*=2){
1381                   //std    ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1382                   //               &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
1383                   omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1384                                  &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]],omp_p,std::less<T>());
1385                   T* C=A; A=B; B=C; // Swap
1386                 }
1387                 merg_indx+=2;
1388               }
1389           }
1390 					{// Swap buffers.
1391 						int swap_cond=0;
1392             for(int s=2;kway%s==0;s*=2) swap_cond++;
1393 						if(swap_cond%2==0) swap(arr,arr_);
1394 						else swap(arr,arr__);
1395 					}
1396 				}
1397 
1398 #ifdef _PROFILE_SORT
1399 				hyper_merge.stop();
1400 				hyper_comm_split.start();
1401 #endif
1402 				{// Split comm. kway  O( log(p) ) ??
1403     	     MPI_Comm scomm;
1404       	   MPI_Comm_split(comm, blk_id, myrank, &scomm );
1405 					 if(comm!=comm_) MPI_Comm_free(&comm);
1406         	 comm = scomm;
1407 
1408 			     MPI_Comm_size(comm, &npes);
1409            MPI_Comm_rank(comm, &myrank);
1410     	  }
1411 #ifdef _PROFILE_SORT
1412 				hyper_comm_split.stop();
1413 #endif
1414       }
1415 #ifdef _PROFILE_SORT
1416 		 	total_sort.stop();
1417 #endif
1418 			SortedElem=arr;
1419 return 1;
1420     }
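  /* Usage sketch (illustrative) for the k-way variant. Note the
   * assert(blk_size*kway == npes) above: the communicator size is expected
   * to be divisible by the (possibly clipped) k at every level, so
   * power-of-two rank counts are the safe choice with the default KWAY.
   *
   *   std::vector<double> in = makeLocalKeys();   // hypothetical helper
   *   std::vector<double> out;
   *   par::HyperQuickSort_kway<double>(in, out, MPI_COMM_WORLD);
   */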
1421 
1422 		//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1423 
1424 	// for sc13 -- memory-efficient HykSort
1425   template<typename T>
1426     int HyperQuickSort_kway(std::vector<T>& arr, MPI_Comm comm_) {
1427 #ifdef _PROFILE_SORT
1428 		total_sort.clear();
1429       seq_sort.clear();
1430       hyper_compute_splitters.clear();
1431       hyper_communicate.clear();
1432       hyper_merge.clear();
1433       hyper_comm_split.clear();
1434       sort_partitionw.clear();
1435       MPI_Barrier(comm_);
1436 
1437       total_sort.start();
1438 #endif
1439       unsigned int kway = KWAY;
1440       int omp_p = omp_get_max_threads();
1441 
1442       // Copy communicator.
1443       MPI_Comm comm = comm_;
1444 
1445       // Get comm size and rank.
1446       int npes, myrank;
1447       MPI_Comm_size(comm, &npes);
1448       MPI_Comm_rank(comm, &myrank);
1449 
1450 			srand(myrank);
1451 
1452       // Local and global sizes. O(log p)
1453       size_t totSize, nelem = arr.size(); assert(nelem);
1454 			par::Mpi_Allreduce<size_t>(&nelem, &totSize, 1, MPI_SUM, comm);
1455 
1456       // dummy array for now ...
1457 			std::vector<T> arr_(128); // (nelem*2); //Extra buffer.
1458       std::vector<T> arr__; // (nelem*2); //Extra buffer.
1459 
1460       // Local sort.
1461 #ifdef _PROFILE_SORT
1462       seq_sort.start();
1463 #endif
1464       omp_par::merge_sort(&arr[0], &arr[arr.size()]);
1465 #ifdef _PROFILE_SORT
1466       seq_sort.stop();
1467 #endif
1468 
1469       while(npes>1 && totSize>0){
1470         if(kway>npes) kway = npes;
1471         int blk_size=npes/kway; assert(blk_size*kway==npes);
1472         int blk_id=myrank/blk_size, new_pid=myrank%blk_size;
1473 
1474         // Determine splitters.
1475 #ifdef _PROFILE_SORT
1476         hyper_compute_splitters.start();
1477 #endif
1478         std::vector<T> split_key = par::Sorted_approx_Select(arr, kway-1, comm);
1479 #ifdef _PROFILE_SORT
1480         hyper_compute_splitters.stop();
1481 #endif
1482 
1483         {// Communication
1484 #ifdef _PROFILE_SORT
1485           hyper_communicate.start();
1486 #endif
1487           // Determine send_size.
1488           std::vector<int> send_size(kway), send_disp(kway+1); send_disp[0] = 0; send_disp[kway] = arr.size();
1489           for(int i=1;i<kway;i++) send_disp[i] = std::lower_bound(&arr[0], &arr[arr.size()], split_key[i-1]) - &arr[0];
1490           for(int i=0;i<kway;i++) send_size[i] = send_disp[i+1] - send_disp[i];
1491 
1492           // Get recv_size.
1493           int recv_iter=0;
1494           std::vector<T*> recv_ptr(kway);
1495           std::vector<size_t> recv_cnt(kway);
1496           std::vector<int> recv_size(kway), recv_disp(kway+1,0);
1497           for(int i_=0;i_<=kway/2;i_++){
1498             int i1=(blk_id+i_)%kway;
1499             int i2=(blk_id+kway-i_)%kway;
1500             MPI_Status status;
1501             for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
1502               int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
1503               int partner=blk_size*i+new_pid;
1504               MPI_Sendrecv(&send_size[     i   ], 1, MPI_INT, partner, 0,
1505                            &recv_size[recv_iter], 1, MPI_INT, partner, 0, comm, &status);
1506               recv_disp[recv_iter+1] = recv_disp[recv_iter]+recv_size[recv_iter];
1507               recv_ptr[recv_iter]=&arr_[0] + recv_disp[recv_iter]; //! @hari - only setting the address; it doesn't need to be allocated yet (except for 0)
1508               recv_cnt[recv_iter]=recv_size[recv_iter];
1509               recv_iter++;
1510             }
1511           }
1512 
1513           // Communicate data.
1514           int asynch_count=2;
1515           recv_iter=0;
1516 					int merg_indx=2;
1517           std::vector<MPI_Request> reqst(kway*2);
1518           std::vector<MPI_Status> status(kway*2);
1519           arr_ .resize(recv_disp[kway]);
1520           arr__.resize(recv_disp[kway]);
1521           for(int i_=0;i_<=kway/2;i_++){
1522             int i1=(blk_id+i_)%kway;
1523             int i2=(blk_id+kway-i_)%kway;
1524             for(int j=0;j<(i_==0 || i_==kway/2?1:2);j++){
1525               int i=(i_==0?i1:((j+blk_id/i_)%2?i1:i2));
1526               int partner=blk_size*i+new_pid;
1527 
1528               if(recv_iter-asynch_count-1>=0) MPI_Waitall(2, &reqst[(recv_iter-asynch_count-1)*2], &status[(recv_iter-asynch_count-1)*2]);
1529 
1530 							par::Mpi_Irecv <T>(&arr_[recv_disp[recv_iter]], recv_size[recv_iter], partner, 1, comm, &reqst[recv_iter*2+0]);
1531               par::Mpi_Issend<T>(&arr [send_disp[     i   ]], send_size[     i   ], partner, 1, comm, &reqst[recv_iter*2+1]);
1532 
1533 							recv_iter++;
1534 
1535               int flag[2]={0,0};
1536               if ( recv_iter > merg_indx ) MPI_Test(&reqst[(merg_indx-1)*2], &flag[0], &status[(merg_indx-1)*2]);
1537               if ( recv_iter > merg_indx ) MPI_Test(&reqst[(merg_indx-2)*2], &flag[1], &status[(merg_indx-2)*2]);
1538               if (flag[0] && flag[1]){
1539                 T* A=&arr_[0]; T* B=&arr__[0];
1540                 for(int s=2; merg_indx%s==0; s*=2){
1541                   //std    ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1542                   //               &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
1543                   omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1544                                  &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]], omp_p,std::less<T>());
1545                   T* C=A; A=B; B=C; // Swap
1546                 }
1547                 merg_indx+=2;
1548               }
1549             }
1550           }
1551 #ifdef _PROFILE_SORT
1552 				hyper_communicate.stop();
1553 				hyper_merge.start();
1554 #endif
1555 					// Merge remaining parts.
1556           while(merg_indx<=(int)kway){
1557               MPI_Waitall(1, &reqst[(merg_indx-1)*2], &status[(merg_indx-1)*2]);
1558               MPI_Waitall(1, &reqst[(merg_indx-2)*2], &status[(merg_indx-2)*2]);
1559               {
1560                 T* A=&arr_[0]; T* B=&arr__[0];
1561                 for(int s=2;merg_indx%s==0;s*=2){
1562                   //std    ::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1563                   //               &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]]);
1564                   omp_par::merge(&A[recv_disp[merg_indx-s/2]],&A[recv_disp[merg_indx    ]],
1565                                  &A[recv_disp[merg_indx-s  ]],&A[recv_disp[merg_indx-s/2]], &B[recv_disp[merg_indx-s]],omp_p,std::less<T>());
1566                   T* C=A; A=B; B=C; // Swap
1567                 }
1568                 merg_indx+=2;
1569               }
1570           }
1571 					{// Swap buffers.
1572 						int swap_cond=0;
1573             for(int s=2;kway%s==0;s*=2) swap_cond++;
1574 						if(swap_cond%2==0) swap(arr,arr_);
1575 						else swap(arr,arr__);
1576 					}
1577 				}
1578 
1579 #ifdef _PROFILE_SORT
1580 				hyper_merge.stop();
1581 				hyper_comm_split.start();
1582 #endif
1583         { // Split the communicator for the next k-way round. O( log(p) )
1584     	     MPI_Comm scomm;
1585       	   MPI_Comm_split(comm, blk_id, myrank, &scomm );
1586 					 if(comm!=comm_) MPI_Comm_free(&comm);
1587         	 comm = scomm;
1588 
1589 			     MPI_Comm_size(comm, &npes);
1590            MPI_Comm_rank(comm, &myrank);
1591     	  }
1592 #ifdef _PROFILE_SORT
1593 				hyper_comm_split.stop();
1594 #endif
1595       }
1596 #ifdef _PROFILE_SORT
1597 		 	total_sort.stop();
1598 #endif
1599       return 1;
1600     } // memory-efficient HykSort
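  /*
    The loops above overlap the Irecv/Issend exchange with a cascaded,
    power-of-two merge of the kway received blocks, double-buffered between
    arr_ and arr__. A minimal sequential sketch of the same merge tree
    (without the communication overlap or the in-place double buffering;
    every name below is illustrative only):

      #include <algorithm>
      #include <vector>

      std::vector<int> merge_blocks(const std::vector<int>& A,
                                    const std::vector<size_t>& disp) {
        // disp has kway+1 entries; sorted block i occupies [disp[i], disp[i+1]).
        std::vector<std::vector<int> > blk;
        for (size_t i = 0; i + 1 < disp.size(); ++i)
          blk.push_back(std::vector<int>(A.begin() + disp[i], A.begin() + disp[i+1]));
        while (blk.size() > 1) {              // merge adjacent pairs, level by level
          std::vector<std::vector<int> > nxt;
          for (size_t i = 0; i + 1 < blk.size(); i += 2) {
            std::vector<int> m(blk[i].size() + blk[i+1].size());
            std::merge(blk[i].begin(), blk[i].end(),
                       blk[i+1].begin(), blk[i+1].end(), m.begin());
            nxt.push_back(m);
          }
          if (blk.size() % 2) nxt.push_back(blk.back());
          blk.swap(nxt);
        }
        return blk.empty() ? std::vector<int>() : blk[0];
      }
  */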
1601 
1602 
1603   /// ----------- low-memory version (SC13) ---------------------------------
1604   template<typename T>
1605    int sampleSort(std::vector<T>& arr, MPI_Comm comm){
1606 #ifdef __PROFILE_WITH_BARRIER__
1607       MPI_Barrier(comm);
1608 #endif
1609 
1610 #ifdef _PROFILE_SORT
1611 	 		total_sort.start();
1612 #endif
1613 
1614      int npes;
1615 
1616       MPI_Comm_size(comm, &npes);
1617 
1618       assert(arr.size());
1619 
1620       if (npes == 1) {
1621 #ifdef _PROFILE_SORT
1622 				seq_sort.start();
1623 #endif
1624         omp_par::merge_sort(&arr[0],&arr[arr.size()]);
1625 #ifdef _PROFILE_SORT
1626   			seq_sort.stop();
1627 		 		total_sort.stop();
1628 #endif
        return 1;
1629       }
1630 
1631       int myrank;
1632       MPI_Comm_rank(comm, &myrank);
1633 
1634       DendroIntL nelem = arr.size();
1635       DendroIntL nelemCopy = nelem;
1636       DendroIntL totSize;
1637       par::Mpi_Allreduce<DendroIntL>(&nelemCopy, &totSize, 1, MPI_SUM, comm);
1638 
1639       DendroIntL npesLong = npes;
1640       const DendroIntL FIVE = 5;
1641 
1642 
1643       if(totSize < (FIVE*npesLong*npesLong)) {
1644 #ifdef __DEBUG_PAR__
1645         if(!myrank) {
1646           std::cout <<" Using bitonic sort since totSize < (5*(npes^2)). totSize: "
1647             <<totSize<<" npes: "<<npes <<std::endl;
1648         }
1649 #endif
1650 #ifdef __DEBUG_PAR__
1651         MPI_Barrier(comm);
1652         if(!myrank) {
1653           std::cout<<"SampleSort (small n): Stage-1 passed."<<std::endl;
1654         }
1655         MPI_Barrier(comm);
1656 #endif
1657 
1658         MPI_Comm new_comm;
1659         if(totSize < npesLong) {
1660           if(!myrank) {
1661             std::cout<<" Input to sort is small. splittingComm: "
1662               <<npes<<" -> "<< totSize<<std::endl;
1663           }
1664           par::splitCommUsingSplittingRank(static_cast<int>(totSize), &new_comm, comm);
1665         } else {
1666           new_comm = comm;
1667         }
1668 
1669 #ifdef __DEBUG_PAR__
1670         MPI_Barrier(comm);
1671         if(!myrank) {
1672           std::cout<<"SampleSort (small n): Stage-2 passed."<<std::endl;
1673         }
1674         MPI_Barrier(comm);
1675 #endif
1676 
1677         if(!arr.empty()) {
1678           par::bitonicSort<T>(arr, new_comm);
1679         }
1680 
1681         if(comm != new_comm) MPI_Comm_free(&new_comm);
1682 
1683 
1684 #ifdef __DEBUG_PAR__
1685         MPI_Barrier(comm);
1686         if(!myrank) {
1687           std::cout<<"SampleSort (small n): Stage-3 passed."<<std::endl;
1688         }
1689         MPI_Barrier(comm);
1690 #endif
1691       }// end if
1692 
1693 #ifdef __DEBUG_PAR__
1694       if(!myrank) {
1695         std::cout<<"Using sample sort to sort nodes. n/p^2 is fine."<<std::endl;
1696       }
1697 #endif
1698 
1699       // Re-partition arr so that each process has at least p elements.
1700 #ifdef _PROFILE_SORT
1701   		sort_partitionw.start();
1702 #endif
1703 			par::partitionW<T>(arr, NULL, comm);
1704 #ifdef _PROFILE_SORT
1705   		sort_partitionw.stop();
1706 #endif
1707       nelem = arr.size();
1708 
1709 #ifdef _PROFILE_SORT
1710 			seq_sort.start();
1711 #endif
1712       omp_par::merge_sort(&arr[0],&arr[arr.size()]);
1713 #ifdef _PROFILE_SORT
1714 			seq_sort.stop();
1715 #endif
1716       unsigned long idx;
1717       // std::vector<IndexHolder<T> > sendSplits(npes-1);
1718       std::vector< IndexHolder<T> >  splitters;
1719       std::vector< std::pair<T, DendroIntL> >  splitters_pair = par::Sorted_approx_Select_skewed( arr, npes-1, comm);
1720 
1721       for (int i=0; i<splitters_pair.size(); ++i) {
1722         splitters.push_back ( IndexHolder<T> ( splitters_pair[i].first, splitters_pair[i].second ) );
1723       }
1724 
1725       T key_last;
1726       DendroIntL zero = 0;
      // Assumption: the last rank contributes its largest local key as the
      // final (sentinel) splitter for the broadcast below.
      if (myrank == npes-1) { key_last = arr[nelem-1]; }
1727       MPI_Bcast (&key_last, 1, par::Mpi_datatype<T>::value(), npes-1, comm);
1728 
1729       splitters.push_back( IndexHolder<T>(key_last, zero) );
1730 
1731       omp_par::merge_sort(&splitters[0], &splitters[splitters.size()]);
1732 
1733       IndexHolder<T> *splittersPtr = NULL;
1734       if(!splitters.empty()) {
1735         splittersPtr = &(*(splitters.begin()));
1736       }
1737 
1738       int *sendcnts = new int[npes];
1739       assert(sendcnts);
1740 
1741       int * recvcnts = new int[npes];
1742       assert(recvcnts);
1743 
1744       int * sdispls = new int[npes];
1745       assert(sdispls);
1746 
1747       int * rdispls = new int[npes];
1748       assert(rdispls);
1749 
1750       #pragma omp parallel for
1751       for(int k = 0; k < npes; k++){
1752         sendcnts[k] = 0;
1753       }
1754 
1755       {
1756         int omp_p=omp_get_max_threads();
1757         int* proc_split = new int[omp_p+1];
1758         DendroIntL* lst_split_indx = new DendroIntL[omp_p+1];
1759         proc_split[0]=0;
1760         lst_split_indx[0]=0;
1761         lst_split_indx[omp_p]=nelem;
1762         #pragma omp parallel for private(idx)
1763         for(int i=1;i<omp_p;i++){
1764           //proc_split[i] = seq::BinSearch(&splittersPtr[0],&splittersPtr[npes-1],arr[i*nelem/omp_p],std::less<T>());
1765           idx = 2*myrank*nelem/npes + i*(size_t)nelem/omp_p;
1766           IndexHolder<T> key( arr[i*(size_t)nelem/omp_p], idx);
1767           proc_split[i] = std::upper_bound( &splittersPtr[0], &splittersPtr[npes-1], key, std::less<IndexHolder<T> >())  - &splittersPtr[0];
1768           if(proc_split[i]<npes-1){
1769             //lst_split_indx[i]=seq::BinSearch(&arr[0],&arr[nelem],splittersPtr[proc_split[i]],std::less<T>());
1770             lst_split_indx[i] = std::upper_bound(&arr[0], &arr[nelem], splittersPtr[proc_split[i]].value , std::less<T>()) - &arr[0];
1771           }else{
1772             proc_split[i]     = npes-1;
1773             lst_split_indx[i] = nelem;
1774           }
1775         }
1776         idx = 2*myrank*nelem/npes;
1777         #pragma omp parallel for
1778         for (int i=0;i<omp_p;i++){
1779           int sendcnts_=0;
1780           int k=proc_split[i];
1781           for (DendroIntL j = lst_split_indx[i]; j < lst_split_indx[i+1]; j++) {
1782             if ( IndexHolder<T>(arr[j],idx+j) <= splitters[k]) {
1783               sendcnts_++;
1784             } else{
1785               if(sendcnts_>0)
1786                 sendcnts[k]=sendcnts_;
1787               sendcnts_=0;
1788               k = seq::UpperBound< IndexHolder<T> >(npes-1, splittersPtr, k+1, IndexHolder<T>(arr[j],idx+j) );
1789               if (k == (npes-1) ){
1790                 //could not find any splitter >= arr[j]
1791                 sendcnts_ = (nelem - j);
1792                 break;
1793               } else {
1794                 assert(k < (npes-1));
1795                 assert(splitters[k].value >= arr[j]);
1796                 sendcnts_++;
1797               }
1798             }//end if-else
1799           }//end for j
1800           if(sendcnts_>0)
1801             sendcnts[k]=sendcnts_;
1802         }
1803         delete [] lst_split_indx;
1804         delete [] proc_split;
1805       }
1806 
1807       par::Mpi_Alltoall<int>(sendcnts, recvcnts, 1, comm);
1808 
1809       sdispls[0] = 0; rdispls[0] = 0;
1810 
1811       omp_par::scan(sendcnts,sdispls,npes);
1812       omp_par::scan(recvcnts,rdispls,npes);
1813 
1814       DendroIntL nsorted = rdispls[npes-1] + recvcnts[npes-1];
1815       std::vector<T> SortedElem(nsorted);
1816 
1817       T* arrPtr = NULL;
1818       T* SortedElemPtr = NULL;
1819       if(!arr.empty()) {
1820         arrPtr = &(*(arr.begin()));
1821       }
1822       if(!SortedElem.empty()) {
1823         SortedElemPtr = &(*(SortedElem.begin()));
1824       }
1825 #ifdef _PROFILE_SORT
1826 	 		sample_prepare_scatter.stop();
1827 #endif
1828 
1829 #ifdef _PROFILE_SORT
1830 	 		sample_do_all2all.start();
1831 #endif
1832       // par::Mpi_Alltoallv_dense<T>(arrPtr, sendcnts, sdispls, SortedElemPtr, recvcnts, rdispls, comm);
1833       Mpi_Alltoallv(arrPtr, sendcnts, sdispls, SortedElemPtr, recvcnts, rdispls, comm);
1834 #ifdef _PROFILE_SORT
1835 	 		sample_do_all2all.stop();
1836 #endif
1837       arr.swap(SortedElem);
1838       SortedElem.clear();
1839       delete [] sendcnts;
1840       sendcnts = NULL;
1841 
1842       delete [] recvcnts;
1843       recvcnts = NULL;
1844 
1845       delete [] sdispls;
1846       sdispls = NULL;
1847 
1848       delete [] rdispls;
1849       rdispls = NULL;
1850 
1851 #ifdef _PROFILE_SORT
1852 	 		seq_sort.start();
1853 #endif
1854       omp_par::merge_sort(&arr[0], &arr[nsorted]);
1855 #ifdef _PROFILE_SORT
1856 	 		seq_sort.stop();
1857 #endif
1858 
1859 
1860 #ifdef _PROFILE_SORT
1861 	 		total_sort.stop();
1862 #endif
      return 1;
1863     }//end function
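  /*
    Usage sketch for the low-memory, in-place sampleSort (illustrative only;
    assumes MPI has been initialized, this header is included, every rank
    passes a non-empty local vector, and par::Mpi_datatype<> is specialized
    for the key type):

      std::vector<long> keys(1 << 20);
      for (size_t i = 0; i < keys.size(); ++i) keys[i] = rand();
      par::sampleSort<long>(keys, MPI_COMM_WORLD);
      // keys is now sorted locally on every rank, and every key on rank r
      // is <= every key on rank r+1.
  */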
1864 
1865   //------------------------------------------------------------------------
1866 
1867   template<typename T>
1868    int sampleSort(std::vector<T>& arr, std::vector<T> & SortedElem, MPI_Comm comm){
1869 #ifdef __PROFILE_WITH_BARRIER__
1870       MPI_Barrier(comm);
1871 #endif
1872 
1873 #ifdef _PROFILE_SORT
1874 	 		total_sort.start();
1875 #endif
1876 
1877      int npes;
1878 
1879       MPI_Comm_size(comm, &npes);
1880 
1881       assert(arr.size());
1882 
1883       if (npes == 1) {
1884 #ifdef _PROFILE_SORT
1885 				seq_sort.start();
1886 #endif
1887         omp_par::merge_sort(&arr[0],&arr[arr.size()]);
1888 #ifdef _PROFILE_SORT
1889   			seq_sort.stop();
1890 #endif
1891 				SortedElem  = arr;
1892 #ifdef _PROFILE_SORT
1893 		 		total_sort.stop();
1894 #endif
        return 1;
1895       }
1896 
1897       std::vector<T>  splitters;
1898       std::vector<T>  allsplitters;
1899 
1900       int myrank;
1901       MPI_Comm_rank(comm, &myrank);
1902 
1903       DendroIntL nelem = arr.size();
1904       DendroIntL nelemCopy = nelem;
1905       DendroIntL totSize;
1906       par::Mpi_Allreduce<DendroIntL>(&nelemCopy, &totSize, 1, MPI_SUM, comm);
1907 
1908       DendroIntL npesLong = npes;
1909       const DendroIntL FIVE = 5;
1910 
1911       if(totSize < (FIVE*npesLong*npesLong)) {
1912         if(!myrank) {
1913           std::cout <<" Using bitonic sort since totSize < (5*(npes^2)). totSize: "
1914             <<totSize<<" npes: "<<npes <<std::endl;
1915         }
1916 //        par::partitionW<T>(arr, NULL, comm);
1917 
1918 #ifdef __DEBUG_PAR__
1919         MPI_Barrier(comm);
1920         if(!myrank) {
1921           std::cout<<"SampleSort (small n): Stage-1 passed."<<std::endl;
1922         }
1923         MPI_Barrier(comm);
1924 #endif
1925 
1926         SortedElem = arr;
1927         MPI_Comm new_comm;
1928         if(totSize < npesLong) {
1929           if(!myrank) {
1930             std::cout<<" Input to sort is small. splittingComm: "
1931               <<npes<<" -> "<< totSize<<std::endl;
1932           }
1933           par::splitCommUsingSplittingRank(static_cast<int>(totSize), &new_comm, comm);
1934         } else {
1935           new_comm = comm;
1936         }
1937 
1938 #ifdef __DEBUG_PAR__
1939         MPI_Barrier(comm);
1940         if(!myrank) {
1941           std::cout<<"SampleSort (small n): Stage-2 passed."<<std::endl;
1942         }
1943         MPI_Barrier(comm);
1944 #endif
1945 
1946         if(!SortedElem.empty()) {
1947           par::bitonicSort<T>(SortedElem, new_comm);
1948         }
1949 
1950         if(comm != new_comm) MPI_Comm_free(&new_comm);
1951 
1952 
1953 #ifdef __DEBUG_PAR__
1954         MPI_Barrier(comm);
1955         if(!myrank) {
1956           std::cout<<"SampleSort (small n): Stage-3 passed."<<std::endl;
1957         }
1958         MPI_Barrier(comm);
1959 #endif
1960 
1961       }// end if
1962 
1963 #ifdef __DEBUG_PAR__
1964       if(!myrank) {
1965         std::cout<<"Using sample sort to sort nodes. n/p^2 is fine."<<std::endl;
1966       }
1967 #endif
1968 
1969       // Re-partition arr so that each process has at least p elements.
1970 #ifdef _PROFILE_SORT
1971   		sort_partitionw.start();
1972 #endif
1973 //			par::partitionW<T>(arr, NULL, comm);
1974 #ifdef _PROFILE_SORT
1975   		sort_partitionw.stop();
1976 #endif
1977       nelem = arr.size();
1978 
1979 #ifdef _PROFILE_SORT
1980 			seq_sort.start();
1981 #endif
1982       omp_par::merge_sort(&arr[0],&arr[arr.size()]);
1983 #ifdef _PROFILE_SORT
1984 			seq_sort.stop();
1985 #endif
1986 
1987       std::vector<T> sendSplits(npes-1);
1988       splitters.resize(npes);
1989 
1990       #pragma omp parallel for
1991       for(int i = 1; i < npes; i++)         {
1992         sendSplits[i-1] = arr[i*nelem/npes];
1993       }//end for i
1994 
1995 #ifdef _PROFILE_SORT
1996  		  sample_sort_splitters.start();
1997 #endif
1998       // sort sendSplits using bitonic ...
1999       par::bitonicSort<T>(sendSplits,comm);
2000 #ifdef _PROFILE_SORT
2001  		  sample_sort_splitters.stop();
2002 #endif
2003 
2004 
2005 #ifdef _PROFILE_SORT
2006 	 		sample_prepare_scatter.start();
2007 #endif
2008       // All gather with last element of splitters.
2009       T* sendSplitsPtr = NULL;
2010       T* splittersPtr = NULL;
2011       if(sendSplits.size() > static_cast<unsigned int>(npes-2)) {
2012         sendSplitsPtr = &(*(sendSplits.begin() + (npes -2)));
2013       }
2014       if(!splitters.empty()) {
2015         splittersPtr = &(*(splitters.begin()));
2016       }
2017       par::Mpi_Allgather<T>(sendSplitsPtr, splittersPtr, 1, comm);
2018 
2019       sendSplits.clear();
2020 
2021       int *sendcnts = new int[npes];
2022       assert(sendcnts);
2023 
2024       int * recvcnts = new int[npes];
2025       assert(recvcnts);
2026 
2027       int * sdispls = new int[npes];
2028       assert(sdispls);
2029 
2030       int * rdispls = new int[npes];
2031       assert(rdispls);
2032 
2033       #pragma omp parallel for
2034       for(int k = 0; k < npes; k++){
2035         sendcnts[k] = 0;
2036       }
2037 
2038       {
2039         int omp_p=omp_get_max_threads();
2040         int* proc_split = new int[omp_p+1];
2041         DendroIntL* lst_split_indx = new DendroIntL[omp_p+1];
2042         proc_split[0]=0;
2043         lst_split_indx[0]=0;
2044         lst_split_indx[omp_p]=nelem;
2045         #pragma omp parallel for
2046         for(int i=1;i<omp_p;i++){
2047           //proc_split[i] = seq::BinSearch(&splittersPtr[0],&splittersPtr[npes-1],arr[i*nelem/omp_p],std::less<T>());
2048           proc_split[i] = std::upper_bound(&splittersPtr[0],&splittersPtr[npes-1],arr[i*(size_t)nelem/omp_p],std::less<T>())-&splittersPtr[0];
2049           if(proc_split[i]<npes-1){
2050             //lst_split_indx[i]=seq::BinSearch(&arr[0],&arr[nelem],splittersPtr[proc_split[i]],std::less<T>());
2051             lst_split_indx[i]=std::upper_bound(&arr[0],&arr[nelem],splittersPtr[proc_split[i]],std::less<T>())-&arr[0];
2052           }else{
2053             proc_split[i]=npes-1;
2054             lst_split_indx[i]=nelem;
2055           }
2056         }
2057         #pragma omp parallel for
2058         for (int i=0;i<omp_p;i++){
2059           int sendcnts_=0;
2060           int k=proc_split[i];
2061           for (DendroIntL j = lst_split_indx[i]; j < lst_split_indx[i+1]; j++) {
2062             if (arr[j] <= splitters[k]) {
2063               sendcnts_++;
2064             } else{
2065               if(sendcnts_>0)
2066                 sendcnts[k]=sendcnts_;
2067               sendcnts_=0;
2068               k = seq::UpperBound<T>(npes-1, splittersPtr, k+1, arr[j]);
2069               if (k == (npes-1) ){
2070                 //could not find any splitter >= arr[j]
2071                 sendcnts_ = (nelem - j);
2072                 break;
2073               } else {
2074                 assert(k < (npes-1));
2075                 assert(splitters[k] >= arr[j]);
2076                 sendcnts_++;
2077               }
2078             }//end if-else
2079           }//end for j
2080           if(sendcnts_>0)
2081             sendcnts[k]=sendcnts_;
2082         }
2083         delete [] lst_split_indx;
2084         delete [] proc_split;
2085       }
2086 
2087       par::Mpi_Alltoall<int>(sendcnts, recvcnts, 1, comm);
2088 
2089       sdispls[0] = 0; rdispls[0] = 0;
2090 
2091       omp_par::scan(sendcnts,sdispls,npes);
2092       omp_par::scan(recvcnts,rdispls,npes);
2093 
2094       DendroIntL nsorted = rdispls[npes-1] + recvcnts[npes-1];
2095       SortedElem.resize(nsorted);
2096 
2097       T* arrPtr = NULL;
2098       T* SortedElemPtr = NULL;
2099       if(!arr.empty()) {
2100         arrPtr = &(*(arr.begin()));
2101       }
2102       if(!SortedElem.empty()) {
2103         SortedElemPtr = &(*(SortedElem.begin()));
2104       }
2105 #ifdef _PROFILE_SORT
2106 	 		sample_prepare_scatter.stop();
2107 #endif
2108 
2109 #ifdef _PROFILE_SORT
2110 	 		sample_do_all2all.start();
2111 #endif
2112       par::Mpi_Alltoallv_dense<T>(arrPtr, sendcnts, sdispls,
2113           SortedElemPtr, recvcnts, rdispls, comm);
2114 #ifdef _PROFILE_SORT
2115 	 		sample_do_all2all.stop();
2116 #endif
2117       arr.clear();
2118 
2119       delete [] sendcnts;
2120       sendcnts = NULL;
2121 
2122       delete [] recvcnts;
2123       recvcnts = NULL;
2124 
2125       delete [] sdispls;
2126       sdispls = NULL;
2127 
2128       delete [] rdispls;
2129       rdispls = NULL;
2130 
2131 #ifdef _PROFILE_SORT
2132 	 		seq_sort.start();
2133 #endif
2134       omp_par::merge_sort(&SortedElem[0], &SortedElem[nsorted]);
2135 #ifdef _PROFILE_SORT
2136 	 		seq_sort.stop();
2137 #endif
2138 
2139 
2140 #ifdef _PROFILE_SORT
2141 	 		total_sort.stop();
2142 #endif
      return 1;
2143     }//end function
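  /*
    Usage sketch for the splitter-based variant (illustrative only; same
    preconditions as the in-place version). The input vector is consumed and
    this rank's slice of the globally sorted sequence is returned through the
    second argument:

      std::vector<double> in;            // local, unsorted, non-empty keys
      std::vector<double> out;
      // ... fill 'in' ...
      par::sampleSort<double>(in, out, MPI_COMM_WORLD);
      // 'in' has been cleared; 'out' is sorted locally and globally across ranks.
  */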
2144 
2145   /********************************************************************/
2146   /*
2147    * which_keys is one of KEEP_HIGH or KEEP_LOW
2148    * partner    is the processor with which to Merge and Split.
2149    *
2150    */
2151   template <typename T>
2152    void MergeSplit( std::vector<T> &local_list, int which_keys, int partner, MPI_Comm  comm) {
2153 
2154       MPI_Status status;
2155       int send_size = local_list.size();
2156       int recv_size = 0;
2157 
2158       // first communicate how many you will send and how many you will receive ...
2159 
2160       int       my_rank;
2161       MPI_Comm_rank(comm, &my_rank);
2162 
2163       par::Mpi_Sendrecv<int, int>( &send_size , 1, partner, 0,
2164           &recv_size, 1, partner, 0, comm, &status);
2165 
2166       // if (!my_rank || my_rank==2)
2167       // std::cout << my_rank << " <--> " << partner << "  -> " << send_size << " <- " << recv_size << std::endl;
2168 
2169       std::vector<T> temp_list( recv_size, local_list[0] );
2170 
2171       T* local_listPtr = NULL;
2172       T* temp_listPtr = NULL;
2173       if(!local_list.empty()) {
2174         local_listPtr = &(*(local_list.begin()));
2175       }
2176       if(!temp_list.empty()) {
2177         temp_listPtr = &(*(temp_list.begin()));
2178       }
2179 
2180       par::Mpi_Sendrecv<T, T>( local_listPtr, send_size, partner,
2181           1, temp_listPtr, recv_size, partner, 1, comm, &status);
2182 
2183 
2184       MergeLists<T>(local_list, temp_list, which_keys);
2185 
2186       temp_list.clear();
2187     } // Merge_split
2188 
2189   template <typename T>
2190    void Par_bitonic_sort_incr( std::vector<T> &local_list, int proc_set_size, MPI_Comm  comm ) {
2191       int  eor_bit;
2192       int       proc_set_dim;
2193       int       stage;
2194       int       partner;
2195       int       my_rank;
2196 
2197       MPI_Comm_rank(comm, &my_rank);
2198 
2199       proc_set_dim = 0;
2200       int x = proc_set_size;
2201       while (x > 1) {
2202         x = x >> 1;
2203         proc_set_dim++;
2204       }
2205 
2206       eor_bit = (1 << (proc_set_dim - 1) );
2207 
2208       for (stage = 0; stage < proc_set_dim; stage++) {
2209         partner = (my_rank ^ eor_bit);
2210 
2211         if (my_rank < partner) {
2212           MergeSplit<T> ( local_list,  KEEP_LOW, partner, comm);
2213         } else {
2214           MergeSplit<T> ( local_list, KEEP_HIGH, partner, comm);
2215         }
2216 
2217         eor_bit = (eor_bit >> 1);
2218       }
2219     }  // Par_bitonic_sort_incr
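  /*
    Sketch of the partner schedule produced by the eor_bit loop above (no MPI;
    illustrative only). For proc_set_size = 8, each rank flips one bit of its
    rank per stage, starting from the highest bit of the set:

      #include <cstdio>

      int proc_set_size = 8, proc_set_dim = 3;
      for (int rank = 0; rank < proc_set_size; ++rank) {
        int eor_bit = 1 << (proc_set_dim - 1);
        for (int stage = 0; stage < proc_set_dim; ++stage, eor_bit >>= 1)
          std::printf("rank %d stage %d partner %d\n", rank, stage, rank ^ eor_bit);
      }
      // e.g. rank 0 pairs with 4, then 2, then 1; rank 5 pairs with 1, 7, 4.
  */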
2220 
2221 
2222   template <typename T>
2223    void Par_bitonic_sort_decr( std::vector<T> &local_list, int proc_set_size, MPI_Comm  comm) {
2224       int  eor_bit;
2225       int       proc_set_dim;
2226       int       stage;
2227       int       partner;
2228       int       my_rank;
2229 
2230       MPI_Comm_rank(comm, &my_rank);
2231 
2232       proc_set_dim = 0;
2233       int x = proc_set_size;
2234       while (x > 1) {
2235         x = x >> 1;
2236         proc_set_dim++;
2237       }
2238 
2239       eor_bit = (1 << (proc_set_dim - 1));
2240 
2241       for (stage = 0; stage < proc_set_dim; stage++) {
2242         partner = my_rank ^ eor_bit;
2243 
2244         if (my_rank > partner) {
2245           MergeSplit<T> ( local_list,  KEEP_LOW, partner, comm);
2246         } else {
2247           MergeSplit<T> ( local_list, KEEP_HIGH, partner, comm);
2248         }
2249 
2250         eor_bit = (eor_bit >> 1);
2251       }
2252 
2253     } // Par_bitonic_sort_decr
2254 
2255   template <typename T>
2256    void Par_bitonic_merge_incr( std::vector<T> &local_list, int proc_set_size, MPI_Comm  comm ) {
2257       int       partner;
2258       int       rank, npes;
2259 
2260       MPI_Comm_rank(comm, &rank);
2261       MPI_Comm_size(comm, &npes);
2262 
2263       unsigned int num_left  =  binOp::getPrevHighestPowerOfTwo(npes);
2264       unsigned int num_right = npes - num_left;
2265 
2266       // Merge the num_right right procs with the highest num_right left procs.
2267       if ( (static_cast<unsigned int>(rank) < num_left) &&
2268           (static_cast<unsigned int>(rank) >= (num_left - num_right)) ) {
2269         partner = static_cast<unsigned int>(rank) + num_right;
2270         MergeSplit<T> ( local_list,  KEEP_LOW, partner, comm);
2271       } else if (static_cast<unsigned int>(rank) >= num_left) {
2272         partner = static_cast<unsigned int>(rank) - num_right;
2273         MergeSplit<T> ( local_list,  KEEP_HIGH, partner, comm);
2274       }
2275     }
2276 
2277   template <typename T>
2278    void bitonicSort_binary(std::vector<T> & in, MPI_Comm comm) {
2279       int                   proc_set_size;
2280       unsigned int            and_bit;
2281       int               rank;
2282       int               npes;
2283 
2284       MPI_Comm_size(comm, &npes);
2285 
2286 #ifdef __DEBUG_PAR__
2287       assert(npes > 1);
2288       assert(!(npes & (npes-1)));
2289       assert(!(in.empty()));
2290 #endif
2291 
2292       MPI_Comm_rank(comm, &rank);
2293 
2294       for (proc_set_size = 2, and_bit = 2;
2295           proc_set_size <= npes;
2296           proc_set_size = proc_set_size*2,
2297           and_bit = and_bit << 1) {
2298 
2299         if ((rank & and_bit) == 0) {
2300           Par_bitonic_sort_incr<T>( in, proc_set_size, comm);
2301         } else {
2302           Par_bitonic_sort_decr<T>( in, proc_set_size, comm);
2303         }
2304       }//end for
2305     }
2306 
2307   template <typename T>
2308    void bitonicSort(std::vector<T> & in, MPI_Comm comm) {
2309       int               rank;
2310       int               npes;
2311 
2312       MPI_Comm_size(comm, &npes);
2313       MPI_Comm_rank(comm, &rank);
2314 
2315       assert(!(in.empty()));
2316 
2317       omp_par::merge_sort(&in[0],&in[in.size()]);
2318       MPI_Barrier(comm);
2319 
2320       if(npes > 1) {
2321 
2322         // check if npes is a power of two ...
2323         bool isPower = (!(npes & (npes - 1)));
2324 
2325         if ( isPower ) {
2326           bitonicSort_binary<T>(in, comm);
2327         } else {
2328           MPI_Comm new_comm;
2329 
2330           // Since npes is not a power of two, we shall split the problem in two ...
2331           //
2332           // 1. Create 2 comm groups ... one for the 2^d portion and one for the
2333           // remainder.
2334           unsigned int splitter = splitCommBinary(comm, &new_comm);
2335 
          // 2. Sort each group independently (binary bitonic in the power-of-two
          // group, general bitonic in the remainder).
2336           if ( static_cast<unsigned int>(rank) < splitter) {
2337             bitonicSort_binary<T>(in, new_comm);
2338           } else {
2339             bitonicSort<T>(in, new_comm);
2340           }
2341           MPI_Comm_free(&new_comm);
2342           // 3. Do a special merge of the two segments. (original comm).
2343           Par_bitonic_merge_incr( in,  binOp::getNextHighestPowerOfTwo(npes), comm );
2344 
2345           splitter = splitCommBinaryNoFlip(comm, &new_comm);
2346 
2347           // 4. Now a final sort on the segments.
2348           if (static_cast<unsigned int>(rank) < splitter) {
2349             bitonicSort_binary<T>(in, new_comm);
2350           } else {
2351             bitonicSort<T>(in, new_comm);
2352           }
2353           MPI_Comm_free(&new_comm);
2354         }//end if isPower of 2
2355       }//end if single processor
2356     }//end function
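  /*
    Usage sketch (illustrative only): bitonicSort handles any communicator
    size, splitting off the largest power-of-two subgroup when npes is not a
    power of two, but it requires a non-empty local vector on every rank:

      std::vector<float> v;              // local keys, need not be sorted
      // ... fill 'v' ...
      par::bitonicSort<float>(v, MPI_COMM_WORLD);
      // v is now locally sorted, and the ranks of MPI_COMM_WORLD hold
      // globally non-decreasing blocks.
  */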
2357 
2358   template <typename T>
2359    void MergeLists( std::vector<T> &listA, std::vector<T> &listB,
2360         int KEEP_WHAT) {
2361 
2362       T _low, _high;
2363 
2364       assert(!(listA.empty()));
2365       assert(!(listB.empty()));
2366 
2367       _low  = ( (listA[0] > listB[0]) ? listA[0] : listB[0]);
2368       _high = ( (listA[listA.size()-1] < listB[listB.size()-1]) ?
2369           listA[listA.size()-1] : listB[listB.size()-1]);
2370 
2371       // We will do a full merge first ...
2372       size_t list_size = listA.size() + listB.size();
2373 
2374       std::vector<T> scratch_list(list_size);
2375 
2376       unsigned int  index1 = 0;
2377       unsigned int  index2 = 0;
2378 
2379       for (size_t i = 0; i < list_size; i++) {
2380         //The order of (A || B) is important here,
2381         //so that index2 remains within bounds
2382         if ( (index1 < listA.size()) && ( (index2 >= listB.size()) || (listA[index1] <= listB[index2]) ) ) {
2383           scratch_list[i] = listA[index1];
2384           index1++;
2385         } else {
2386           scratch_list[i] = listB[index2];
2387           index2++;
2388         }
2389       }
2390 
2391       //Scratch list is sorted at this point.
2392 
2393       listA.clear();
2394       listB.clear();
2395       if ( KEEP_WHAT == KEEP_LOW ) {
2396         int ii=0;
2397         while ( ( (scratch_list[ii] < _low) || (ii < (list_size/2)) ) && (scratch_list[ii] <= _high) ) {
2398           ii++;
2399         }
2400 
2401         if(ii) {
2402           listA.insert(listA.end(), scratch_list.begin(),
2403               (scratch_list.begin() + ii));
2404         }
2405       } else {
2406         int ii = (list_size - 1);
2407         while ( ( (ii >= (list_size/2))
2408               && (scratch_list[ii] >= _low) )
2409             || (scratch_list[ii] > _high) ) {
2410           ii--;
2411         }
2412         if(ii < (list_size - 1) ) {
2413           listA.insert(listA.begin(), (scratch_list.begin() + (ii + 1)),
2414               (scratch_list.begin() + list_size));
2415         }
2416       }
2417       scratch_list.clear();
2418     }//end function
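  /*
    A sequential sketch of the invariant that MergeLists/MergeSplit maintain
    for a pair of ranks (no MPI; values are illustrative). One
    KEEP_LOW/KEEP_HIGH exchange amounts to merging the two lists and splitting
    them roughly in half; MergeLists bounds the split with _low/_high rather
    than an exact midpoint, but the resulting partition has the same property:

      std::vector<int> a;                        // "low" rank:  {1, 4, 7, 9}
      std::vector<int> b;                        // "high" rank: {2, 3, 8}
      a.push_back(1); a.push_back(4); a.push_back(7); a.push_back(9);
      b.push_back(2); b.push_back(3); b.push_back(8);
      std::vector<int> m(a.size() + b.size());
      std::merge(a.begin(), a.end(), b.begin(), b.end(), m.begin());
      std::vector<int> keep_low (m.begin(), m.begin() + m.size()/2);  // {1, 2, 3}
      std::vector<int> keep_high(m.begin() + m.size()/2, m.end());    // {4, 7, 8, 9}
      // Together the pair still holds every key exactly once, and
      // max(keep_low) <= min(keep_high).
  */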
2419 
2420 
2421 	template<typename T>
2422     std::vector<T> Sorted_Sample_Select(std::vector<T>& arr, unsigned int kway, std::vector<unsigned int>& min_idx, std::vector<unsigned int>& max_idx, std::vector<DendroIntL>& splitter_ranks, MPI_Comm comm) {
2423 			int rank, npes;
2424       MPI_Comm_size(comm, &npes);
2425 			MPI_Comm_rank(comm, &rank);
2426 
2427 			//-------------------------------------------
2428       DendroIntL totSize, nelem = arr.size();
2429       par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);
2430 
2431 			//Determine splitters. O( log(N/p) + log(p) )
2432       int splt_count = (1000*kway*nelem)/totSize;
2433       if (npes>1000*kway) splt_count = (((float)rand()/(float)RAND_MAX)*totSize<(1000*kway*nelem)?1:0);
2434       if (splt_count>nelem) splt_count=nelem;
2435       std::vector<T> splitters(splt_count);
2436       for(size_t i=0;i<splt_count;i++)
2437         splitters[i] = arr[rand()%nelem];
2438 
2439       // Gather all splitters. O( log(p) )
2440       int glb_splt_count;
2441       std::vector<int> glb_splt_cnts(npes);
2442       std::vector<int> glb_splt_disp(npes,0);
2443       par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
2444       omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
2445       glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
2446       std::vector<T> glb_splitters(glb_splt_count);
2447       MPI_Allgatherv(&    splitters[0], splt_count, par::Mpi_datatype<T>::value(),
2448                      &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
2449                      par::Mpi_datatype<T>::value(), comm);
2450 
2451       // rank splitters. O( log(N/p) + log(p) )
2452       std::vector<DendroIntL> disp(glb_splt_count,0);
2453       if(nelem>0){
2454         #pragma omp parallel for
2455         for(size_t i=0; i<glb_splt_count; i++){
2456           disp[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
2457         }
2458       }
2459       std::vector<DendroIntL> glb_disp(glb_splt_count, 0);
2460       MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);
2461 
2462 			splitter_ranks.clear(); splitter_ranks.resize(kway);
2463 			min_idx.clear(); min_idx.resize(kway);
2464 			max_idx.clear(); max_idx.resize(kway);
2465 	    std::vector<T> split_keys(kway);
2466 			#pragma omp parallel for
2467       for (unsigned int qq=0; qq<kway; qq++) {
2468 				DendroIntL* _disp = &glb_disp[0];
2469 				DendroIntL* _mind = &glb_disp[0];
2470 				DendroIntL* _maxd = &glb_disp[0];
2471 				DendroIntL optSplitter = ((qq+1)*totSize)/(kway+1);
2472         // if (!rank) std::cout << "opt " << qq << " - " << optSplitter << std::endl;
2473         for(size_t i=0; i<glb_splt_count; i++) {
2474         	if(labs(glb_disp[i] - optSplitter) < labs(*_disp - optSplitter)) {
2475 						_disp = &glb_disp[i];
2476 					}
2477         	if( (glb_disp[i] > optSplitter) && ( labs(glb_disp[i] - optSplitter) < labs(*_maxd - optSplitter))  ) {
2478 						_maxd = &glb_disp[i];
2479 					}
2480         	if( (glb_disp[i] < optSplitter) && ( labs(optSplitter - glb_disp[i]) < labs(optSplitter - *_mind))  ) {
2481 						_mind = &glb_disp[i];
2482 					}
2483 				}
2484         split_keys[qq] = glb_splitters[_disp - &glb_disp[0]];
2485 				min_idx[qq] = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[_mind - &glb_disp[0]]) - &arr[0];
2486 				max_idx[qq] = std::upper_bound(&arr[0], &arr[nelem], glb_splitters[_maxd - &glb_disp[0]]) - &arr[0];
2487 				splitter_ranks[qq] = optSplitter - *_mind;
2488 			}
2489 
2490 			return split_keys;
2491 		}
2492 
2493   template<typename T>
2494     void Sorted_approx_Select_helper(std::vector<T>& arr, std::vector<size_t>& exp_rank, std::vector<T>& splt_key, int beta, std::vector<size_t>& start, std::vector<size_t>& end, size_t& max_err, MPI_Comm comm) {
2495 
2496       int rank, npes;
2497       MPI_Comm_size(comm, &npes);
2498       MPI_Comm_rank(comm, &rank);
2499 
2500       size_t nelem=arr.size();
2501       int kway=exp_rank.size();
2502       std::vector<size_t> locSize(kway), totSize(kway);
2503       for(int i=0;i<kway;i++) locSize[i]=end[i]-start[i];
2504       par::Mpi_Allreduce<size_t>(&locSize[0], &totSize[0], kway, MPI_SUM, comm);
2505 
2506       //-------------------------------------------
2507       std::vector<T> loc_splt;
2508       for(int i=0;i<kway;i++){
2509         int splt_count = (totSize[i]==0?1:(beta*(end[i]-start[i]))/totSize[i]);
2510         if (npes>beta) splt_count = (((float)rand()/(float)RAND_MAX)*totSize[i]<(beta*locSize[i])?1:0);
2511         for(int j=0;j<splt_count;j++) loc_splt.push_back(arr[start[i]+rand()%(locSize[i]+1)]);
2512         std::sort(&loc_splt[loc_splt.size()-splt_count],&loc_splt[loc_splt.size()]);
2513       }
2514 
2515 			int splt_count=loc_splt.size();
2516 
2517       // Gather all splitters. O( log(p) )
2518       int glb_splt_count;
2519       std::vector<int> glb_splt_cnts(npes);
2520       std::vector<int> glb_splt_disp(npes,0);
2521       par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
2522       omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
2523       glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
2524       std::vector<T> glb_splt(glb_splt_count);
2525       MPI_Allgatherv(&loc_splt[0], splt_count, par::Mpi_datatype<T>::value(),
2526                      &glb_splt[0], &glb_splt_cnts[0], &glb_splt_disp[0], par::Mpi_datatype<T>::value(), comm);
2527       //MPI_Barrier(comm); tt[dbg_cnt]+=omp_get_wtime(); dbg_cnt++; //////////////////////////////////////////////////////////////////////
2528       std::sort(&glb_splt[0],&glb_splt[glb_splt_count]);
2529       //MPI_Barrier(comm); tt[dbg_cnt]+=omp_get_wtime(); dbg_cnt++; //////////////////////////////////////////////////////////////////////
2530 
2531       // rank splitters. O( log(N/p) + log(p) )
2532       std::vector<size_t> loc_rank(glb_splt_count,0);
2533       if(nelem>0){
2534         #pragma omp parallel for
2535         for(size_t i=0; i<glb_splt_count; i++){
2536           loc_rank[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splt[i]) - &arr[0];
2537         }
2538       }
2539       //MPI_Barrier(comm); tt[dbg_cnt]+=omp_get_wtime(); dbg_cnt++; //////////////////////////////////////////////////////////////////////
2540       std::vector<size_t> glb_rank(glb_splt_count, 0);
2541       MPI_Allreduce(&loc_rank[0], &glb_rank[0], glb_splt_count, par::Mpi_datatype<size_t>::value(), MPI_SUM, comm);
2542       //MPI_Barrier(comm); tt[dbg_cnt]+=omp_get_wtime(); dbg_cnt++; //////////////////////////////////////////////////////////////////////
2543 
2544       size_t new_max_err=0;
2545       std::vector<T> split_keys(kway);
2546       // #pragma omp parallel for
2547       for (int i=0; i<kway; i++) {
2548         int ub_indx=std::upper_bound(&glb_rank[0], &glb_rank[glb_splt_count], exp_rank[i])-&glb_rank[0];
2549         int lb_indx=ub_indx-1; if(lb_indx<0) lb_indx=0;
2550         size_t err=labs(glb_rank[lb_indx]-exp_rank[i]);
2551 
2552         if(err<max_err){
2553           if(glb_rank[lb_indx]>exp_rank[i]) start[i]=0;
2554           else start[i] = loc_rank[lb_indx];
2555           if(ub_indx==glb_splt_count) end[i]=nelem;
2556           else end[i] = loc_rank[ub_indx];
2557           splt_key[i]=glb_splt[lb_indx];
2558           if(new_max_err<err) new_max_err=err;
2559         }
2560       }
2561       max_err=new_max_err;
2562     }
2563 
2564   template<typename T>
2565     std::vector<T> Sorted_approx_Select_recursive(std::vector<T>& arr, unsigned int kway, MPI_Comm comm) {
2566       int rank, npes;
2567       MPI_Comm_size(comm, &npes);
2568       MPI_Comm_rank(comm, &rank);
2569 
2570       //-------------------------------------------
2571       DendroIntL totSize, nelem = arr.size();
2572       par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);
2573 
2574       double tol=1e-2/kway;
2575       int beta=pow(1.0/tol,1.0/3.0)*3.0;
2576       std::vector<T> splt_key(kway);
2577       std::vector<size_t> start(kway,0);
2578       std::vector<size_t> end(kway,nelem);
2579       std::vector<size_t> exp_rank(kway);
2580       for(int i=0;i<kway;i++) exp_rank[i]=((i+1)*totSize)/(kway+1);
2581 
2582       size_t max_error=totSize;
2583       while(max_error>totSize*tol){
2584         Sorted_approx_Select_helper(arr, exp_rank, splt_key, beta, start, end, max_error, comm);
2585       }
2586 
2587       return splt_key;
2588     }
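  /*
    Worked example of the parameter choice above (illustrative): for kway = 64
    the tolerance is tol = 1e-2/64 ~ 1.6e-4, so beta = 3*(1/tol)^(1/3) ~ 3*18.6,
    which truncates to 55, i.e. roughly 55 samples in aggregate across all ranks
    for each unresolved splitter in each iteration. The refinement loop repeats
    until every splitter's rank error drops below tol*totSize.
  */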
2589 
2590 	template<typename T>
2591     std::vector<T> Sorted_approx_Select(std::vector<T>& arr, unsigned int kway, MPI_Comm comm) {
2592 			int rank, npes;
2593       MPI_Comm_size(comm, &npes);
2594 			MPI_Comm_rank(comm, &rank);
2595 
2596 			//-------------------------------------------
2597       DendroIntL totSize, nelem = arr.size();
2598       par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);
2599 
2600 			//Determine splitters. O( log(N/p) + log(p) )
2601       int splt_count = (1000*kway*nelem)/totSize;
2602       if (npes>1000*kway) splt_count = (((float)rand()/(float)RAND_MAX)*totSize<(1000*kway*nelem)?1:0);
2603       if (splt_count>nelem) splt_count=nelem;
2604       std::vector<T> splitters(splt_count);
2605       for(size_t i=0;i<splt_count;i++)
2606         splitters[i] = arr[rand()%nelem];
2607 
2608       // Gather all splitters. O( log(p) )
2609       int glb_splt_count;
2610       std::vector<int> glb_splt_cnts(npes);
2611       std::vector<int> glb_splt_disp(npes,0);
2612       par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
2613       omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
2614       glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
2615       std::vector<T> glb_splitters(glb_splt_count);
2616       MPI_Allgatherv(&    splitters[0], splt_count, par::Mpi_datatype<T>::value(),
2617                      &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
2618                      par::Mpi_datatype<T>::value(), comm);
2619 
2620       // rank splitters. O( log(N/p) + log(p) )
2621       std::vector<DendroIntL> disp(glb_splt_count,0);
2622       if(nelem>0){
2623         #pragma omp parallel for
2624         for(size_t i=0; i<glb_splt_count; i++){
2625           disp[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
2626         }
2627       }
2628       std::vector<DendroIntL> glb_disp(glb_splt_count, 0);
2629       MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);
2630 
2631 	    std::vector<T> split_keys(kway);
2632 			#pragma omp parallel for
2633       for (unsigned int qq=0; qq<kway; qq++) {
2634 				DendroIntL* _disp = &glb_disp[0];
2635 				DendroIntL optSplitter = ((qq+1)*totSize)/(kway+1);
2636         // if (!rank) std::cout << "opt " << qq << " - " << optSplitter << std::endl;
2637         for(size_t i=0; i<glb_splt_count; i++) {
2638         	if(labs(glb_disp[i] - optSplitter) < labs(*_disp - optSplitter)) {
2639 						_disp = &glb_disp[i];
2640 					}
2641 				}
2642         split_keys[qq] = glb_splitters[_disp - &glb_disp[0]];
2643 			}
2644 
2645 			return split_keys;
2646 		}
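  /*
    Usage sketch (illustrative only): selecting kway-1 approximate splitters
    for a locally sorted array and bucketing the local data with them, as the
    k-way exchange above does. 'comm' is assumed to be a valid MPI_Comm:

      int kway = 4;
      std::vector<int> arr;                  // locally sorted, non-empty keys
      // ... fill and sort 'arr' ...
      std::vector<int> sk = par::Sorted_approx_Select(arr, kway - 1, comm);
      std::vector<size_t> disp(kway + 1);
      disp[0] = 0; disp[kway] = arr.size();
      for (int i = 1; i < kway; i++)
        disp[i] = std::lower_bound(arr.begin(), arr.end(), sk[i-1]) - arr.begin();
      // Bucket i, destined for process group i, is arr[disp[i] .. disp[i+1]).
  */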
2647 
2648 	template<typename T>
2649     std::vector<std::pair<T, DendroIntL> > Sorted_approx_Select_skewed (std::vector<T>& arr, unsigned int kway, MPI_Comm comm) {
2650 			int rank, npes;
2651       MPI_Comm_size(comm, &npes);
2652 			MPI_Comm_rank(comm, &rank);
2653 
2654 			//-------------------------------------------
2655       DendroIntL totSize, nelem = arr.size();
2656       par::Mpi_Allreduce<DendroIntL>(&nelem, &totSize, 1, MPI_SUM, comm);
2657 
2658 			//Determine splitters. O( log(N/p) + log(p) )
2659       int splt_count = (1000*kway*nelem)/totSize;
2660       if (npes>1000*kway) splt_count = (((float)rand()/(float)RAND_MAX)*totSize<(1000*kway*nelem)?1:0);
2661       if (splt_count>nelem) splt_count=nelem;
2662 
2663       //! Each splitter is stored together with a global duplicate-rank, i.e. as a pair,
2664       //  so that runs of equal keys can be split deterministically across buckets.
2665       //  DendroIntL (64-bit, up to ~9.2e18) is ample for these global ranks.
2666       std::vector<T>          splitters(splt_count);
2667       std::vector<DendroIntL> dup_ranks(splt_count);
2668       for(size_t i=0;i<splt_count;i++) {
2669         dup_ranks[i] = (2*i*totSize/npes) + rand()%nelem;
2670         splitters[i] =  arr[rand()%nelem];
2671       }
2672 
2673       // std::cout << rank << ": got splitters and indices " << std::endl;
2674       // Gather all splitters. O( log(p) )
2675       int glb_splt_count;
2676       std::vector<int> glb_splt_cnts(npes);
2677       std::vector<int> glb_splt_disp(npes,0);
2678 
2679       par::Mpi_Allgather<int>(&splt_count, &glb_splt_cnts[0], 1, comm);
2680       omp_par::scan(&glb_splt_cnts[0],&glb_splt_disp[0],npes);
2681       glb_splt_count = glb_splt_cnts[npes-1] + glb_splt_disp[npes-1];
2682 
2683       std::vector<T>          glb_splitters (glb_splt_count);
2684       std::vector<DendroIntL> glb_dup_ranks (glb_splt_count);
2685       MPI_Allgatherv(&    dup_ranks[0], splt_count, par::Mpi_datatype<DendroIntL>::value(),
2686                      &glb_dup_ranks[0], &glb_splt_cnts[0], &glb_splt_disp[0],
2687                      par::Mpi_datatype<DendroIntL>::value(), comm);
2688       MPI_Allgatherv(&    splitters[0], splt_count, par::Mpi_datatype<T>::value(),
2689                      &glb_splitters[0], &glb_splt_cnts[0], &glb_splt_disp[0],
2690                      par::Mpi_datatype<T>::value(), comm);
2691 
2692 
2693       // std::cout << rank << ": ranking splitters " << std::endl;
2694       // rank splitters. O( log(N/p) + log(p) )
2695       std::vector<DendroIntL> disp(glb_splt_count, 0);
2696       DendroIntL dLow, dHigh;
2697       if(nelem>0){
2698         #pragma omp parallel for
2699         for(size_t i=0; i<glb_splt_count; i++){
2700           // disp[i] = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
2701           dLow = std::lower_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
2702           dHigh = std::upper_bound(&arr[0], &arr[nelem], glb_splitters[i]) - &arr[0];
2703           if ( (dHigh-dLow) > 1 ) {
2704             DendroIntL sRank = glb_dup_ranks[i]*npes/2/totSize;
2705             if (sRank < rank ) {
2706               disp[i] = dLow;
2707             } else if (sRank > rank) {
2708               disp[i] = dHigh;
2709             } else {
2710               disp[i] = glb_dup_ranks[i] - (2*rank*totSize/npes);
2711             }
2712           } else {
2713             disp[i] = dLow;
2714           }
2715         }
2716       }
2717       std::vector<DendroIntL> glb_disp(glb_splt_count, 0);
2718 
2719       // std::cout << rank << ": all reduce " << std::endl;
2720       MPI_Allreduce(&disp[0], &glb_disp[0], glb_splt_count, par::Mpi_datatype<DendroIntL>::value(), MPI_SUM, comm);
2721 
2722 	    std::vector< std::pair<T, DendroIntL> > split_keys(kway);
2723       std::pair<T, DendroIntL> key_pair;
2724       #pragma omp parallel for
2725       for (unsigned int qq=0; qq<kway; qq++) {
2726 				DendroIntL* _disp = &glb_disp[0];
2727 				DendroIntL optSplitter = ((qq+1)*totSize)/(kway+1);
2728         // if (!rank) std::cout << "opt " << qq << " - " << optSplitter << std::endl;
2729         for(size_t i=0; i<glb_splt_count; i++) {
2730         	if(labs(glb_disp[i] - optSplitter) < labs(*_disp - optSplitter)) {
2731 						_disp = &glb_disp[i];
2732 					}
2733 				}
2734 
2735         key_pair.first  = glb_splitters[_disp - &glb_disp[0]];
2736         key_pair.second = glb_dup_ranks[_disp - &glb_disp[0]];
2737         split_keys[qq]  = key_pair; //
2738 			}
2739 
2740       // std::cout << rank << ": all done" << std::endl;
2741 			return split_keys;
2742 		}
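  /*
    Sketch of why pairing each key with a global duplicate-rank helps on
    heavily duplicated inputs (no MPI; illustrative only, using std::pair from
    <utility>): the pairs are all distinct, so a splitter pair can land inside
    a run of equal keys instead of forcing the whole run into one bucket:

      std::vector<std::pair<int, long> > a;          // (key, global index)
      for (long i = 0; i < 8; ++i) a.push_back(std::make_pair(42, i));
      std::pair<int, long> split(42, 3);             // a splitter inside the run
      size_t cut = std::upper_bound(a.begin(), a.end(), split) - a.begin();
      // cut == 4: the run of equal 42s is divided between two buckets.
  */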
2743 
2744 }//end namespace
2745 
2746