1 /* ----------------------------------------------------------------------
2 SPARTA - Stochastic PArallel Rarefied-gas Time-accurate Analyzer
3 http://sparta.sandia.gov
4 Steve Plimpton, sjplimp@sandia.gov, Michael Gallis, magalli@sandia.gov
5 Sandia National Laboratories
6
7 Copyright (2014) Sandia Corporation. Under the terms of Contract
8 DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
9 certain rights in this software. This software is distributed under
10 the GNU General Public License.
11
12 See the README file in the top-level SPARTA directory.
13 ------------------------------------------------------------------------- */
14
15 #include "mpi.h"
16 #include "stdlib.h"
17 #include "string.h"
18 #include "comm.h"
19 #include "irregular.h"
20 #include "particle.h"
21 #include "grid.h"
22 #include "surf.h"
23 #include "update.h"
24 #include "modify.h"
25 #include "adapt_grid.h"
26 #include "memory.h"
27 #include "error.h"
28
29 using namespace SPARTA_NS;
30
31 enum{PKEEP,PINSERT,PDONE,PDISCARD,PENTRY,PEXIT,PSURF}; // several files
32
33 /* ---------------------------------------------------------------------- */
34
Comm(SPARTA * sparta)35 Comm::Comm(SPARTA *sparta) : Pointers(sparta)
36 {
37 MPI_Comm_rank(world,&me);
38 MPI_Comm_size(world,&nprocs);
39
40 ncomm = 0;
41 commsortflag = 0;
42 commpartstyle = 1;
43
44 neighflag = 0;
45 neighlist = NULL;
46
47 iparticle = new Irregular(sparta);
48 igrid = NULL;
49 iuniform = NULL;
50
51 pproc = NULL;
52 maxpproc = 0;
53 gproc = gsize = NULL;
54 maxgproc = 0;
55 sbuf = rbuf = NULL;
56 maxsendbuf = maxrecvbuf = 0;
57
58 copymode = 0;
59 }
60
61 /* ---------------------------------------------------------------------- */
62
~Comm()63 Comm::~Comm()
64 {
65 if (copymode) return;
66
67 delete iparticle;
68 delete igrid;
69 delete iuniform;
70
71 memory->destroy(pproc);
72 memory->destroy(gproc);
73 memory->destroy(gsize);
74 memory->destroy(sbuf);
75 memory->destroy(rbuf);
76
77 memory->destroy(neighlist);
78 }
79
80 /* ----------------------------------------------------------------------
81 reset neighbor list used in particle comm and setup irregular for them
82 invoked after grid decomposition changes
83 no-op if commpartstyle not set or grid decomposition not clumped
84 since different mode of irregular comm will be done
85 ------------------------------------------------------------------------- */
86
reset_neighbors()87 void Comm::reset_neighbors()
88 {
89 neighflag = 0;
90 if (!commpartstyle || !grid->clumped) return;
91 neighflag = 1;
92
93 if (neighlist == NULL)
94 memory->create(neighlist,nprocs,"comm:neighlist");
95 for (int i = 0; i < nprocs; i++) neighlist[i] = 0;
96
97 Grid::ChildCell *cells = grid->cells;
98 int nglocal = grid->nlocal;
99 int ntotal = nglocal + grid->nghost;
100
101 for (int icell = nglocal; icell < ntotal; icell++)
102 neighlist[cells[icell].proc] = 1;
103 neighlist[me] = 0;
104
105 nneigh = 0;
106 for (int i = 0; i < nprocs; i++)
107 if (neighlist[i]) neighlist[nneigh++] = i;
108
109 iparticle->create_procs(nneigh,neighlist,commsortflag);
110 }
111
112 /* ----------------------------------------------------------------------
113 migrate particles to new procs after particle move
114 return particle nlocal after compression,
115 so Update can iterate on particle move
116 ------------------------------------------------------------------------- */
117
migrate_particles(int nmigrate,int * plist)118 int Comm::migrate_particles(int nmigrate, int *plist)
119 {
120 int i,j;
121
122 Grid::ChildCell *cells = grid->cells;
123 Particle::OnePart *particles = particle->particles;
124
125 int ncustom = particle->ncustom;
126 int nbytes_particle = sizeof(Particle::OnePart);
127 int nbytes_custom = particle->sizeof_custom();
128 int nbytes = nbytes_particle + nbytes_custom;
129
130 // grow pproc and sbuf if necessary
131
132 if (nmigrate > maxpproc) {
133 maxpproc = nmigrate;
134 memory->destroy(pproc);
135 memory->create(pproc,maxpproc,"comm:pproc");
136 }
137 if (nmigrate*nbytes > maxsendbuf) {
138 maxsendbuf = nmigrate*nbytes;
139 memory->destroy(sbuf);
140 memory->create(sbuf,maxsendbuf,"comm:sbuf");
141 }
142
143 // fill proclist with procs to send to
144 // pack sbuf with particles to migrate
145 // if flag == PDISCARD, particle is deleted but not sent
146 // change icell of migrated particle to owning cell on receiving proc
147 // nsend = particles that actually migrate
148 // if no custom attributes, pack particles directly via memcpy()
149 // else pack_custom() performs packing into sbuf
150
151 int nsend = 0;
152 int offset = 0;
153
154 if (!ncustom) {
155 for (i = 0; i < nmigrate; i++) {
156 j = plist[i];
157 if (particles[j].flag == PDISCARD) continue;
158 pproc[nsend++] = cells[particles[j].icell].proc;
159 particles[j].icell = cells[particles[j].icell].ilocal;
160 memcpy(&sbuf[offset],&particles[j],nbytes_particle);
161 offset += nbytes_particle;
162 }
163 } else {
164 for (i = 0; i < nmigrate; i++) {
165 j = plist[i];
166 if (particles[j].flag == PDISCARD) continue;
167 pproc[nsend++] = cells[particles[j].icell].proc;
168 particles[j].icell = cells[particles[j].icell].ilocal;
169 memcpy(&sbuf[offset],&particles[j],nbytes_particle);
170 offset += nbytes_particle;
171 particle->pack_custom(j,&sbuf[offset]);
172 offset += nbytes_custom;
173 }
174 }
175
176 // compress my list of particles
177
178 particle->compress_migrate(nmigrate,plist);
179 int ncompress = particle->nlocal;
180
181 // create or augment irregular communication plan
182 // nrecv = # of incoming particles
183
184 int nrecv;
185 if (neighflag)
186 nrecv = iparticle->augment_data_uniform(nsend,pproc);
187 else
188 nrecv = iparticle->create_data_uniform(nsend,pproc,commsortflag);
189
190 // extend particle list if necessary
191
192 particle->grow(nrecv);
193
194 // perform irregular communication
195 // if no custom attributes, append recv particles directly to particle list
196 // else receive into rbuf, unpack particles one by one via unpack_custom()
197
198 if (!ncustom)
199 iparticle->
200 exchange_uniform(sbuf,nbytes,
201 (char *) &particle->particles[particle->nlocal]);
202
203 else {
204 if (nrecv*nbytes > maxrecvbuf) {
205 maxrecvbuf = nrecv*nbytes;
206 memory->destroy(rbuf);
207 memory->create(rbuf,maxrecvbuf,"comm:rbuf");
208 }
209
210 iparticle->exchange_uniform(sbuf,nbytes,rbuf);
211
212 offset = 0;
213 int nlocal = particle->nlocal;
214 for (i = 0; i < nrecv; i++) {
215 memcpy(&particle->particles[nlocal],&rbuf[offset],nbytes_particle);
216 offset += nbytes_particle;
217 particle->unpack_custom(&rbuf[offset],nlocal);
218 offset += nbytes_custom;
219 nlocal++;
220 }
221 }
222
223 particle->nlocal += nrecv;
224 ncomm += nsend;
225 return ncompress;
226 }
227
228 /* ----------------------------------------------------------------------
229 migrate grid cells with their particles to new procs
230 called from BalanceGrid and FixBalance
231 ------------------------------------------------------------------------- */
232
migrate_cells(int nmigrate)233 void Comm::migrate_cells(int nmigrate)
234 {
235 if (update->have_mem_limit())
236 return migrate_cells_less_memory(nmigrate);
237
238 int i,n;
239
240 Grid::ChildCell *cells = grid->cells;
241 int nglocal = grid->nlocal;
242
243 // grow proc and size lists if needed
244
245 if (nmigrate > maxgproc) {
246 maxgproc = nmigrate;
247 memory->destroy(gproc);
248 memory->destroy(gsize);
249 memory->create(gproc,maxgproc,"comm:gproc");
250 memory->create(gsize,maxgproc,"comm:gsize");
251 }
252
253 // fill proclist with procs to send to
254 // compute byte count needed to pack cells
255
256 int nsend = 0;
257 bigint boffset = 0;
258 for (int icell = 0; icell < nglocal; icell++) {
259 if (cells[icell].nsplit <= 0) continue;
260 if (cells[icell].proc == me) continue;
261 gproc[nsend] = cells[icell].proc;
262 n = grid->pack_one(icell,NULL,1,1,1,0);
263 gsize[nsend++] = n;
264 boffset += n;
265 }
266
267 if (boffset > MAXSMALLINT)
268 error->one(FLERR,"Migrate cells send buffer exceeds 2 GB");
269 int offset = boffset;
270
271 // reallocate sbuf as needed
272
273 if (offset > maxsendbuf) {
274 memory->destroy(sbuf);
275 maxsendbuf = offset;
276 memory->create(sbuf,maxsendbuf,"comm:sbuf");
277 memset(sbuf,0,maxsendbuf);
278 }
279
280 // pack cell info into sbuf
281 // only called for unsplit and split cells I no longer own
282
283 offset = 0;
284 for (int icell = 0; icell < nglocal; icell++) {
285 if (cells[icell].nsplit <= 0) continue;
286 if (cells[icell].proc == me) continue;
287 offset += grid->pack_one(icell,&sbuf[offset],1,1,1,1);
288 }
289
290 // compress my list of owned implicit surfs, resets csurfs in kept cells
291 // compress my list of owned grid cells to remove migrating cells
292 // compress my list of owned distributed/explicit surfs
293 // compress particle list to remove particles in migrating cells
294 // procs with no migrating cells must also unset particle sorted
295 // since compress_rebalance() unsets it
296
297 if (nmigrate) {
298 if (surf->implicit) surf->compress_implicit();
299 grid->compress();
300 if (surf->distributed && !surf->implicit) surf->compress_explicit();
301 particle->compress_rebalance();
302 } else particle->sorted = 0;
303
304 // create irregular communication plan with variable size datums
305 // nrecv = # of incoming grid cells
306 // recvsize = total byte size of incoming grid + particle info
307 // DEBUG: append a sort=1 arg so that messages from other procs
308 // are received in repeatable order, thus grid cells stay in order
309
310 if (!igrid) igrid = new Irregular(sparta);
311 int recvsize;
312 int nrecv = igrid->create_data_variable(nmigrate,gproc,gsize,
313 recvsize,commsortflag);
314
315 // reallocate rbuf as needed
316
317 if (recvsize > maxrecvbuf) {
318 memory->destroy(rbuf);
319 maxrecvbuf = recvsize;
320 memory->create(rbuf,maxrecvbuf,"comm:rbuf");
321 memset(rbuf,0,maxrecvbuf);
322 }
323
324 // perform irregular communication
325
326 igrid->exchange_variable(sbuf,gsize,rbuf);
327
328 // unpack received grid cells with their particles
329
330 offset = 0;
331 for (i = 0; i < nrecv; i++)
332 offset += grid->unpack_one(&rbuf[offset],1,1,1);
333 }
334
335 /* ----------------------------------------------------------------------
336 migrate grid cells with their particles to new procs
337 called from BalanceGrid and FixBalance
338 uses multiple comm passes to reduce buffer size
339 ------------------------------------------------------------------------- */
340
migrate_cells_less_memory(int nmigrate)341 void Comm::migrate_cells_less_memory(int nmigrate)
342 {
343 int i,n;
344
345 // grow proc and size lists if needed
346
347 if (nmigrate > maxgproc) {
348 maxgproc = nmigrate;
349 memory->destroy(gproc);
350 memory->destroy(gsize);
351 memory->create(gproc,maxgproc,"comm:gproc");
352 memory->create(gsize,maxgproc,"comm:gsize");
353 }
354
355 // fill proclist with procs to send to
356 // compute byte count needed to pack cells
357
358 int icell_start = 0;
359 int icell_end = grid->nlocal;
360 int not_done = 1;
361 int nglocal = grid->nlocal;
362
363 while (not_done) {
364 Grid::ChildCell *cells = grid->cells;
365
366 int nsend = 0;
367 bigint boffset = 0;
368 for (int icell = icell_start; icell < nglocal; icell++) {
369 icell_end = icell+1;
370 if (cells[icell].nsplit <= 0) continue;
371 if (cells[icell].proc == me) continue;
372 gproc[nsend] = cells[icell].proc;
373 n = grid->pack_one(icell,NULL,1,1,1,0);
374 if (n > 0 && boffset > 0 && boffset+n > update->global_mem_limit) {
375 icell_end -= 1;
376 break;
377 }
378 gsize[nsend++] = n;
379 boffset += n;
380 }
381
382 if (boffset > MAXSMALLINT)
383 error->one(FLERR,"Migrate cells send buffer exceeds 2 GB");
384 int offset = boffset;
385
386 // reallocate sbuf as needed
387
388 if (offset > maxsendbuf) {
389 memory->destroy(sbuf);
390 maxsendbuf = offset;
391 memory->create(sbuf,maxsendbuf,"comm:sbuf");
392 memset(sbuf,0,maxsendbuf);
393 }
394
395 // pack cell info into sbuf
396 // only called for unsplit and split cells I no longer own
397
398 offset = 0;
399 for (int icell = icell_start; icell < icell_end; icell++) {
400 if (cells[icell].nsplit <= 0) continue;
401 if (cells[icell].proc == me) continue;
402 offset += grid->pack_one(icell,&sbuf[offset],1,1,1,1);
403 }
404
405 // compress particle list to remove particles in migrating cells
406
407 if (nmigrate) particle->compress_rebalance_sorted();
408
409 // create irregular communication plan with variable size datums
410 // nrecv = # of incoming grid cells
411 // recvsize = total byte size of incoming grid + particle info
412 // DEBUG: append a sort=1 arg so that messages from other procs
413 // are received in repeatable order, thus grid cells stay in order
414
415 if (!igrid) igrid = new Irregular(sparta);
416 int recvsize;
417 int nrecv = igrid->create_data_variable(nsend,gproc,gsize,
418 recvsize,commsortflag);
419
420 // reallocate rbuf as needed
421
422 if (recvsize > maxrecvbuf) {
423 memory->destroy(rbuf);
424 maxrecvbuf = recvsize;
425 memory->create(rbuf,maxrecvbuf,"comm:rbuf");
426 memset(rbuf,0,maxrecvbuf);
427 }
428
429 // perform irregular communication
430
431 igrid->exchange_variable(sbuf,gsize,rbuf);
432
433 // unpack received grid cells with their particles
434 // set unpack_one() sortflag arg to keep new particles sorted
435
436 offset = 0;
437 for (i = 0; i < nrecv; i++)
438 offset += grid->unpack_one(&rbuf[offset],1,1,1,1);
439
440 // deallocate large buffers to reduce memory footprint
441 // also deallocate igrid for same reason
442
443 if (sbuf) memory->destroy(sbuf);
444 sbuf = NULL;
445 maxsendbuf = 0;
446
447 if (rbuf) memory->destroy(rbuf);
448 rbuf = NULL;
449 maxrecvbuf = 0;
450
451 delete igrid;
452 igrid = NULL;
453
454 icell_start = icell_end;
455 int not_done_local = icell_start < nglocal;
456 MPI_Allreduce(¬_done_local,¬_done,1,MPI_INT,MPI_SUM,world);
457 }
458
459 // compress my list of owned implicit surfs, resets csurfs in kept cells
460 // compress my list of owned grid cells to remove migrated cells
461
462 if (nmigrate) {
463 if (surf->implicit) surf->compress_implicit();
464 grid->compress();
465 if (surf->distributed && !surf->implicit) surf->compress_explicit();
466 }
467 }
468
469 /* ----------------------------------------------------------------------
470 send grid cell info with their surfs/particles needed for grid adaptation
471 return # of received cells and buf = ptr to received cell info
472 called from AdaptGrid
473 ------------------------------------------------------------------------- */
474
send_cells_adapt(int nsend,int * procsend,char * inbuf,char ** outbuf)475 int Comm::send_cells_adapt(int nsend, int *procsend, char *inbuf, char **outbuf)
476 {
477 int i,n;
478
479 AdaptGrid::SendAdapt *sadapt = (AdaptGrid::SendAdapt *) inbuf;
480
481 // grow size list if needed
482 // don't use gproc, but needs to stay same size as gsize
483
484 if (nsend > maxgproc) {
485 maxgproc = nsend;
486 memory->destroy(gproc);
487 memory->destroy(gsize);
488 memory->create(gproc,maxgproc,"comm:gproc");
489 memory->create(gsize,maxgproc,"comm:gsize");
490 }
491
492 // compute byte count needed to pack cells
493
494 bigint boffset = 0;
495 for (i = 0; i < nsend; i++) {
496 n = grid->pack_one_adapt((char *) &sadapt[i],NULL,0);
497 gsize[i] = n;
498 boffset += n;
499 }
500
501 if (boffset > MAXSMALLINT)
502 error->one(FLERR,"Adapt grid send buffer exceeds 2 GB");
503 int offset = boffset;
504
505 // reallocate sbuf as needed
506
507 if (offset > maxsendbuf) {
508 memory->destroy(sbuf);
509 maxsendbuf = offset;
510 memory->create(sbuf,maxsendbuf,"comm:sbuf");
511 memset(sbuf,0,maxsendbuf);
512 }
513
514 // pack cell info into sbuf
515 // only called for unsplit and split cells I no longer own
516
517 offset = 0;
518 for (i = 0; i < nsend; i++)
519 offset += grid->pack_one_adapt((char *) &sadapt[i],&sbuf[offset],1);
520
521 // create irregular communication plan with variable size datums
522 // nrecv = # of incoming grid cells
523 // recvsize = total byte size of incoming grid + particle info
524 // DEBUG: append a sort=1 arg so that messages from other procs
525 // are received in repeatable order, thus grid cells stay in order
526
527 if (!igrid) igrid = new Irregular(sparta);
528 int recvsize;
529 int nrecv =
530 igrid->create_data_variable(nsend,procsend,gsize,recvsize,commsortflag);
531
532 // reallocate rbuf as needed
533
534 if (recvsize > maxrecvbuf) {
535 memory->destroy(rbuf);
536 maxrecvbuf = recvsize;
537 memory->create(rbuf,maxrecvbuf,"comm:rbuf");
538 memset(rbuf,0,maxrecvbuf);
539 }
540
541 // perform irregular communication
542
543 igrid->exchange_variable(sbuf,gsize,rbuf);
544
545 // return rbuf and grid cell count
546
547 *outbuf = rbuf;
548 return nrecv;
549 }
550
551 /* ----------------------------------------------------------------------
552 wrapper on irregular comm of datums of uniform size
553 receiving procs are neighbor procs of my owned grid cells, not including self
554 so can use same comm calls as migrate_particles (if neighflag is set)
555 via iparticle->augment_data_uniform() and exchange_uniform()
556 otherwise same logic as irregular_uniform()
557 called from FixAblate
558 ------------------------------------------------------------------------- */
559
irregular_uniform_neighs(int nsend,int * procsend,char * inbuf,int nsize,char ** outbuf)560 int Comm::irregular_uniform_neighs(int nsend, int *procsend,
561 char *inbuf, int nsize, char **outbuf)
562 {
563 // if neighflag, use iparticle
564 // else one-time create of irregular comm plan with constant size datums
565 // nrecv = # of incoming grid cells
566
567 if (!neighflag && !iuniform) iuniform = new Irregular(sparta);
568
569 int nrecv;
570 if (neighflag)
571 nrecv = iparticle->augment_data_uniform(nsend,procsend);
572 else
573 nrecv = iuniform->create_data_uniform(nsend,procsend,commsortflag);
574
575 // reallocate rbuf as needed
576
577 if (nrecv*nsize > maxrecvbuf) {
578 memory->destroy(rbuf);
579 maxrecvbuf = nrecv*nsize;
580 memory->create(rbuf,maxrecvbuf,"comm:rbuf");
581 memset(rbuf,0,maxrecvbuf);
582 }
583
584 // perform irregular communication
585
586 if (neighflag)
587 iparticle->exchange_uniform(inbuf,nsize,rbuf);
588 else
589 iuniform->exchange_uniform(inbuf,nsize,rbuf);
590
591 // return rbuf and grid cell count
592
593 *outbuf = rbuf;
594 return nrecv;
595 }
596
597 /* ----------------------------------------------------------------------
598 wrapper on irregular comm of datums of uniform size
599 receiving procs can be anyone, including self
600 use create_data_uniform() and exchange_uniform()
601 called from AdaptGrid
602 ------------------------------------------------------------------------- */
603
irregular_uniform(int nsend,int * procsend,char * inbuf,int nsize,char ** outbuf)604 int Comm::irregular_uniform(int nsend, int *procsend,
605 char *inbuf, int nsize, char **outbuf)
606 {
607 // one-time create of irregular comm plan with constant size datums
608 // nrecv = # of incoming grid cells
609
610 if (!iuniform) iuniform = new Irregular(sparta);
611 int nrecv = iuniform->create_data_uniform(nsend,procsend,commsortflag);
612
613 // reallocate rbuf as needed
614
615 if (nrecv*nsize > maxrecvbuf) {
616 memory->destroy(rbuf);
617 maxrecvbuf = nrecv*nsize;
618 memory->create(rbuf,maxrecvbuf,"comm:rbuf");
619 memset(rbuf,0,maxrecvbuf);
620 }
621
622 // perform irregular communication
623
624 iuniform->exchange_uniform(inbuf,nsize,rbuf);
625
626 // return rbuf and grid cell count
627
628 *outbuf = rbuf;
629 return nrecv;
630 }
631
632 /* ----------------------------------------------------------------------
633 communicate inbuf around full ring of processors with messtag
634 nbytes = size of inbuf = n datums * nper bytes
635 callback() is invoked to allow caller to process/update each proc's inbuf
636 note that callback() is invoked on final iteration for original inbuf
637 for non-NULL outbuf, final updated inbuf is copied to it
638 outbuf = inbuf is OK
639 ------------------------------------------------------------------------- */
640
ring(int n,int nper,void * inbuf,int messtag,void (* callback)(int,char *,void *),void * outbuf,int self,void * ptr)641 void Comm::ring(int n, int nper, void *inbuf, int messtag,
642 void (*callback)(int, char *, void *), void *outbuf, int self,
643 void *ptr)
644 {
645 MPI_Request request;
646 MPI_Status status;
647
648 int nbytes = n*nper;
649 int maxbytes;
650 MPI_Allreduce(&nbytes,&maxbytes,1,MPI_INT,MPI_MAX,world);
651
652 char *buf,*bufcopy;
653 memory->create(buf,maxbytes,"comm:buf");
654 memory->create(bufcopy,maxbytes,"comm:bufcopy");
655 memcpy(buf,inbuf,nbytes);
656
657 int next = me + 1;
658 int prev = me - 1;
659 if (next == nprocs) next = 0;
660 if (prev < 0) prev = nprocs - 1;
661
662 for (int loop = 0; loop < nprocs; loop++) {
663 if (me != next) {
664 MPI_Irecv(bufcopy,maxbytes,MPI_CHAR,prev,messtag,world,&request);
665 MPI_Send(buf,nbytes,MPI_CHAR,next,messtag,world);
666 MPI_Wait(&request,&status);
667 MPI_Get_count(&status,MPI_CHAR,&nbytes);
668 memcpy(buf,bufcopy,nbytes);
669 }
670 if (self || loop != nprocs-1) callback(nbytes/nper,buf,ptr);
671 }
672
673 if (outbuf) memcpy(outbuf,buf,nbytes);
674
675 memory->destroy(buf);
676 memory->destroy(bufcopy);
677 }
678
679 /* ----------------------------------------------------------------------
680 rendezvous communication operation
681 three stages:
682 first comm sends inbuf from caller decomp to rvous decomp
683 callback operates on data in rendevous decomp
684 second comm sends outbuf from rvous decomp back to caller decomp
685 inputs:
686 which = perform (0) irregular or (1) MPI_All2allv communication
687 n = # of datums in inbuf
688 inbuf = vector of input datums
689 insize = byte size of each input datum
690 inorder = 0 for inbuf in random proc order, 1 for datums ordered by proc
691 procs: inorder 0 = proc to send each datum to, 1 = # of datums/proc,
692 callback = caller function to invoke in rendezvous decomposition
693 takes input datums, returns output datums
694 outorder = same as inorder, but for datums returned by callback()
695 ptr = pointer to caller class, passed to callback()
696 statflag = 1 for stats output, else 0
697 outputs:
698 nout = # of output datums (function return)
699 outbuf = vector of output datums
700 outsize = byte size of each output datum
701 callback inputs:
702 nrvous = # of rvous decomp datums in inbuf_rvous
703 inbuf_rvous = vector of rvous decomp input datums
704 ptr = pointer to caller class
705 callback outputs:
706 nrvous_out = # of rvous decomp output datums (function return)
707 flag = 0 for no second comm, 1 for outbuf_rvous = inbuf_rvous,
708 2 for second comm with new outbuf_rvous
709 procs_rvous = outorder 0 = proc to send each datum to, 1 = # of datums/proc
710 allocated
711 outbuf_rvous = vector of rvous decomp output datums
712 NOTE: could use MPI_INT or MPI_DOUBLE insead of MPI_CHAR
713 to avoid checked-for overflow in MPI_Alltoallv?
714 ------------------------------------------------------------------------- */
715
716 int Comm::
rendezvous(int which,int n,char * inbuf,int insize,int inorder,int * procs,int (* callback)(int,char *,int &,int * &,char * &,void *),int outorder,char * & outbuf,int outsize,void * ptr,int statflag)717 rendezvous(int which, int n, char *inbuf, int insize,
718 int inorder, int *procs,
719 int (*callback)(int, char *, int &, int *&, char *&, void *),
720 int outorder, char *&outbuf, int outsize, void *ptr, int statflag)
721 {
722 if (which == 0)
723 return rendezvous_irregular(n,inbuf,insize,inorder,procs,callback,
724 outorder,outbuf,outsize,ptr,statflag);
725 else
726 return rendezvous_all2all(n,inbuf,insize,inorder,procs,callback,
727 outorder,outbuf,outsize,ptr,statflag);
728 }
729
730 /* ---------------------------------------------------------------------- */
731
732 int Comm::
rendezvous_irregular(int n,char * inbuf,int insize,int inorder,int * procs,int (* callback)(int,char *,int &,int * &,char * &,void *),int outorder,char * & outbuf,int outsize,void * ptr,int statflag)733 rendezvous_irregular(int n, char *inbuf, int insize, int inorder, int *procs,
734 int (*callback)(int, char *, int &, int *&, char *&, void *),
735 int outorder, char *&outbuf,
736 int outsize, void *ptr, int statflag)
737 {
738 // irregular comm of inbuf from caller decomp to rendezvous decomp
739
740 Irregular *irregular = new Irregular(sparta);
741
742 int nrvous;
743 if (inorder) nrvous = irregular->create_data_uniform_grouped(n,procs);
744 else nrvous = irregular->create_data_uniform(n,procs);
745
746 char *inbuf_rvous = (char *) memory->smalloc((bigint) nrvous*insize,
747 "rendezvous:inbuf");
748 irregular->exchange_uniform(inbuf,insize,inbuf_rvous);
749
750 bigint irregular1_bytes = 0; // irregular->irregular_bytes;
751 delete irregular;
752
753 // done if callback is NULL, return inbuf_rvous
754
755 if (!callback) {
756 outbuf = inbuf_rvous;
757 return nrvous;
758 }
759
760 // peform rendezvous computation via callback()
761 // callback() allocates/populates proclist_rvous and outbuf_rvous
762
763 int flag;
764 int *procs_rvous;
765 char *outbuf_rvous;
766 int nrvous_out = callback(nrvous,inbuf_rvous,flag,
767 procs_rvous,outbuf_rvous,ptr);
768
769 if (flag != 1) memory->sfree(inbuf_rvous); // outbuf_rvous = inbuf_vous
770 if (flag == 0) return 0; // all nout_rvous are 0, no 2nd comm stage
771
772 // irregular comm of outbuf from rendezvous decomp back to caller decomp
773 // caller will free outbuf
774
775 irregular = new Irregular(sparta);
776
777 int nout;
778 if (outorder)
779 nout = irregular->create_data_uniform_grouped(nrvous_out,procs_rvous);
780 else nout = irregular->create_data_uniform(nrvous_out,procs_rvous);
781
782 outbuf = (char *) memory->smalloc((bigint) nout*outsize,
783 "rendezvous:outbuf");
784 irregular->exchange_uniform(outbuf_rvous,outsize,outbuf);
785
786 bigint irregular2_bytes = 0; // irregular->irregular_bytes;
787 delete irregular;
788
789 memory->destroy(procs_rvous);
790 memory->sfree(outbuf_rvous);
791
792 // return number of output datums
793
794 if (!statflag) return nout;
795
796 rendezvous_stats(n,insize,nout,outsize,nrvous,nrvous_out);
797
798 /*
799 rvous_bytes = 0;
800 rvous_bytes += n*insize; // inbuf
801 rvous_bytes += nout*outsize; // outbuf
802 rvous_bytes += nrvous*insize; // inbuf_rvous
803 rvous_bytes += nrvous_out*outsize; // outbuf_rvous
804 rvous_bytes += nrvous_out*sizeof(int); // procs_rvous
805 rvous_bytes += MAX(irregular1_bytes,irregular2_bytes); // max of 2 comms
806 */
807
808 return nout;
809 }
810
811 /* ---------------------------------------------------------------------- */
812
813 int Comm::
rendezvous_all2all(int n,char * inbuf,int insize,int inorder,int * procs,int (* callback)(int,char *,int &,int * &,char * &,void *),int outorder,char * & outbuf,int outsize,void * ptr,int statflag)814 rendezvous_all2all(int n, char *inbuf, int insize, int inorder, int *procs,
815 int (*callback)(int, char *, int &, int *&, char *&, void *),
816 int outorder, char *&outbuf, int outsize, void *ptr,
817 int statflag)
818 {
819 int iproc;
820 bigint all2all1_bytes,all2all2_bytes;
821 int *sendcount,*sdispls,*recvcount,*rdispls;
822 int *procs_a2a;
823 bigint *offsets;
824 char *inbuf_a2a,*outbuf_a2a;
825
826 // create procs and inbuf for All2all if necesary
827
828 if (!inorder) {
829 memory->create(procs_a2a,nprocs,"rendezvous:procs");
830 inbuf_a2a = (char *) memory->smalloc((bigint) n*insize,
831 "rendezvous:inbuf");
832 memory->create(offsets,nprocs,"rendezvous:offsets");
833
834 for (int i = 0; i < nprocs; i++) procs_a2a[i] = 0;
835 for (int i = 0; i < n; i++) procs_a2a[procs[i]]++;
836
837 offsets[0] = 0;
838 for (int i = 1; i < nprocs; i++)
839 offsets[i] = offsets[i-1] + insize*procs_a2a[i-1];
840
841 bigint offset = 0;
842 for (int i = 0; i < n; i++) {
843 iproc = procs[i];
844 memcpy(&inbuf_a2a[offsets[iproc]],&inbuf[offset],insize);
845 offsets[iproc] += insize;
846 offset += insize;
847 }
848
849 all2all1_bytes = nprocs*sizeof(int) + nprocs*sizeof(bigint) + n*insize;
850
851 } else {
852 procs_a2a = procs;
853 inbuf_a2a = inbuf;
854 all2all1_bytes = 0;
855 }
856
857 // create args for MPI_Alltoallv() on input data
858
859 memory->create(sendcount,nprocs,"rendezvous:sendcount");
860 memcpy(sendcount,procs_a2a,nprocs*sizeof(int));
861
862 memory->create(recvcount,nprocs,"rendezvous:recvcount");
863 MPI_Alltoall(sendcount,1,MPI_INT,recvcount,1,MPI_INT,world);
864
865 memory->create(sdispls,nprocs,"rendezvous:sdispls");
866 memory->create(rdispls,nprocs,"rendezvous:rdispls");
867 sdispls[0] = rdispls[0] = 0;
868 for (int i = 1; i < nprocs; i++) {
869 sdispls[i] = sdispls[i-1] + sendcount[i-1];
870 rdispls[i] = rdispls[i-1] + recvcount[i-1];
871 }
872 int nrvous = rdispls[nprocs-1] + recvcount[nprocs-1];
873
874 // test for overflow of input data due to imbalance or insize
875 // means that individual sdispls or rdispls values overflow
876
877 int overflow = 0;
878 if ((bigint) n*insize > MAXSMALLINT) overflow = 1;
879 if ((bigint) nrvous*insize > MAXSMALLINT) overflow = 1;
880 int overflowall;
881 MPI_Allreduce(&overflow,&overflowall,1,MPI_INT,MPI_MAX,world);
882 if (overflowall) error->all(FLERR,"Overflow input size in rendezvous_a2a");
883
884 for (int i = 0; i < nprocs; i++) {
885 sendcount[i] *= insize;
886 sdispls[i] *= insize;
887 recvcount[i] *= insize;
888 rdispls[i] *= insize;
889 }
890
891 // all2all comm of inbuf from caller decomp to rendezvous decomp
892
893 char *inbuf_rvous = (char *) memory->smalloc((bigint) nrvous*insize,
894 "rendezvous:inbuf");
895
896 MPI_Alltoallv(inbuf_a2a,sendcount,sdispls,MPI_CHAR,
897 inbuf_rvous,recvcount,rdispls,MPI_CHAR,world);
898
899 if (!inorder) {
900 memory->destroy(procs_a2a);
901 memory->sfree(inbuf_a2a);
902 memory->destroy(offsets);
903 }
904
905 // done if callback is NULL, return inbuf_rvous
906
907 if (!callback) {
908 memory->destroy(sendcount);
909 memory->destroy(recvcount);
910 memory->destroy(sdispls);
911 memory->destroy(rdispls);
912 outbuf = inbuf_rvous;
913 return nrvous;
914 }
915
916 // peform rendezvous computation via callback()
917 // callback() allocates/populates proclist_rvous and outbuf_rvous
918
919 int flag;
920 int *procs_rvous;
921 char *outbuf_rvous;
922
923 int nrvous_out = callback(nrvous,inbuf_rvous,flag,
924 procs_rvous,outbuf_rvous,ptr);
925
926 if (flag != 1) memory->sfree(inbuf_rvous); // outbuf_rvous = inbuf_vous
927 if (flag == 0) {
928 memory->destroy(sendcount);
929 memory->destroy(recvcount);
930 memory->destroy(sdispls);
931 memory->destroy(rdispls);
932 return 0; // all nout_rvous are 0, no 2nd irregular
933 }
934
935 // create procs and outbuf for All2all if necesary
936
937 if (!outorder) {
938 memory->create(procs_a2a,nprocs,"rendezvous_a2a:procs");
939
940 outbuf_a2a = (char *) memory->smalloc((bigint) nrvous_out*outsize,
941 "rendezvous:outbuf");
942 memory->create(offsets,nprocs,"rendezvous:offsets");
943
944 for (int i = 0; i < nprocs; i++) procs_a2a[i] = 0;
945 for (int i = 0; i < nrvous_out; i++) procs_a2a[procs_rvous[i]]++;
946
947 offsets[0] = 0;
948 for (int i = 1; i < nprocs; i++)
949 offsets[i] = offsets[i-1] + outsize*procs_a2a[i-1];
950
951 bigint offset = 0;
952 for (int i = 0; i < nrvous_out; i++) {
953 iproc = procs_rvous[i];
954 memcpy(&outbuf_a2a[offsets[iproc]],&outbuf_rvous[offset],outsize);
955 offsets[iproc] += outsize;
956 offset += outsize;
957 }
958
959 all2all2_bytes = nprocs*sizeof(int) + nprocs*sizeof(bigint) +
960 nrvous_out*outsize;
961
962 } else {
963 procs_a2a = procs_rvous;
964 outbuf_a2a = outbuf_rvous;
965 all2all2_bytes = 0;
966 }
967
968 // comm outbuf from rendezvous decomposition back to caller
969
970 memcpy(sendcount,procs_a2a,nprocs*sizeof(int));
971
972 MPI_Alltoall(sendcount,1,MPI_INT,recvcount,1,MPI_INT,world);
973
974 sdispls[0] = rdispls[0] = 0;
975 for (int i = 1; i < nprocs; i++) {
976 sdispls[i] = sdispls[i-1] + sendcount[i-1];
977 rdispls[i] = rdispls[i-1] + recvcount[i-1];
978 }
979 int nout = rdispls[nprocs-1] + recvcount[nprocs-1];
980
981 // test for overflow of outbuf due to imbalance or outsize
982 // means that individual sdispls or rdispls values overflow
983
984 overflow = 0;
985 if ((bigint) nrvous*outsize > MAXSMALLINT) overflow = 1;
986 if ((bigint) nout*outsize > MAXSMALLINT) overflow = 1;
987 MPI_Allreduce(&overflow,&overflowall,1,MPI_INT,MPI_MAX,world);
988 if (overflowall) error->all(FLERR,"Overflow output in rendezvous_a2a");
989
990 for (int i = 0; i < nprocs; i++) {
991 sendcount[i] *= outsize;
992 sdispls[i] *= outsize;
993 recvcount[i] *= outsize;
994 rdispls[i] *= outsize;
995 }
996
997 // all2all comm of outbuf from rendezvous decomp back to caller decomp
998 // caller will free outbuf
999
1000 outbuf = (char *) memory->smalloc((bigint) nout*outsize,"rendezvous:outbuf");
1001
1002 MPI_Alltoallv(outbuf_a2a,sendcount,sdispls,MPI_CHAR,
1003 outbuf,recvcount,rdispls,MPI_CHAR,world);
1004
1005 memory->destroy(procs_rvous);
1006 memory->sfree(outbuf_rvous);
1007
1008 if (!outorder) {
1009 memory->destroy(procs_a2a);
1010 memory->sfree(outbuf_a2a);
1011 memory->destroy(offsets);
1012 }
1013
1014 // clean up
1015
1016 memory->destroy(sendcount);
1017 memory->destroy(recvcount);
1018 memory->destroy(sdispls);
1019 memory->destroy(rdispls);
1020
1021 // return number of datums
1022
1023 if (!statflag) return nout;
1024
1025 rendezvous_stats(n,insize,nout,outsize,nrvous,nrvous_out);
1026
1027 /*
1028 rvous_bytes = 0;
1029 rvous_bytes += n*insize; // inbuf
1030 rvous_bytes += nout*outsize; // outbuf
1031 rvous_bytes += nrvous*insize; // inbuf_rvous
1032 rvous_bytes += nrvous_out*outsize; // outbuf_rvous
1033 rvous_bytes += nrvous_out*sizeof(int); // procs_rvous
1034 rvous_bytes += 4*nprocs*sizeof(int); // all2all vectors
1035 rvous_bytes += MAX(all2all1_bytes,all2all2_bytes); // reorder ops
1036 */
1037
1038 return nout;
1039 }
1040
1041 /* ----------------------------------------------------------------------
1042 memory info for caller and rendezvous decompositions
1043 ------------------------------------------------------------------------- */
1044
rendezvous_stats(int n,int insize,int nout,int outsize,int nrvous,int nrvous_out)1045 void Comm::rendezvous_stats(int n, int insize, int nout, int outsize,
1046 int nrvous, int nrvous_out)
1047 {
1048 bigint size_in_all,size_in_max,size_in_min;
1049 bigint size_out_all,size_out_max,size_out_min;
1050 bigint size_inrvous_all,size_inrvous_max,size_inrvous_min;
1051 bigint size_outrvous_all,size_outrvous_max,size_outrvous_min;
1052
1053 bigint size = (bigint) n*insize;
1054 MPI_Allreduce(&size,&size_in_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1055 MPI_Allreduce(&size,&size_in_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1056 MPI_Allreduce(&size,&size_in_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1057
1058 size = (bigint) nout*outsize;
1059 MPI_Allreduce(&size,&size_out_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1060 MPI_Allreduce(&size,&size_out_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1061 MPI_Allreduce(&size,&size_out_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1062
1063 size = (bigint) nrvous*insize;
1064 MPI_Allreduce(&size,&size_inrvous_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1065 MPI_Allreduce(&size,&size_inrvous_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1066 MPI_Allreduce(&size,&size_inrvous_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1067
1068 size = (bigint) nrvous_out*insize;
1069 MPI_Allreduce(&size,&size_outrvous_all,1,MPI_SPARTA_BIGINT,MPI_SUM,world);
1070 MPI_Allreduce(&size,&size_outrvous_max,1,MPI_SPARTA_BIGINT,MPI_MAX,world);
1071 MPI_Allreduce(&size,&size_outrvous_min,1,MPI_SPARTA_BIGINT,MPI_MIN,world);
1072
1073 int mbytes = 1024*1024;
1074
1075 if (me == 0) {
1076 if (screen) {
1077 fprintf(screen,"Rendezvous balance and memory info:\n");
1078 fprintf(screen," input datum count "
1079 "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1080 BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1081 size_in_all/insize,1.0*size_in_all/nprocs/insize,
1082 size_in_max/insize,size_in_min/insize);
1083 fprintf(screen," input data (MB) "
1084 "(tot,ave,max,min): %g %g %g %g\n",
1085 1.0*size_in_all/mbytes,1.0*size_in_all/nprocs/mbytes,
1086 1.0*size_in_max/mbytes,1.0*size_in_min/mbytes);
1087 fprintf(screen," output datum count "
1088 "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1089 BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1090 size_out_all/outsize,1.0*size_out_all/nprocs/outsize,
1091 size_out_max/outsize,size_out_min/outsize);
1092 fprintf(screen," output data (MB) "
1093 "(tot,ave,max,min): %g %g %g %g\n",
1094 1.0*size_out_all/mbytes,1.0*size_out_all/nprocs/mbytes,
1095 1.0*size_out_max/mbytes,1.0*size_out_min/mbytes);
1096 fprintf(screen," input rvous datum count "
1097 "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1098 BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1099 size_inrvous_all/insize,1.0*size_inrvous_all/nprocs/insize,
1100 size_inrvous_max/insize,size_inrvous_min/insize);
1101 fprintf(screen," input rvous data (MB) "
1102 "(tot,ave,max,min): %g %g %g %g\n",
1103 1.0*size_inrvous_all/mbytes,1.0*size_inrvous_all/nprocs/mbytes,
1104 1.0*size_inrvous_max/mbytes,1.0*size_inrvous_min/mbytes);
1105 fprintf(screen," output rvous datum count "
1106 "(tot,ave,max,min): " BIGINT_FORMAT " %g "
1107 BIGINT_FORMAT " " BIGINT_FORMAT "\n",
1108 size_outrvous_all/outsize,1.0*size_outrvous_all/nprocs/outsize,
1109 size_outrvous_max/outsize,size_outrvous_min/outsize);
1110 fprintf(screen," output rvous data (MB) "
1111 "(tot,ave,max,min): %g %g %g %g\n",
1112 1.0*size_outrvous_all/mbytes,1.0*size_outrvous_all/nprocs/mbytes,
1113 1.0*size_outrvous_max/mbytes,1.0*size_outrvous_min/mbytes);
1114 }
1115 }
1116 }
1117