1 /* ----------------------------------------------------------------------
2    SPARTA - Stochastic PArallel Rarefied-gas Time-accurate Analyzer
3    http://sparta.sandia.gov
4    Steve Plimpton, sjplimp@sandia.gov, Michael Gallis, magalli@sandia.gov
5    Sandia National Laboratories
6 
7    Copyright (2014) Sandia Corporation.  Under the terms of Contract
8    DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
9    certain rights in this software.  This software is distributed under
10    the GNU General Public License.
11 
12    See the README file in the top-level SPARTA directory.
13 ------------------------------------------------------------------------- */
14 
15 #include "mpi.h"
16 #include "stdlib.h"
17 #include "string.h"
18 #include "comm.h"
19 #include "irregular.h"
20 #include "particle.h"
21 #include "grid.h"
22 #include "surf.h"
23 #include "update.h"
24 #include "modify.h"
25 #include "adapt_grid.h"
26 #include "memory.h"
27 #include "error.h"
28 
29 using namespace SPARTA_NS;
30 
31 enum{PKEEP,PINSERT,PDONE,PDISCARD,PENTRY,PEXIT,PSURF};   // several files
32 
33 /* ---------------------------------------------------------------------- */
34 
Comm(SPARTA * sparta)35 Comm::Comm(SPARTA *sparta) : Pointers(sparta)
36 {
37   MPI_Comm_rank(world,&me);
38   MPI_Comm_size(world,&nprocs);
39 
40   ncomm = 0;
41   commsortflag = 0;
42   commpartstyle = 1;
43 
44   neighflag = 0;
45   neighlist = NULL;
46 
47   iparticle = new Irregular(sparta);
48   igrid = NULL;
49   iuniform = NULL;
50 
51   pproc = NULL;
52   maxpproc = 0;
53   gproc = gsize = NULL;
54   maxgproc = 0;
55   sbuf = rbuf = NULL;
56   maxsendbuf = maxrecvbuf = 0;
57 
58   copymode = 0;
59 }
60 
61 /* ---------------------------------------------------------------------- */
62 
~Comm()63 Comm::~Comm()
64 {
65   if (copymode) return;
66 
67   delete iparticle;
68   delete igrid;
69   delete iuniform;
70 
71   memory->destroy(pproc);
72   memory->destroy(gproc);
73   memory->destroy(gsize);
74   memory->destroy(sbuf);
75   memory->destroy(rbuf);
76 
77   memory->destroy(neighlist);
78 }
79 
80 /* ----------------------------------------------------------------------
81    reset neighbor list used in particle comm and setup irregular for them
82    invoked after grid decomposition changes
83    no-op if commpartstyle not set or grid decomposition not clumped
84      since different mode of irregular comm will be done
85 ------------------------------------------------------------------------- */
86 
reset_neighbors()87 void Comm::reset_neighbors()
88 {
89   neighflag = 0;
90   if (!commpartstyle || !grid->clumped) return;
91   neighflag = 1;
92 
93   if (neighlist == NULL)
94     memory->create(neighlist,nprocs,"comm:neighlist");
95   for (int i = 0; i < nprocs; i++) neighlist[i] = 0;
96 
97   Grid::ChildCell *cells = grid->cells;
98   int nglocal = grid->nlocal;
99   int ntotal = nglocal + grid->nghost;
100 
101   for (int icell = nglocal; icell < ntotal; icell++)
102     neighlist[cells[icell].proc] = 1;
103   neighlist[me] = 0;
104 
105   nneigh = 0;
106   for (int i = 0; i < nprocs; i++)
107     if (neighlist[i]) neighlist[nneigh++] = i;
108 
109   iparticle->create_procs(nneigh,neighlist,commsortflag);
110 }
111 
112 /* ----------------------------------------------------------------------
113    migrate particles to new procs after particle move
114    return particle nlocal after compression,
115      so Update can iterate on particle move
116 ------------------------------------------------------------------------- */
117 
migrate_particles(int nmigrate,int * plist)118 int Comm::migrate_particles(int nmigrate, int *plist)
119 {
120   int i,j;
121 
122   Grid::ChildCell *cells = grid->cells;
123   Particle::OnePart *particles = particle->particles;
124 
125   int ncustom = particle->ncustom;
126   int nbytes_particle = sizeof(Particle::OnePart);
127   int nbytes_custom = particle->sizeof_custom();
128   int nbytes = nbytes_particle + nbytes_custom;
129 
130   // grow pproc and sbuf if necessary
131 
132   if (nmigrate > maxpproc) {
133     maxpproc = nmigrate;
134     memory->destroy(pproc);
135     memory->create(pproc,maxpproc,"comm:pproc");
136   }
137   if (nmigrate*nbytes > maxsendbuf) {
138     maxsendbuf = nmigrate*nbytes;
139     memory->destroy(sbuf);
140     memory->create(sbuf,maxsendbuf,"comm:sbuf");
141   }
142 
143   // fill proclist with procs to send to
144   // pack sbuf with particles to migrate
145   // if flag == PDISCARD, particle is deleted but not sent
146   // change icell of migrated particle to owning cell on receiving proc
147   // nsend = particles that actually migrate
148   // if no custom attributes, pack particles directly via memcpy()
149   // else pack_custom() performs packing into sbuf
150 
151   int nsend = 0;
152   int offset = 0;
153 
154   if (!ncustom) {
155     for (i = 0; i < nmigrate; i++) {
156       j = plist[i];
157       if (particles[j].flag == PDISCARD) continue;
158       pproc[nsend++] = cells[particles[j].icell].proc;
159       particles[j].icell = cells[particles[j].icell].ilocal;
160       memcpy(&sbuf[offset],&particles[j],nbytes_particle);
161       offset += nbytes_particle;
162     }
163   } else {
164     for (i = 0; i < nmigrate; i++) {
165       j = plist[i];
166       if (particles[j].flag == PDISCARD) continue;
167       pproc[nsend++] = cells[particles[j].icell].proc;
168       particles[j].icell = cells[particles[j].icell].ilocal;
169       memcpy(&sbuf[offset],&particles[j],nbytes_particle);
170       offset += nbytes_particle;
171       particle->pack_custom(j,&sbuf[offset]);
172       offset += nbytes_custom;
173     }
174   }
175 
176   // compress my list of particles
177 
178   particle->compress_migrate(nmigrate,plist);
179   int ncompress = particle->nlocal;
180 
181   // create or augment irregular communication plan
182   // nrecv = # of incoming particles
183 
184   int nrecv;
185   if (neighflag)
186     nrecv = iparticle->augment_data_uniform(nsend,pproc);
187   else
188     nrecv = iparticle->create_data_uniform(nsend,pproc,commsortflag);
189 
190   // extend particle list if necessary
191 
192   particle->grow(nrecv);
193 
194   // perform irregular communication
195   // if no custom attributes, append recv particles directly to particle list
196   // else receive into rbuf, unpack particles one by one via unpack_custom()
197 
198   if (!ncustom)
199     iparticle->
200       exchange_uniform(sbuf,nbytes,
201                        (char *) &particle->particles[particle->nlocal]);
202 
203   else {
204     if (nrecv*nbytes > maxrecvbuf) {
205       maxrecvbuf = nrecv*nbytes;
206       memory->destroy(rbuf);
207       memory->create(rbuf,maxrecvbuf,"comm:rbuf");
208     }
209 
210     iparticle->exchange_uniform(sbuf,nbytes,rbuf);
211 
212     offset = 0;
213     int nlocal = particle->nlocal;
214     for (i = 0; i < nrecv; i++) {
215       memcpy(&particle->particles[nlocal],&rbuf[offset],nbytes_particle);
216       offset += nbytes_particle;
217       particle->unpack_custom(&rbuf[offset],nlocal);
218       offset += nbytes_custom;
219       nlocal++;
220     }
221   }
222 
223   particle->nlocal += nrecv;
224   ncomm += nsend;
225   return ncompress;
226 }
227 
228 /* ----------------------------------------------------------------------
229    migrate grid cells with their particles to new procs
230    called from BalanceGrid and FixBalance
231 ------------------------------------------------------------------------- */
232 
migrate_cells(int nmigrate)233 void Comm::migrate_cells(int nmigrate)
234 {
235   if (update->have_mem_limit())
236     return migrate_cells_less_memory(nmigrate);
237 
238   int i,n;
239 
240   Grid::ChildCell *cells = grid->cells;
241   int nglocal = grid->nlocal;
242 
243   // grow proc and size lists if needed
244 
245   if (nmigrate > maxgproc) {
246     maxgproc = nmigrate;
247     memory->destroy(gproc);
248     memory->destroy(gsize);
249     memory->create(gproc,maxgproc,"comm:gproc");
250     memory->create(gsize,maxgproc,"comm:gsize");
251   }
252 
253   // fill proclist with procs to send to
254   // compute byte count needed to pack cells
255 
256   int nsend = 0;
257   bigint boffset = 0;
258   for (int icell = 0; icell < nglocal; icell++) {
259     if (cells[icell].nsplit <= 0) continue;
260     if (cells[icell].proc == me) continue;
261     gproc[nsend] = cells[icell].proc;
262     n = grid->pack_one(icell,NULL,1,1,1,0);
263     gsize[nsend++] = n;
264     boffset += n;
265   }
266 
267   if (boffset > MAXSMALLINT)
268     error->one(FLERR,"Migrate cells send buffer exceeds 2 GB");
269   int offset = boffset;
270 
271   // reallocate sbuf as needed
272 
273   if (offset > maxsendbuf) {
274     memory->destroy(sbuf);
275     maxsendbuf = offset;
276     memory->create(sbuf,maxsendbuf,"comm:sbuf");
277     memset(sbuf,0,maxsendbuf);
278   }
279 
280   // pack cell info into sbuf
281   // only called for unsplit and split cells I no longer own
282 
283   offset = 0;
284   for (int icell = 0; icell < nglocal; icell++) {
285     if (cells[icell].nsplit <= 0) continue;
286     if (cells[icell].proc == me) continue;
287     offset += grid->pack_one(icell,&sbuf[offset],1,1,1,1);
288   }
289 
290   // compress my list of owned implicit surfs, resets csurfs in kept cells
291   // compress my list of owned grid cells to remove migrating cells
292   // compress my list of owned distributed/explicit surfs
293   // compress particle list to remove particles in migrating cells
294   // procs with no migrating cells must also unset particle sorted
295   //   since compress_rebalance() unsets it
296 
297   if (nmigrate) {
298     if (surf->implicit) surf->compress_implicit();
299     grid->compress();
300     if (surf->distributed && !surf->implicit) surf->compress_explicit();
301     particle->compress_rebalance();
302   } else particle->sorted = 0;
303 
304   // create irregular communication plan with variable size datums
305   // nrecv = # of incoming grid cells
306   // recvsize = total byte size of incoming grid + particle info
307   // DEBUG: append a sort=1 arg so that messages from other procs
308   //        are received in repeatable order, thus grid cells stay in order
309 
310   if (!igrid) igrid = new Irregular(sparta);
311   int recvsize;
312   int nrecv = igrid->create_data_variable(nmigrate,gproc,gsize,
313                                           recvsize,commsortflag);
314 
315   // reallocate rbuf as needed
316 
317   if (recvsize > maxrecvbuf) {
318     memory->destroy(rbuf);
319     maxrecvbuf = recvsize;
320     memory->create(rbuf,maxrecvbuf,"comm:rbuf");
321     memset(rbuf,0,maxrecvbuf);
322   }
323 
324   // perform irregular communication
325 
326   igrid->exchange_variable(sbuf,gsize,rbuf);
327 
328   // unpack received grid cells with their particles
329 
330   offset = 0;
331   for (i = 0; i < nrecv; i++)
332     offset += grid->unpack_one(&rbuf[offset],1,1,1);
333 }
334 
335 /* ----------------------------------------------------------------------
336    migrate grid cells with their particles to new procs
337    called from BalanceGrid and FixBalance
338    uses multiple comm passes to reduce buffer size
339 ------------------------------------------------------------------------- */
340 
migrate_cells_less_memory(int nmigrate)341 void Comm::migrate_cells_less_memory(int nmigrate)
342 {
343   int i,n;
344 
345   // grow proc and size lists if needed
346 
347   if (nmigrate > maxgproc) {
348     maxgproc = nmigrate;
349     memory->destroy(gproc);
350     memory->destroy(gsize);
351     memory->create(gproc,maxgproc,"comm:gproc");
352     memory->create(gsize,maxgproc,"comm:gsize");
353   }
354 
355   // fill proclist with procs to send to
356   // compute byte count needed to pack cells
357 
358   int icell_start = 0;
359   int icell_end = grid->nlocal;
360   int not_done = 1;
361   int nglocal = grid->nlocal;
362 
363   while (not_done) {
364     Grid::ChildCell *cells = grid->cells;
365 
366     int nsend = 0;
367     bigint boffset = 0;
368     for (int icell = icell_start; icell < nglocal; icell++) {
369       icell_end = icell+1;
370       if (cells[icell].nsplit <= 0) continue;
371       if (cells[icell].proc == me) continue;
372       gproc[nsend] = cells[icell].proc;
373       n = grid->pack_one(icell,NULL,1,1,1,0);
374       if (n > 0 && boffset > 0 && boffset+n > update->global_mem_limit) {
375         icell_end -= 1;
376         break;
377       }
378       gsize[nsend++] = n;
379       boffset += n;
380     }
381 
382     if (boffset > MAXSMALLINT)
383       error->one(FLERR,"Migrate cells send buffer exceeds 2 GB");
384     int offset = boffset;
385 
386     // reallocate sbuf as needed
387 
388     if (offset > maxsendbuf) {
389       memory->destroy(sbuf);
390       maxsendbuf = offset;
391       memory->create(sbuf,maxsendbuf,"comm:sbuf");
392       memset(sbuf,0,maxsendbuf);
393     }
394 
395     // pack cell info into sbuf
396     // only called for unsplit and split cells I no longer own
397 
398     offset = 0;
399     for (int icell = icell_start; icell < icell_end; icell++) {
400       if (cells[icell].nsplit <= 0) continue;
401       if (cells[icell].proc == me) continue;
402       offset += grid->pack_one(icell,&sbuf[offset],1,1,1,1);
403     }
404 
405     // compress particle list to remove particles in migrating cells
406 
407     if (nmigrate) particle->compress_rebalance_sorted();
408 
409     // create irregular communication plan with variable size datums
410     // nrecv = # of incoming grid cells
411     // recvsize = total byte size of incoming grid + particle info
412     // DEBUG: append a sort=1 arg so that messages from other procs
413     //        are received in repeatable order, thus grid cells stay in order
414 
415     if (!igrid) igrid = new Irregular(sparta);
416     int recvsize;
417     int nrecv = igrid->create_data_variable(nsend,gproc,gsize,
418                                             recvsize,commsortflag);
419 
420     // reallocate rbuf as needed
421 
422     if (recvsize > maxrecvbuf) {
423       memory->destroy(rbuf);
424       maxrecvbuf = recvsize;
425       memory->create(rbuf,maxrecvbuf,"comm:rbuf");
426       memset(rbuf,0,maxrecvbuf);
427     }
428 
429     // perform irregular communication
430 
431     igrid->exchange_variable(sbuf,gsize,rbuf);
432 
433     // unpack received grid cells with their particles
434     // set unpack_one() sortflag arg to keep new particles sorted
435 
436     offset = 0;
437     for (i = 0; i < nrecv; i++)
438       offset += grid->unpack_one(&rbuf[offset],1,1,1,1);
439 
440     // deallocate large buffers to reduce memory footprint
441     // also deallocate igrid for same reason
442 
443     if (sbuf) memory->destroy(sbuf);
444     sbuf = NULL;
445     maxsendbuf = 0;
446 
447     if (rbuf) memory->destroy(rbuf);
448     rbuf = NULL;
449     maxrecvbuf = 0;
450 
451     delete igrid;
452     igrid = NULL;
453 
454     icell_start = icell_end;
455     int not_done_local = icell_start < nglocal;
456     MPI_Allreduce(&not_done_local,&not_done,1,MPI_INT,MPI_SUM,world);
457   }
458 
459   // compress my list of owned implicit surfs, resets csurfs in kept cells
460   // compress my list of owned grid cells to remove migrated cells
461 
462   if (nmigrate) {
463     if (surf->implicit) surf->compress_implicit();
464     grid->compress();
465     if (surf->distributed && !surf->implicit) surf->compress_explicit();
466   }
467 }
468 
469 /* ----------------------------------------------------------------------
470    send grid cell info with their surfs/particles needed for grid adaptation
471    return # of received cells and buf = ptr to received cell info
472    called from AdaptGrid
473 ------------------------------------------------------------------------- */
474 
send_cells_adapt(int nsend,int * procsend,char * inbuf,char ** outbuf)475 int Comm::send_cells_adapt(int nsend, int *procsend, char *inbuf, char **outbuf)
476 {
477   int i,n;
478 
479   AdaptGrid::SendAdapt *sadapt = (AdaptGrid::SendAdapt *) inbuf;
480 
481   // grow size list if needed
482   // don't use gproc, but needs to stay same size as gsize
483 
484   if (nsend > maxgproc) {
485     maxgproc = nsend;
486     memory->destroy(gproc);
487     memory->destroy(gsize);
488     memory->create(gproc,maxgproc,"comm:gproc");
489     memory->create(gsize,maxgproc,"comm:gsize");
490   }
491 
492   // compute byte count needed to pack cells
493 
494   bigint boffset = 0;
495   for (i = 0; i < nsend; i++) {
496     n = grid->pack_one_adapt((char *) &sadapt[i],NULL,0);
497     gsize[i] = n;
498     boffset += n;
499   }
500 
501   if (boffset > MAXSMALLINT)
502     error->one(FLERR,"Adapt grid send buffer exceeds 2 GB");
503   int offset = boffset;
504 
505   // reallocate sbuf as needed
506 
507   if (offset > maxsendbuf) {
508     memory->destroy(sbuf);
509     maxsendbuf = offset;
510     memory->create(sbuf,maxsendbuf,"comm:sbuf");
511     memset(sbuf,0,maxsendbuf);
512   }
513 
514   // pack cell info into sbuf
515   // only called for unsplit and split cells I no longer own
516 
517   offset = 0;
518   for (i = 0; i < nsend; i++)
519     offset += grid->pack_one_adapt((char *) &sadapt[i],&sbuf[offset],1);
520 
521   // create irregular communication plan with variable size datums
522   // nrecv = # of incoming grid cells
523   // recvsize = total byte size of incoming grid + particle info
524   // DEBUG: append a sort=1 arg so that messages from other procs
525   //        are received in repeatable order, thus grid cells stay in order
526 
527   if (!igrid) igrid = new Irregular(sparta);
528   int recvsize;
529   int nrecv =
530     igrid->create_data_variable(nsend,procsend,gsize,recvsize,commsortflag);
531 
532   // reallocate rbuf as needed
533 
534   if (recvsize > maxrecvbuf) {
535     memory->destroy(rbuf);
536     maxrecvbuf = recvsize;
537     memory->create(rbuf,maxrecvbuf,"comm:rbuf");
538     memset(rbuf,0,maxrecvbuf);
539   }
540 
541   // perform irregular communication
542 
543   igrid->exchange_variable(sbuf,gsize,rbuf);
544 
545   // return rbuf and grid cell count
546 
547   *outbuf = rbuf;
548   return nrecv;
549 }
550 
551 /* ----------------------------------------------------------------------
552    wrapper on irregular comm of datums of uniform size
553    receiving procs are neighbor procs of my owned grid cells, not including self
554    so can use same comm calls as migrate_particles (if neighflag is set)
555      via iparticle->augment_data_uniform() and exchange_uniform()
556      otherwise same logic as irregular_uniform()
557    called from FixAblate
558 ------------------------------------------------------------------------- */
559 
irregular_uniform_neighs(int nsend,int * procsend,char * inbuf,int nsize,char ** outbuf)560 int Comm::irregular_uniform_neighs(int nsend, int *procsend,
561                                    char *inbuf, int nsize, char **outbuf)
562 {
563   // if neighflag, use iparticle
564   // else one-time create of irregular comm plan with constant size datums
565   // nrecv = # of incoming grid cells
566 
567   if (!neighflag && !iuniform) iuniform = new Irregular(sparta);
568 
569   int nrecv;
570   if (neighflag)
571     nrecv = iparticle->augment_data_uniform(nsend,procsend);
572   else
573     nrecv = iuniform->create_data_uniform(nsend,procsend,commsortflag);
574 
575   // reallocate rbuf as needed
576 
577   if (nrecv*nsize > maxrecvbuf) {
578     memory->destroy(rbuf);
579     maxrecvbuf = nrecv*nsize;
580     memory->create(rbuf,maxrecvbuf,"comm:rbuf");
581     memset(rbuf,0,maxrecvbuf);
582   }
583 
584   // perform irregular communication
585 
586   if (neighflag)
587     iparticle->exchange_uniform(inbuf,nsize,rbuf);
588   else
589     iuniform->exchange_uniform(inbuf,nsize,rbuf);
590 
591   // return rbuf and grid cell count
592 
593   *outbuf = rbuf;
594   return nrecv;
595 }
596 
597 /* ----------------------------------------------------------------------
598    wrapper on irregular comm of datums of uniform size
599    receiving procs can be anyone, including self
600    use create_data_uniform() and exchange_uniform()
601    called from AdaptGrid
602 ------------------------------------------------------------------------- */
603 
irregular_uniform(int nsend,int * procsend,char * inbuf,int nsize,char ** outbuf)604 int Comm::irregular_uniform(int nsend, int *procsend,
605                             char *inbuf, int nsize, char **outbuf)
606 {
607   // one-time create of irregular comm plan with constant size datums
608   // nrecv = # of incoming grid cells
609 
610   if (!iuniform) iuniform = new Irregular(sparta);
611   int nrecv = iuniform->create_data_uniform(nsend,procsend,commsortflag);
612 
613   // reallocate rbuf as needed
614 
615   if (nrecv*nsize > maxrecvbuf) {
616     memory->destroy(rbuf);
617     maxrecvbuf = nrecv*nsize;
618     memory->create(rbuf,maxrecvbuf,"comm:rbuf");
619     memset(rbuf,0,maxrecvbuf);
620   }
621 
622   // perform irregular communication
623 
624   iuniform->exchange_uniform(inbuf,nsize,rbuf);
625 
626   // return rbuf and grid cell count
627 
628   *outbuf = rbuf;
629   return nrecv;
630 }
631 
632 /* ----------------------------------------------------------------------
633    communicate inbuf around full ring of processors with messtag
634    nbytes = size of inbuf = n datums * nper bytes
635    callback() is invoked to allow caller to process/update each proc's inbuf
636    note that callback() is invoked on final iteration for original inbuf
637    for non-NULL outbuf, final updated inbuf is copied to it
638    outbuf = inbuf is OK
639 ------------------------------------------------------------------------- */
640 
ring(int n,int nper,void * inbuf,int messtag,void (* callback)(int,char *,void *),void * outbuf,int self,void * ptr)641 void Comm::ring(int n, int nper, void *inbuf, int messtag,
642                 void (*callback)(int, char *, void *), void *outbuf, int self,
643                 void *ptr)
644 {
645   MPI_Request request;
646   MPI_Status status;
647 
648   int nbytes = n*nper;
649   int maxbytes;
650   MPI_Allreduce(&nbytes,&maxbytes,1,MPI_INT,MPI_MAX,world);
651 
652   char *buf,*bufcopy;
653   memory->create(buf,maxbytes,"comm:buf");
654   memory->create(bufcopy,maxbytes,"comm:bufcopy");
655   memcpy(buf,inbuf,nbytes);
656 
657   int next = me + 1;
658   int prev = me - 1;
659   if (next == nprocs) next = 0;
660   if (prev < 0) prev = nprocs - 1;
661 
662   for (int loop = 0; loop < nprocs; loop++) {
663     if (me != next) {
664       MPI_Irecv(bufcopy,maxbytes,MPI_CHAR,prev,messtag,world,&request);
665       MPI_Send(buf,nbytes,MPI_CHAR,next,messtag,world);
666       MPI_Wait(&request,&status);
667       MPI_Get_count(&status,MPI_CHAR,&nbytes);
668       memcpy(buf,bufcopy,nbytes);
669     }
670     if (self || loop != nprocs-1) callback(nbytes/nper,buf,ptr);
671   }
672 
673   if (outbuf) memcpy(outbuf,buf,nbytes);
674 
675   memory->destroy(buf);
676   memory->destroy(bufcopy);
677 }
678 
679 /* ----------------------------------------------------------------------
680    rendezvous communication operation
681    three stages:
682      first comm sends inbuf from caller decomp to rvous decomp
683      callback operates on data in rendevous decomp
684      second comm sends outbuf from rvous decomp back to caller decomp
685    inputs:
686      which = perform (0) irregular or (1) MPI_All2allv communication
687      n = # of datums in inbuf
688      inbuf = vector of input datums
689      insize = byte size of each input datum
690      inorder = 0 for inbuf in random proc order, 1 for datums ordered by proc
691      procs: inorder 0 = proc to send each datum to, 1 = # of datums/proc,
692      callback = caller function to invoke in rendezvous decomposition
693                 takes input datums, returns output datums
694      outorder = same as inorder, but for datums returned by callback()
695      ptr = pointer to caller class, passed to callback()
696      statflag = 1 for stats output, else 0
697    outputs:
698      nout = # of output datums (function return)
699      outbuf = vector of output datums
700      outsize = byte size of each output datum
701    callback inputs:
702      nrvous = # of rvous decomp datums in inbuf_rvous
703      inbuf_rvous = vector of rvous decomp input datums
704      ptr = pointer to caller class
705    callback outputs:
706      nrvous_out = # of rvous decomp output datums (function return)
707      flag = 0 for no second comm, 1 for outbuf_rvous = inbuf_rvous,
708             2 for second comm with new outbuf_rvous
709      procs_rvous = outorder 0 = proc to send each datum to, 1 = # of datums/proc
710                    allocated
711      outbuf_rvous = vector of rvous decomp output datums
712    NOTE: could use MPI_INT or MPI_DOUBLE insead of MPI_CHAR
713          to avoid checked-for overflow in MPI_Alltoallv?
714 ------------------------------------------------------------------------- */
715 
716 int Comm::
rendezvous(int which,int n,char * inbuf,int insize,int inorder,int * procs,int (* callback)(int,char *,int &,int * &,char * &,void *),int outorder,char * & outbuf,int outsize,void * ptr,int statflag)717 rendezvous(int which, int n, char *inbuf, int insize,
718            int inorder, int *procs,
719            int (*callback)(int, char *, int &, int *&, char *&, void *),
720            int outorder, char *&outbuf, int outsize, void *ptr, int statflag)
721 {
722   if (which == 0)
723     return rendezvous_irregular(n,inbuf,insize,inorder,procs,callback,
724                                 outorder,outbuf,outsize,ptr,statflag);
725   else
726     return rendezvous_all2all(n,inbuf,insize,inorder,procs,callback,
727                               outorder,outbuf,outsize,ptr,statflag);
728 }
729 
730 /* ---------------------------------------------------------------------- */
731 
732 int Comm::
rendezvous_irregular(int n,char * inbuf,int insize,int inorder,int * procs,int (* callback)(int,char *,int &,int * &,char * &,void *),int outorder,char * & outbuf,int outsize,void * ptr,int statflag)733 rendezvous_irregular(int n, char *inbuf, int insize, int inorder, int *procs,
734                      int (*callback)(int, char *, int &, int *&, char *&, void *),
735                      int outorder, char *&outbuf,
736                      int outsize, void *ptr, int statflag)
737 {
738   // irregular comm of inbuf from caller decomp to rendezvous decomp
739 
740   Irregular *irregular = new Irregular(sparta);
741 
742   int nrvous;
743   if (inorder) nrvous = irregular->create_data_uniform_grouped(n,procs);
744   else nrvous = irregular->create_data_uniform(n,procs);
745 
746   char *inbuf_rvous = (char *) memory->smalloc((bigint) nrvous*insize,
747                                                "rendezvous:inbuf");
748   irregular->exchange_uniform(inbuf,insize,inbuf_rvous);
749 
750   bigint irregular1_bytes = 0;   // irregular->irregular_bytes;
751   delete irregular;
752 
753   // done if callback is NULL, return inbuf_rvous
754 
755   if (!callback) {
756     outbuf = inbuf_rvous;
757     return nrvous;
758   }
759 
760   // peform rendezvous computation via callback()
761   // callback() allocates/populates proclist_rvous and outbuf_rvous
762 
763   int flag;
764   int *procs_rvous;
765   char *outbuf_rvous;
766   int nrvous_out = callback(nrvous,inbuf_rvous,flag,
767                             procs_rvous,outbuf_rvous,ptr);
768 
769   if (flag != 1) memory->sfree(inbuf_rvous);  // outbuf_rvous = inbuf_vous
770   if (flag == 0) return 0;    // all nout_rvous are 0, no 2nd comm stage
771 
772   // irregular comm of outbuf from rendezvous decomp back to caller decomp
773   // caller will free outbuf
774 
775   irregular = new Irregular(sparta);
776 
777   int nout;
778   if (outorder)
779     nout = irregular->create_data_uniform_grouped(nrvous_out,procs_rvous);
780   else nout = irregular->create_data_uniform(nrvous_out,procs_rvous);
781 
782   outbuf = (char *) memory->smalloc((bigint) nout*outsize,
783                                     "rendezvous:outbuf");
784   irregular->exchange_uniform(outbuf_rvous,outsize,outbuf);
785 
786   bigint irregular2_bytes = 0;   // irregular->irregular_bytes;
787   delete irregular;
788 
789   memory->destroy(procs_rvous);
790   memory->sfree(outbuf_rvous);
791 
792   // return number of output datums
793 
794   if (!statflag) return nout;
795 
796   rendezvous_stats(n,insize,nout,outsize,nrvous,nrvous_out);
797 
798   /*
799   rvous_bytes = 0;
800   rvous_bytes += n*insize;                                // inbuf
801   rvous_bytes += nout*outsize;                            // outbuf
802   rvous_bytes += nrvous*insize;                           // inbuf_rvous
803   rvous_bytes += nrvous_out*outsize;                      // outbuf_rvous
804   rvous_bytes += nrvous_out*sizeof(int);                  // procs_rvous
805   rvous_bytes += MAX(irregular1_bytes,irregular2_bytes);  // max of 2 comms
806   */
807 
808   return nout;
809 }
810 
811 /* ---------------------------------------------------------------------- */
812 
813 int Comm::
rendezvous_all2all(int n,char * inbuf,int insize,int inorder,int * procs,int (* callback)(int,char *,int &,int * &,char * &,void *),int outorder,char * & outbuf,int outsize,void * ptr,int statflag)814 rendezvous_all2all(int n, char *inbuf, int insize, int inorder, int *procs,
815                    int (*callback)(int, char *, int &, int *&, char *&, void *),
816                    int outorder, char *&outbuf, int outsize, void *ptr,
817                    int statflag)
818 {
819   int iproc;
820   bigint all2all1_bytes,all2all2_bytes;
821   int *sendcount,*sdispls,*recvcount,*rdispls;
822   int *procs_a2a;
823   bigint *offsets;
824   char *inbuf_a2a,*outbuf_a2a;
825 
826   // create procs and inbuf for All2all if necesary
827 
828   if (!inorder) {
829     memory->create(procs_a2a,nprocs,"rendezvous:procs");
830     inbuf_a2a = (char *) memory->smalloc((bigint) n*insize,
831                                          "rendezvous:inbuf");
832     memory->create(offsets,nprocs,"rendezvous:offsets");
833 
834     for (int i = 0; i < nprocs; i++) procs_a2a[i] = 0;
835     for (int i = 0; i < n; i++) procs_a2a[procs[i]]++;
836 
837     offsets[0] = 0;
838     for (int i = 1; i < nprocs; i++)
839       offsets[i] = offsets[i-1] + insize*procs_a2a[i-1];
840 
841     bigint offset = 0;
842     for (int i = 0; i < n; i++) {
843       iproc = procs[i];
844       memcpy(&inbuf_a2a[offsets[iproc]],&inbuf[offset],insize);
845       offsets[iproc] += insize;
846       offset += insize;
847     }
848 
849     all2all1_bytes = nprocs*sizeof(int) + nprocs*sizeof(bigint) + n*insize;
850 
851   } else {
852     procs_a2a = procs;
853     inbuf_a2a = inbuf;
854     all2all1_bytes = 0;
855   }
856 
857   // create args for MPI_Alltoallv() on input data
858 
859   memory->create(sendcount,nprocs,"rendezvous:sendcount");
860   memcpy(sendcount,procs_a2a,nprocs*sizeof(int));
861 
862   memory->create(recvcount,nprocs,"rendezvous:recvcount");
863   MPI_Alltoall(sendcount,1,MPI_INT,recvcount,1,MPI_INT,world);
864 
865   memory->create(sdispls,nprocs,"rendezvous:sdispls");
866   memory->create(rdispls,nprocs,"rendezvous:rdispls");
867   sdispls[0] = rdispls[0] = 0;
868   for (int i = 1; i < nprocs; i++) {
869     sdispls[i] = sdispls[i-1] + sendcount[i-1];
870     rdispls[i] = rdispls[i-1] + recvcount[i-1];
871   }
872   int nrvous = rdispls[nprocs-1] + recvcount[nprocs-1];
873 
874   // test for overflow of input data due to imbalance or insize
875   // means that individual sdispls or rdispls values overflow
876 
877   int overflow = 0;
878   if ((bigint) n*insize > MAXSMALLINT) overflow = 1;
879   if ((bigint) nrvous*insize > MAXSMALLINT) overflow = 1;
880   int overflowall;
881   MPI_Allreduce(&overflow,&overflowall,1,MPI_INT,MPI_MAX,world);
882   if (overflowall) error->all(FLERR,"Overflow input size in rendezvous_a2a");
883 
884   for (int i = 0; i < nprocs; i++) {
885     sendcount[i] *= insize;
886     sdispls[i] *= insize;
887     recvcount[i] *= insize;
888     rdispls[i] *= insize;
889   }
890 
891   // all2all comm of inbuf from caller decomp to rendezvous decomp
892 
893   char *inbuf_rvous = (char *) memory->smalloc((bigint) nrvous*insize,
894                                                "rendezvous:inbuf");
895 
896   MPI_Alltoallv(inbuf_a2a,sendcount,sdispls,MPI_CHAR,
897 		inbuf_rvous,recvcount,rdispls,MPI_CHAR,world);
898 
899   if (!inorder) {
900     memory->destroy(procs_a2a);
901     memory->sfree(inbuf_a2a);
902     memory->destroy(offsets);
903   }
904 
905   // done if callback is NULL, return inbuf_rvous
906 
907   if (!callback) {
908     memory->destroy(sendcount);
909     memory->destroy(recvcount);
910     memory->destroy(sdispls);
911     memory->destroy(rdispls);
912     outbuf = inbuf_rvous;
913     return nrvous;
914   }
915 
916   // peform rendezvous computation via callback()
917   // callback() allocates/populates proclist_rvous and outbuf_rvous
918 
919   int flag;
920   int *procs_rvous;
921   char *outbuf_rvous;
922 
923   int nrvous_out = callback(nrvous,inbuf_rvous,flag,
924                             procs_rvous,outbuf_rvous,ptr);
925 
926   if (flag != 1) memory->sfree(inbuf_rvous);  // outbuf_rvous = inbuf_vous
927   if (flag == 0) {
928     memory->destroy(sendcount);
929     memory->destroy(recvcount);
930     memory->destroy(sdispls);
931     memory->destroy(rdispls);
932     return 0;    // all nout_rvous are 0, no 2nd irregular
933   }
934 
935   // create procs and outbuf for All2all if necesary
936 
937   if (!outorder) {
938     memory->create(procs_a2a,nprocs,"rendezvous_a2a:procs");
939 
940     outbuf_a2a = (char *) memory->smalloc((bigint) nrvous_out*outsize,
941                                           "rendezvous:outbuf");
942     memory->create(offsets,nprocs,"rendezvous:offsets");
943 
944     for (int i = 0; i < nprocs; i++) procs_a2a[i] = 0;
945     for (int i = 0; i < nrvous_out; i++) procs_a2a[procs_rvous[i]]++;
946 
947     offsets[0] = 0;
948     for (int i = 1; i < nprocs; i++)
949       offsets[i] = offsets[i-1] + outsize*procs_a2a[i-1];
950 
951     bigint offset = 0;
952     for (int i = 0; i < nrvous_out; i++) {
953       iproc = procs_rvous[i];
954       memcpy(&outbuf_a2a[offsets[iproc]],&outbuf_rvous[offset],outsize);
955       offsets[iproc] += outsize;
956       offset += outsize;
957     }
958 
959     all2all2_bytes = nprocs*sizeof(int) + nprocs*sizeof(bigint) +
960       nrvous_out*outsize;
961 
962   } else {
963     procs_a2a = procs_rvous;
964     outbuf_a2a = outbuf_rvous;
965     all2all2_bytes = 0;
966   }
967 
968   // comm outbuf from rendezvous decomposition back to caller
969 
970   memcpy(sendcount,procs_a2a,nprocs*sizeof(int));
971 
972   MPI_Alltoall(sendcount,1,MPI_INT,recvcount,1,MPI_INT,world);
973 
974   sdispls[0] = rdispls[0] = 0;
975   for (int i = 1; i < nprocs; i++) {
976     sdispls[i] = sdispls[i-1] + sendcount[i-1];
977     rdispls[i] = rdispls[i-1] + recvcount[i-1];
978   }
979   int nout = rdispls[nprocs-1] + recvcount[nprocs-1];
980 
981   // test for overflow of outbuf due to imbalance or outsize
982   // means that individual sdispls or rdispls values overflow
983 
984   overflow = 0;
985   if ((bigint) nrvous*outsize > MAXSMALLINT) overflow = 1;
986   if ((bigint) nout*outsize > MAXSMALLINT) overflow = 1;
987   MPI_Allreduce(&overflow,&overflowall,1,MPI_INT,MPI_MAX,world);
988   if (overflowall) error->all(FLERR,"Overflow output in rendezvous_a2a");
989 
990   for (int i = 0; i < nprocs; i++) {
991     sendcount[i] *= outsize;
992     sdispls[i] *= outsize;
993     recvcount[i] *= outsize;
994     rdispls[i] *= outsize;
995   }
996 
997   // all2all comm of outbuf from rendezvous decomp back to caller decomp
998   // caller will free outbuf
999 
1000   outbuf = (char *) memory->smalloc((bigint) nout*outsize,"rendezvous:outbuf");
1001 
1002   MPI_Alltoallv(outbuf_a2a,sendcount,sdispls,MPI_CHAR,
1003 		outbuf,recvcount,rdispls,MPI_CHAR,world);
1004 
1005   memory->destroy(procs_rvous);
1006   memory->sfree(outbuf_rvous);
1007 
1008   if (!outorder) {
1009     memory->destroy(procs_a2a);
1010     memory->sfree(outbuf_a2a);
1011     memory->destroy(offsets);
1012   }
1013 
1014   // clean up
1015 
1016   memory->destroy(sendcount);
1017   memory->destroy(recvcount);
1018   memory->destroy(sdispls);
1019   memory->destroy(rdispls);
1020 
1021   // return number of datums
1022 
1023   if (!statflag) return nout;
1024 
1025   rendezvous_stats(n,insize,nout,outsize,nrvous,nrvous_out);
1026 
1027   /*
1028   rvous_bytes = 0;
1029   rvous_bytes += n*insize;                                // inbuf
1030   rvous_bytes += nout*outsize;                            // outbuf
1031   rvous_bytes += nrvous*insize;                           // inbuf_rvous
1032   rvous_bytes += nrvous_out*outsize;                      // outbuf_rvous
1033   rvous_bytes += nrvous_out*sizeof(int);                  // procs_rvous
1034   rvous_bytes += 4*nprocs*sizeof(int);                    // all2all vectors
1035   rvous_bytes += MAX(all2all1_bytes,all2all2_bytes);      // reorder ops
1036   */
1037 
1038   return nout;
1039 }
1040 
1041 /* ----------------------------------------------------------------------
1042    memory info for caller and rendezvous decompositions
1043 ------------------------------------------------------------------------- */
1044 
rendezvous_stats(int n,int insize,int nout,int outsize,int nrvous,int nrvous_out)1045 void Comm::rendezvous_stats(int n, int insize, int nout, int outsize,
1046                             int nrvous, int nrvous_out)
1047 {
1048   bigint size_in_all,size_in_max,size_in_min;
1049   bigint size_out_all,size_out_max,size_out_min;
1050   bigint size_inrvous_all,size_inrvous_max,size_inrvous_min;
1051   bigint size_outrvous_all,size_outrvous_max,size_outrvous_min;
1052 
1053   bigint size = (bigint) n*insize;
1054   MPI_Allreduce(&size,&size_in_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1055   MPI_Allreduce(&size,&size_in_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1056   MPI_Allreduce(&size,&size_in_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1057 
1058   size = (bigint) nout*outsize;
1059   MPI_Allreduce(&size,&size_out_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1060   MPI_Allreduce(&size,&size_out_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1061   MPI_Allreduce(&size,&size_out_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1062 
1063   size = (bigint) nrvous*insize;
1064   MPI_Allreduce(&size,&size_inrvous_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1065   MPI_Allreduce(&size,&size_inrvous_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1066   MPI_Allreduce(&size,&size_inrvous_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1067 
1068   size = (bigint) nrvous_out*insize;
1069   MPI_Allreduce(&size,&size_outrvous_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1070   MPI_Allreduce(&size,&size_outrvous_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1071   MPI_Allreduce(&size,&size_outrvous_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1072 
1073   int mbytes = 1024*1024;
1074 
1075   if (me == 0) {
1076     if (screen) {
1077       fprintf(screen,"Rendezvous balance and memory info:\n");
1078       fprintf(screen,"  input datum count "
1079               "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1080               BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1081               size_in_all/insize,1.0*size_in_all/nprocs/insize,
1082               size_in_max/insize,size_in_min/insize);
1083       fprintf(screen,"  input data (MB) "
1084               "(tot,ave,max,min): %g %g %g %g\n",
1085               1.0*size_in_all/mbytes,1.0*size_in_all/nprocs/mbytes,
1086               1.0*size_in_max/mbytes,1.0*size_in_min/mbytes);
1087       fprintf(screen,"  output datum count "
1088               "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1089               BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1090               size_out_all/outsize,1.0*size_out_all/nprocs/outsize,
1091               size_out_max/outsize,size_out_min/outsize);
1092       fprintf(screen,"  output data (MB) "
1093               "(tot,ave,max,min): %g %g %g %g\n",
1094               1.0*size_out_all/mbytes,1.0*size_out_all/nprocs/mbytes,
1095               1.0*size_out_max/mbytes,1.0*size_out_min/mbytes);
1096       fprintf(screen,"  input rvous datum count "
1097               "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1098               BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1099               size_inrvous_all/insize,1.0*size_inrvous_all/nprocs/insize,
1100               size_inrvous_max/insize,size_inrvous_min/insize);
1101       fprintf(screen,"  input rvous data (MB) "
1102               "(tot,ave,max,min): %g %g %g %g\n",
1103               1.0*size_inrvous_all/mbytes,1.0*size_inrvous_all/nprocs/mbytes,
1104               1.0*size_inrvous_max/mbytes,1.0*size_inrvous_min/mbytes);
1105       fprintf(screen,"  output rvous datum count "
1106               "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1107               BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1108               size_outrvous_all/outsize,1.0*size_outrvous_all/nprocs/outsize,
1109               size_outrvous_max/outsize,size_outrvous_min/outsize);
1110       fprintf(screen,"  output rvous data (MB) "
1111               "(tot,ave,max,min): %g %g %g %g\n",
1112               1.0*size_outrvous_all/mbytes,1.0*size_outrvous_all/nprocs/mbytes,
1113               1.0*size_outrvous_max/mbytes,1.0*size_outrvous_min/mbytes);
1114     }
1115   }
1116 }
1117