#include "moab/ParallelMergeMesh.hpp"
#include "moab/Core.hpp"
#include "moab/CartVect.hpp"
#include "moab/BoundBox.hpp"
#include "moab/Skinner.hpp"
#include "moab/MergeMesh.hpp"
#include "moab/CN.hpp"
#include "float.h"
#include <algorithm>

#ifdef MOAB_HAVE_MPI
#include "moab_mpi.h"
#endif

namespace moab{

  //Constructor
  /*Get Merge Data and tolerance*/
  ParallelMergeMesh::ParallelMergeMesh(ParallelComm *pc,
                                       const double epsilon) :
    myPcomm(pc), myEps(epsilon)
  {
    myMB = pc->get_moab();
    mySkinEnts.resize(4);
  }


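  /* Typical usage (a minimal sketch, not taken from this file; the way the mesh
     is distributed, and the 1e-8 tolerance below, are illustrative only; any
     default arguments of merge() are declared in the header):

       moab::Core mb;
       moab::ParallelComm pcomm(&mb, MPI_COMM_WORLD);
       // ... load or create a mesh so that each rank holds its local piece ...
       moab::ParallelMergeMesh pmm(&pcomm, 1e-8);      // 1e-8 = merge tolerance
       moab::ErrorCode rval = pmm.merge(0, false);     // root set, do the local merge too
  */
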
  //Have a wrapper function on the actual merge to avoid memory leaks
  //Merges elements within a proximity of epsilon
  ErrorCode ParallelMergeMesh::merge(EntityHandle levelset, bool skip_local_merge)
  {
    ErrorCode rval = PerformMerge(levelset, skip_local_merge);MB_CHK_ERR(rval);
    CleanUp();
    return rval;
  }

  //Perform the merge
  ErrorCode ParallelMergeMesh::PerformMerge(EntityHandle levelset, bool skip_local_merge)
  {
    //Get the mesh dimension
    int dim;
    ErrorCode rval = myMB->get_dimension(dim);MB_CHK_ERR(rval);

    //Get the local skin elements
    rval = PopulateMySkinEnts(levelset, dim, skip_local_merge);
    //If there is only 1 proc, we can return now
    if(rval != MB_SUCCESS || myPcomm->size() == 1){
      return rval;
    }

    //Determine the global bounding box
    double gbox[6];
    rval = GetGlobalBox(gbox);MB_CHK_ERR(rval);

    /* Assemble The Destination Tuples */
    //Get a list of tuples which contain (toProc, handle, x,y,z)
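    //TupleList layout: 1 int (destination proc), 0 longs, 1 handle, 3 reals (x,y,z);
    //initial capacity is one tuple per skin vertex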
    myTup.initialize(1,0,1,3,mySkinEnts[0].size());
    rval = PopulateMyTup(gbox);MB_CHK_ERR(rval);

    /* Gather-Scatter Tuple
       -tup comes out as (remoteProc,handle,x,y,z) */
    myCD.initialize(myPcomm->comm());

    //1 indicates a dynamic tuple list, 0 is the index of the field holding the destination proc
    myCD.gs_transfer(1, myTup, 0);

    /* Sort By X,Y,Z
       -Utilizes a custom quick sort incorporating epsilon*/
    SortTuplesByReal(myTup,myEps);

    //Initialize another tuple list for matches
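    //Layout: 2 ints (proc1,proc2), 2 handles (handle1,handle2), no reals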
    myMatches.initialize(2,0,2,0,mySkinEnts[0].size());

    //ID the matching tuples
    rval = PopulateMyMatches();MB_CHK_ERR(rval);

    //We can free up the tuple list myTup now
    myTup.reset();

    /*Gather-Scatter Again*/
    //1 indicates a dynamic tuple list, 0 is the index of the field holding the destination proc
    myCD.gs_transfer(1,myMatches,0);
    //We can free up the crystal router now
    myCD.reset();

    //Sort the matches tuple list
    SortMyMatches();

    //Tag the shared elements
    rval = TagSharedElements(dim);MB_CHK_ERR(rval);

    //Free up the matches tuples
    myMatches.reset();
    return rval;
  }
  //Sets mySkinEnts with all of the skin entities on the processor
  ErrorCode ParallelMergeMesh::PopulateMySkinEnts(const EntityHandle meshset, int dim, bool skip_local_merge)
  {
    /*Merge Mesh Locally*/
    //Get all dim dimensional entities
    Range ents;
    ErrorCode rval = myMB->get_entities_by_dimension(meshset,dim,ents);MB_CHK_ERR(rval);

    if (ents.empty() && dim==3)
    {
      dim--;
      rval = myMB->get_entities_by_dimension(meshset,dim,ents);MB_CHK_ERR(rval);// maybe dimension 2
    }

    //Merge Mesh Locally
    if (!skip_local_merge)
    {
      MergeMesh merger(myMB, false);
      merger.merge_entities(ents,myEps);
      //We can return if there is only 1 proc
      if(rval != MB_SUCCESS || myPcomm->size() == 1){
        return rval;
      }

      //Rebuild the ents range
      ents.clear();
      rval = myMB->get_entities_by_dimension(meshset,dim,ents);MB_CHK_ERR(rval);
    }

    /*Get Skin
      -Get Range of all dimensional entities
      -skinEnts[i] is the skin entities of dimension i*/
    Skinner skinner(myMB);
    for(int skin_dim = dim; skin_dim >= 0; skin_dim--){
      rval = skinner.find_skin(meshset,ents,skin_dim,mySkinEnts[skin_dim]);MB_CHK_ERR(rval);
    }
    return MB_SUCCESS;
  }

  //Determine the global assembly box
  ErrorCode ParallelMergeMesh::GetGlobalBox(double *gbox)
  {
    ErrorCode rval;

    /*Get Bounding Box*/
    BoundBox box;
    if(mySkinEnts[0].size() != 0){
      rval = box.update(*myMB, mySkinEnts[0]);MB_CHK_ERR(rval);
    }

    //Invert the max so that a single MPI_MIN reduction yields both the global min and max
    box.bMax *= -1;

    /*Communicate to all processors*/
    MPI_Allreduce( (void*)&box, gbox, 6, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD);

    /*Assemble Global Bounding Box*/
    //Flip the max back
    for(int i=3; i<6; i++){
      gbox[i] *= -1;
    }
    return MB_SUCCESS;
  }

  //Assemble the tuples with their processor destination
  ErrorCode ParallelMergeMesh::PopulateMyTup(double * gbox)
  {
    /*Figure out how to partition the global box*/
    double lengths[3];
    int parts[3];
    ErrorCode rval = PartitionGlobalBox(gbox, lengths, parts);MB_CHK_ERR(rval);

    /* Get Skin Coordinates, Vertices */
    double *x = new double[mySkinEnts[0].size()];
    double *y = new double[mySkinEnts[0].size()];
    double *z = new double[mySkinEnts[0].size()];
    rval = myMB->get_coords(mySkinEnts[0],x,y,z);
    if(rval != MB_SUCCESS){
      //Prevent Memory Leak
      delete []x; delete []y; delete []z;
      return rval;
    }

    //Initialize variables to be used in the loop
    std::vector<int> toProcs;
    int xPart, yPart, zPart, xEps, yEps, zEps, baseProc;
    unsigned long long tup_i=0, tup_ul=0, tup_r=0, count=0;
    //These booleans record whether the vertex is close enough to a given partition border
    bool xDup, yDup, zDup;
    bool canWrite = myTup.get_writeEnabled();
    if(!canWrite) myTup.enableWriteAccess();
    //Go through each vertex
    for(Range::iterator it = mySkinEnts[0].begin(); it != mySkinEnts[0].end(); ++it){
      xDup = false; yDup = false; zDup = false;
      //Figure out which x,y,z partition the vertex is in
      xPart = static_cast<int>(floor((x[count]-gbox[0])/lengths[0]));
      xPart = (xPart<parts[0]?xPart:parts[0]-1);//Make sure it stays within the bounds

      yPart = static_cast<int>(floor((y[count]-gbox[1])/lengths[1]));
      yPart = (yPart<parts[1]?yPart:parts[1]-1);//Make sure it stays within the bounds

      zPart = static_cast<int>(floor((z[count]-gbox[2])/lengths[2]));
      zPart = (zPart<parts[2]?zPart:parts[2]-1);//Make sure it stays within the bounds

      //Figure out the partition with the addition of Epsilon
      xEps = static_cast<int>(floor((x[count]-gbox[0]+myEps)/lengths[0]));
      yEps = static_cast<int>(floor((y[count]-gbox[1]+myEps)/lengths[1]));
      zEps = static_cast<int>(floor((z[count]-gbox[2]+myEps)/lengths[2]));

      //Figure out if the vertex needs to be sent to multiple procs
      xDup = (xPart != xEps && xEps < parts[0]);
      yDup = (yPart != yEps && yEps < parts[1]);
      zDup = (zPart != zEps && zEps < parts[2]);
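      //Example: a vertex lying within myEps of the +x face of its cell has
      //xEps == xPart+1, so xDup is set and the vertex is also sent to the
      //partition one step over in x (and similarly for y and z)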

      //Add appropriate processors to the vector
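      //Partitions are numbered with x varying fastest:
      //rank = xPart + yPart*parts[0] + zPart*parts[0]*parts[1]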
      baseProc = xPart + yPart * parts[0] + zPart * parts[0] * parts[1];
      toProcs.push_back(baseProc);
      if(xDup){
        toProcs.push_back(baseProc + 1);//Get partition to the right
      }
      if(yDup){
        //Partition up 1
        toProcs.push_back(baseProc + parts[0]);
      }
      if(zDup){
        //Partition above 1
        toProcs.push_back(baseProc + parts[0] * parts[1]);
      }
      if(xDup && yDup){
        //Partition up 1 and right 1
        toProcs.push_back(baseProc + parts[0] + 1);
      }
      if(xDup && zDup){
        //Partition right 1 and above 1
        toProcs.push_back(baseProc + parts[0] * parts[1] + 1);
      }
      if(yDup && zDup){
        //Partition up 1 and above 1
        toProcs.push_back(baseProc + parts[0] * parts[1] + parts[0]);
      }
      if(xDup && yDup && zDup){
        //Partition right 1, up 1, and above 1
        toProcs.push_back(baseProc + parts[0] * parts[1] + parts[0] + 1);
      }
      //Grow the tuple list if necessary
      while(myTup.get_n() + toProcs.size() >= myTup.get_max()){
        myTup.resize(myTup.get_max() ?
                     myTup.get_max() + myTup.get_max()/2 + 1
                     : 2);
      }

      //Add each proc as a tuple
      for(std::vector<int>::iterator proc = toProcs.begin();
          proc != toProcs.end();
          ++proc){
        myTup.vi_wr[tup_i++] = *proc;
        myTup.vul_wr[tup_ul++] = *it;
        myTup.vr_wr[tup_r++] = x[count];
        myTup.vr_wr[tup_r++] = y[count];
        myTup.vr_wr[tup_r++] = z[count];
        myTup.inc_n();
      }
      count++;
      toProcs.clear();
    }
    delete [] x;
    delete [] y;
    delete [] z;
    if(!canWrite) myTup.disableWriteAccess();
    return MB_SUCCESS;
  }

  //Partition the global box by the number of procs
  ErrorCode ParallelMergeMesh::PartitionGlobalBox(double *gbox, double *lengths, int *parts)
  {
    //Determine the length of each side
    double xLen = gbox[3]-gbox[0];
    double yLen = gbox[4]-gbox[1];
    double zLen = gbox[5]-gbox[2];
    unsigned numProcs = myPcomm->size();

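    //Give the most divisions to the longest side so the resulting grid cells
    //are as close to cubes as possible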
    //Partition sides from the longest to shortest lengths
    //If x is the longest side
    if(xLen >= yLen && xLen >= zLen){
      parts[0] = PartitionSide(xLen, yLen * zLen, numProcs, true);
      numProcs /= parts[0];
      //If y is the second longest
      if(yLen >= zLen){
        parts[1] = PartitionSide(yLen, zLen, numProcs, false);
        parts[2] = numProcs/parts[1];
      }
      //If z is the second longest
      else{
        parts[2] = PartitionSide(zLen, yLen, numProcs, false);
        parts[1] = numProcs/parts[2];
      }
    }
    //If y is the longest side
    else if (yLen >= zLen){
      parts[1] = PartitionSide(yLen, xLen * zLen, numProcs, true);
      numProcs /= parts[1];
      //If x is the second longest
      if(xLen >= zLen){
        parts[0] = PartitionSide(xLen, zLen, numProcs, false);
        parts[2] = numProcs/parts[0];
      }
      //If z is the second longest
      else{
        parts[2] = PartitionSide(zLen, xLen, numProcs, false);
        parts[0] = numProcs/parts[2];
      }
    }
    //If z is the longest side
    else{
      parts[2] = PartitionSide(zLen, xLen * yLen, numProcs, true);
      numProcs /= parts[2];
      //If x is the second longest
      if(xLen >= yLen){
        parts[0] = PartitionSide(xLen, yLen, numProcs, false);
        parts[1] = numProcs/parts[0];
      }
      //If y is the second longest
      else{
        parts[1] = PartitionSide(yLen, xLen, numProcs, false);
        parts[0] = numProcs/parts[1];
      }
    }

    //Divide up each side to give the lengths
    lengths[0] = xLen/(double)parts[0];
    lengths[1] = yLen/(double)parts[1];
    lengths[2] = zLen/(double)parts[2];
    return MB_SUCCESS;
  }

  //Partition a side based on the length ratios
  int ParallelMergeMesh::PartitionSide(double sideLen, double restLen, unsigned numProcs, bool altRatio)
  {
    //If there is only 1 processor, just return 1
    if(numProcs == 1){
      return 1;
    }
    //Initialize the running ratio and factor (factor 1 is the fallback)
    double ratio = -DBL_MAX;
    unsigned factor = 1;
    //We need to be able to save the previous ratio and factor (for comparison)
    double oldRatio = ratio;
    double oldFactor = 1;

    //This is the ratio we are shooting for
    double goalRatio = sideLen/restLen;

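    //The loop below walks the factors of numProcs in increasing order and
    //ultimately keeps the factor whose ratio comes closest to goalRatio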
    //Calculate the divisor and the numerator's power
    //This avoids if statements in the loop; both calculations are similar
    double divisor, p;
    if(altRatio){
      divisor = (double)numProcs * sideLen;
      p = 3;
    }
    else{
      divisor = (double)numProcs;
      p = 2;
    }

    //Find each possible factor
    for (unsigned i = 2; i <= numProcs/2; i++){
      //If it is a factor...
      if (numProcs % i == 0){
        //We need to save the past factor
        oldRatio = ratio;
        oldFactor = factor;
        //There are 2 different ways to calculate the ratio:
        //Comparing 1 side to 2 sides: (i*i*i)/(numProcs*x)
        //Justification:  We have a ratio x:y:z (side lengths) == a:b:c (procs).  So a=kx, b=ky, c=kz.
        //Also, abc=n (numProcs) => bc = n/a.  Also, a=kx => k=a/x => 1/k=x/a
        //And so x/(yz) == (kx)/(kyz) == (kx)/(kykz(1/k)) == a/(bc(x/a)) == a/((n/a)(x/a)) == a^3/(nx).
        //Comparing 1 side to 1 side: (i*i)/numProcs
        //Justification: i/(n/i) == i^2/n
        ratio = pow((double)i, p)/divisor;
        factor = i;
        //Once we have passed the goal ratio, we can break since we'll only move away from the goal ratio
        if(ratio >= goalRatio){
          break;
        }
      }
    }
    //If we haven't reached the goal ratio yet, check out factor = numProcs
    if(ratio < goalRatio){
      oldRatio = ratio;
      oldFactor = factor;
      factor = numProcs;
      ratio = pow((double)numProcs, p)/divisor;
    }

    //Figure out if our oldRatio is better than ratio
    if(fabs(ratio - goalRatio) > fabs(oldRatio-goalRatio)){
      factor = oldFactor;
    }
    //Return our findings
    return factor;
  }

  //ID the tuples that are matching
  ErrorCode ParallelMergeMesh::PopulateMyMatches()
  {
    //Counters for accessing tuples more efficiently
    unsigned long i = 0, mat_i=0, mat_ul=0, j=0, tup_r=0;
    double eps2 = myEps*myEps;

    uint tup_mi, tup_ml, tup_mul, tup_mr;
    myTup.getTupleSize(tup_mi, tup_ml, tup_mul, tup_mr);

    bool canWrite = myMatches.get_writeEnabled();
    if(!canWrite) myMatches.enableWriteAccess();

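    //After the epsilon-aware sort, copies of the same physical vertex sit next to
    //each other in myTup; scan forward from tuple i until a tuple lies farther
    //than myEps away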
    while((i+1)<myTup.get_n()){
      //Proximity Comparison
      double xi = myTup.vr_rd[tup_r],
        yi = myTup.vr_rd[tup_r+1],
        zi = myTup.vr_rd[tup_r+2];

      bool done = false;
      while(!done){
        j++; tup_r += tup_mr;
        if(j >= myTup.get_n()){
          break;
        }
        CartVect cv(myTup.vr_rd[tup_r]-xi,
                    myTup.vr_rd[tup_r+1]-yi,
                    myTup.vr_rd[tup_r+2]-zi);
        if(cv.length_squared() > eps2){
          done = true;
        }
      }
      //Allocate room in the tuple list before adding matches
      while(myMatches.get_n()+(j-i)*(j-i-1) >= myMatches.get_max()){
        myMatches.resize(myMatches.get_max() ?
                         myMatches.get_max() + myMatches.get_max()/2 + 1 :
                         2);
      }

      //We now know that the tuples in [i, j) match.
      //If n tuples match, n*(n-1) match tuples will be made
      //Tuples are of the form (proc1,proc2,handle1,handle2)
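      //e.g., 3 matching copies of a vertex generate 3*2 = 6 ordered pairs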
      if(i+1 < j){
        int kproc = i*tup_mi;
        unsigned long khand = i*tup_mul;
        for(unsigned long k = i; k<j; k++){
          int lproc = kproc+tup_mi;
          unsigned long lhand = khand+tup_mul;
          for(unsigned long l=k+1; l<j; l++){
            myMatches.vi_wr[mat_i++]=myTup.vi_rd[kproc];//proc1
            myMatches.vi_wr[mat_i++]=myTup.vi_rd[lproc];//proc2
            myMatches.vul_wr[mat_ul++]=myTup.vul_rd[khand];//handle1
            myMatches.vul_wr[mat_ul++]=myTup.vul_rd[lhand];//handle2
            myMatches.inc_n();

            myMatches.vi_wr[mat_i++]=myTup.vi_rd[lproc];//proc1
            myMatches.vi_wr[mat_i++]=myTup.vi_rd[kproc];//proc2
            myMatches.vul_wr[mat_ul++]=myTup.vul_rd[lhand];//handle1
            myMatches.vul_wr[mat_ul++]=myTup.vul_rd[khand];//handle2
            myMatches.inc_n();
            lproc += tup_mi;
            lhand += tup_mul;
          }
          kproc += tup_mi;
          khand += tup_mul;
        }//End for(int k...
      }
      i = j;
    }//End while(i+1<tup.n)

    if(!canWrite) myMatches.disableWriteAccess();
    return MB_SUCCESS;
  }

  //Sort the matching tuples so that vertices can be tagged accurately
  ErrorCode ParallelMergeMesh::SortMyMatches()
  {
    TupleList::buffer buf(mySkinEnts[0].size());
    //Sorts are necessary to check for duplicates
    //Sort by remote handle
    myMatches.sort(3,&buf);
    //Sort by matching proc
    myMatches.sort(1,&buf);
    //Sort by local handle
    myMatches.sort(2,&buf);
    buf.reset();
    return MB_SUCCESS;
  }

  //Tag the shared elements using existing ParallelComm functionality
  ErrorCode ParallelMergeMesh::TagSharedElements(int dim)
  {
    //Manipulate the matches list to tag vertices and entities
    //Set up proc ents
    Range proc_ents;
    ErrorCode rval;

    // get the entities in the partition sets
    for (Range::iterator rit = myPcomm->partitionSets.begin(); rit != myPcomm->partitionSets.end(); ++rit) {
      Range tmp_ents;
      rval = myMB->get_entities_by_handle(*rit, tmp_ents, true);
      if (MB_SUCCESS != rval){
        return rval;
      }
      proc_ents.merge(tmp_ents);
    }
    if (myMB->dimension_from_handle(*proc_ents.rbegin()) !=
        myMB->dimension_from_handle(*proc_ents.begin())) {
      Range::iterator lower = proc_ents.lower_bound(CN::TypeDimensionMap[0].first),
        upper = proc_ents.upper_bound(CN::TypeDimensionMap[dim-1].second);
      proc_ents.erase(lower, upper);
    }

    //This vector doesn't appear to be used, but it is in resolve_shared_ents
    int maxp = -1;
    std::vector<int> sharing_procs(MAX_SHARING_PROCS);
    std::fill(sharing_procs.begin(), sharing_procs.end(), maxp);

    // get ents shared by 1 or n procs
    std::map<std::vector<int>, std::vector<EntityHandle> > proc_nranges;
    Range proc_verts;
    rval = myMB->get_adjacencies(proc_ents, 0, false, proc_verts,
                                 Interface::UNION);
    if(rval != MB_SUCCESS){
      return rval;
    }

    rval = myPcomm->tag_shared_verts(myMatches, proc_nranges, proc_verts);
    if(rval != MB_SUCCESS){
      return rval;
    }

    // get entities shared by 1 or n procs
    rval = myPcomm->get_proc_nvecs(dim,dim-1, &mySkinEnts[0],proc_nranges);
    if(rval != MB_SUCCESS){
      return rval;
    }

    // create the sets for each interface; store them as tags on
    // the interface instance
    Range iface_sets;
    rval = myPcomm->create_interface_sets(proc_nranges);
    if(rval != MB_SUCCESS){
      return rval;
    }
    // establish comm procs and buffers for them
    std::set<unsigned int> procs;
    rval = myPcomm->get_interface_procs(procs, true);
    if(rval != MB_SUCCESS){
      return rval;
    }

    // resolve shared entity remote handles; implemented in the ghost cell exchange
    // code because it is so similar
    rval = myPcomm->exchange_ghost_cells(-1, -1, 0, true, true);
    if(rval != MB_SUCCESS){
      return rval;
    }
    // now build parent/child links for interface sets
    rval = myPcomm->create_iface_pc_links();
    return rval;
  }

  //Make sure to free up any allocated data
  //Need to avoid a double free
  void ParallelMergeMesh::CleanUp()
  {
    //The reset operation is now safe and avoids a double free()
    myMatches.reset();
    myTup.reset();
    myCD.reset();
  }

  //Simple quicksort of the tuples by their real (coordinate) values
  void ParallelMergeMesh::SortTuplesByReal(TupleList &tup,
                                           double eps)
  {
    bool canWrite = tup.get_writeEnabled();
    if(!canWrite) tup.enableWriteAccess();

    uint mi, ml, mul, mr;
    tup.getTupleSize(mi,ml,mul,mr);
    PerformRealSort(tup, 0, tup.get_n(), eps, mr);

    if(!canWrite) tup.disableWriteAccess();
  }

  //Swap around tuples
  void ParallelMergeMesh::SwapTuples(TupleList &tup,
                                     unsigned long a, unsigned long b)
  {
    if(a==b) return;

    uint mi, ml, mul, mr;
    tup.getTupleSize(mi, ml, mul, mr);

    //Swap the integer fields
    unsigned long a_val = a*mi, b_val=b*mi;
    for(unsigned long i=0; i< mi;i++){
      sint t =tup.vi_rd[a_val];
      tup.vi_wr[a_val] = tup.vi_rd[b_val];
      tup.vi_wr[b_val] = t;
      a_val++;
      b_val++;
    }
    //Swap the long fields
    a_val = a*ml;
    b_val = b*ml;
    for(unsigned long i=0; i< ml;i++){
      slong t =tup.vl_rd[a_val];
      tup.vl_wr[a_val] = tup.vl_rd[b_val];
      tup.vl_wr[b_val] = t;
      a_val++;
      b_val++;
    }
    //Swap the unsigned long (handle) fields
    a_val = a*mul;
    b_val = b*mul;
    for(unsigned long i=0; i< mul;i++){
      Ulong t =tup.vul_rd[a_val];
      tup.vul_wr[a_val] = tup.vul_rd[b_val];
      tup.vul_wr[b_val] = t;
      a_val++;
      b_val++;
    }
    //Swap the real fields
    a_val = a*mr;
    b_val = b*mr;
    for(unsigned long i=0; i< mr;i++){
      realType t =tup.vr_rd[a_val];
      tup.vr_wr[a_val] = tup.vr_rd[b_val];
      tup.vr_wr[b_val] = t;
      a_val++;
      b_val++;
    }
  }

  //Quicksort the tuples in [left, right) by their real values
  //To sort an entire tuple list, call (tup, 0, tup.get_n(), epsilon, tup_mr)
  void ParallelMergeMesh::PerformRealSort(TupleList &tup,
                                          unsigned long left,
                                          unsigned long right,
                                          double eps,
                                          uint tup_mr)
  {
    //If the list size is only 1 or 0, return
    if(left+1 >= right){
      return;
    }
    unsigned long swap = left, tup_l = left*tup_mr,
      tup_t = tup_l + tup_mr;

    //Swap the middle element into the left position as the pivot, for a (hopefully) better split
    SwapTuples(tup, left, (left+right)/2);

    //Partition the data
    for(unsigned long t=left+1;t<right;t++){
      //If the left value (the pivot) is greater than the value at t, swap the latter into position swap
      if(TupleGreaterThan(tup,tup_l,tup_t,eps, tup_mr)){
        swap++;
        SwapTuples(tup, swap,t);
      }
      tup_t+=tup_mr;
    }

    //Swap so that the pivot lands in position swap, its correct place
    SwapTuples(tup,left,swap);

    //Sort to the left and right of swap
    PerformRealSort(tup, left  ,swap, eps, tup_mr);
    PerformRealSort(tup, swap+1,right,eps, tup_mr);
  }

  //Note, this takes the actual tup.vr[] index (aka i*tup.mr)
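  //Lexicographic comparison: coordinates that differ by no more than eps are
  //treated as equal and the next coordinate decides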
  bool ParallelMergeMesh::TupleGreaterThan(TupleList &tup,
                                           unsigned long vrI,
                                           unsigned long vrJ,
                                           double eps,
                                           uint tup_mr){
    unsigned check=0;
    while(check < tup_mr){
      //If the values are the same
      if(fabs(tup.vr_rd[vrI+check]-tup.vr_rd[vrJ+check]) <= eps){
        check++;
        continue;
      }
      //If I is greater than J
      else if(tup.vr_rd[vrI+check] > tup.vr_rd[vrJ+check]){
        return true;
      }
      //If J is greater than I
      else{
        return false;
      }
    }
    //All values are the same
    return false;
  }

}//End namespace moab