/* ----------------------------------------------------------------------
   SPARTA - Stochastic PArallel Rarefied-gas Time-accurate Analyzer
   http://sparta.sandia.gov
   Steve Plimpton, sjplimp@sandia.gov, Michael Gallis, magalli@sandia.gov
   Sandia National Laboratories

   Copyright (2014) Sandia Corporation.  Under the terms of Contract
   DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
   certain rights in this software.  This software is distributed under
   the GNU General Public License.

   See the README file in the top-level SPARTA directory.
------------------------------------------------------------------------- */

#include "spatype.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "irregular_kokkos.h"
#include "particle.h"
#include "domain.h"
#include "comm.h"
#include "memory_kokkos.h"
#include "error.h"
#include "kokkos.h"

// DEBUG
#include "update.h"
#include "grid.h"

using namespace SPARTA_NS;

// space for the static class variable is allocated in the base Irregular class
// prototype for non-class function

//int *IrregularKokkos::proc_recv_copy;
int compare_standalone(const void *, const void *);

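// heuristics for sizing grow-on-demand communication buffers:
// growth factor, minimum size, and extra slack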
#define BUFFACTOR 1.5
#define BUFMIN 1000
#define BUFEXTRA 1000

/* ---------------------------------------------------------------------- */

IrregularKokkos::IrregularKokkos(SPARTA *sparta) : Irregular(sparta)
{
  k_n = DAT::tdual_int_scalar("comm:n");
}

/* ---------------------------------------------------------------------- */

IrregularKokkos::~IrregularKokkos()
{
  if (copymode) return;

  memoryKK->destroy_kokkos(k_index_send,index_send);
  index_send = NULL;

  memoryKK->destroy_kokkos(k_index_self,index_self);
  index_self = NULL;
}
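
/* ----------------------------------------------------------------------
   typical usage, sketched (buffer allocation is the caller's job):
     int nrecv = irr->create_data_uniform(n,proclist,sort);
     ... size the receive buffer for nrecv datums of nbytes each ...
     irr->exchange_uniform(d_sendbuf,nbytes,recvbuf_ptr,d_recvbuf);
------------------------------------------------------------------------- */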

/* ----------------------------------------------------------------------
   create communication plan based on list of datums of uniform size
   n = # of datums to send
   proclist = proc to send each datum to, can include self
   sort = flag for sorting order of received datums by proc ID
   return total # of datums I will recv, including any to self
------------------------------------------------------------------------- */

int IrregularKokkos::create_data_uniform(int n, int *proclist, int sort)
{
  int i,m;

  // setup for collective comm
  // work1 = 1 if I send any datums to a proc, 0 if not; self is forced to 0
  // work2 = 1 for all procs, used for ReduceScatter

  for (i = 0; i < nprocs; i++) {
    work1[i] = 0;
    work2[i] = 1;
  }
  for (i = 0; i < n; i++) work1[proclist[i]] = 1;
  work1[me] = 0;

  // nrecv = # of procs I receive messages from, not including self
  // options for performing the ReduceScatter operation
  // some are more efficient than others on some machines at large sizes

#ifdef SPARTA_RS_ALLREDUCE_INPLACE
  MPI_Allreduce(MPI_IN_PLACE,work1,nprocs,MPI_INT,MPI_SUM,world);
  nrecv = work1[me];
#else
#ifdef SPARTA_RS_ALLREDUCE
  MPI_Allreduce(work1,work2,nprocs,MPI_INT,MPI_SUM,world);
  nrecv = work2[me];
#else
  MPI_Reduce_scatter(work1,&nrecv,work2,MPI_INT,MPI_SUM,world);
#endif
#endif
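
  // all three variants compute the same quantity: the number of
  // distinct ranks whose send flag targeted at me is 1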

  // work1 = # of datums I send to each proc, including self
  // nsend = # of procs I send messages to, not including self

  for (i = 0; i < nprocs; i++) work1[i] = 0;
  for (i = 0; i < n; i++) work1[proclist[i]]++;

  nsend = 0;
  for (i = 0; i < nprocs; i++)
    if (work1[i]) nsend++;
  if (work1[me]) nsend--;

  // reallocate send and self index lists if necessary
  // could use n-work1[me] for length of index_send to be more precise

  if (n > indexmax) {
    indexmax = n;
    memoryKK->destroy_kokkos(k_index_send,index_send);
    memoryKK->create_kokkos(k_index_send,index_send,indexmax,"irregular:index_send");
    d_index_send = k_index_send.d_view;
  }

  if (work1[me] > indexselfmax) {
    indexselfmax = work1[me];
    memoryKK->destroy_kokkos(k_index_self,index_self);
    memoryKK->create_kokkos(k_index_self,index_self,indexselfmax,"irregular:index_self");
    d_index_self = k_index_self.d_view;
  }

  // proc_send = procs I send to
  // num_send = # of datums I send to each proc
  // num_self = # of datums I copy to self
  // to balance pattern of send messages:
  //   each proc starts with iproc > me, continues until iproc = me
  // reset work1 to store which send message each proc corresponds to

  int iproc = me;
  int isend = 0;
  for (i = 0; i < nprocs; i++) {
    iproc++;
    if (iproc == nprocs) iproc = 0;
    if (iproc == me) {
      num_self = work1[iproc];
      work1[iproc] = 0;
    } else if (work1[iproc]) {
      proc_send[isend] = iproc;
      num_send[isend] = work1[iproc];
      work1[iproc] = isend;
      isend++;
    }
  }
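
  // e.g. with nprocs = 4 and me = 1, the loop above visits procs 2,3,0,1,
  // so every proc handles self last and send targets are staggered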

  // work2 = offsets into index_send for each proc I send to
  // m = ptr into index_self
  // index_send = list of which datums to send to each proc
  //   1st N1 values are datum indices for 1st proc,
  //   next N2 values are datum indices for 2nd proc, etc
  // index_self = list of which datums to copy to self
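  //   e.g. num_send = {3,2} gives index_send = [a0 a1 a2 | b0 b1], where
  //   the a's/b's are datum indices destined for the 1st/2nd proc I send to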

  work2[0] = 0;
  for (i = 1; i < nsend; i++) work2[i] = work2[i-1] + num_send[i-1];

  m = 0;
  for (i = 0; i < n; i++) {
    iproc = proclist[i];
    if (iproc == me) index_self[m++] = i;
    else {
      isend = work1[iproc];
      index_send[work2[isend]++] = i;
    }
  }

  // tell receivers how many datums I send them
  // sendmax = largest # of datums I send in a single message

  sendmax = 0;
  for (i = 0; i < nsend; i++) {
    MPI_Send(&num_send[i],1,MPI_INT,proc_send[i],0,world);
    sendmax = MAX(sendmax,num_send[i]);
  }

  // receive incoming messages
  // proc_recv = procs I recv from
  // num_recv = # of datums each proc sends me
  // nrecvdatum = total # of datums I recv

  nrecvdatum = 0;
  for (i = 0; i < nrecv; i++) {
    MPI_Recv(&num_recv[i],1,MPI_INT,MPI_ANY_SOURCE,0,world,status);
    proc_recv[i] = status->MPI_SOURCE;
    nrecvdatum += num_recv[i];
  }
  nrecvdatum += num_self;

  // sort proc_recv and num_recv by proc ID if requested
  // useful for debugging to ensure reproducible ordering of received datums

  if (sort) {
    int *order = new int[nrecv];
    int *proc_recv_ordered = new int[nrecv];
    int *num_recv_ordered = new int[nrecv];

    for (i = 0; i < nrecv; i++) order[i] = i;
    proc_recv_copy = proc_recv;
    qsort(order,nrecv,sizeof(int),compare_standalone);

    int j;
    for (i = 0; i < nrecv; i++) {
      j = order[i];
      proc_recv_ordered[i] = proc_recv[j];
      num_recv_ordered[i] = num_recv[j];
    }

    memcpy(proc_recv,proc_recv_ordered,nrecv*sizeof(int));
    memcpy(num_recv,num_recv_ordered,nrecv*sizeof(int));
    delete [] order;
    delete [] proc_recv_ordered;
    delete [] num_recv_ordered;
  }

  // proc2recv[I] = which recv the Ith proc ID is
  // will only be accessed by procs I actually receive from

  for (i = 0; i < nrecv; i++) proc2recv[proc_recv[i]] = i;

  // barrier to ensure all MPI_ANY_SOURCE messages are received
  // else another proc could proceed to exchange_uniform() and send to me

  MPI_Barrier(world);

  // return # of datums I will receive

  return nrecvdatum;
}

/* ----------------------------------------------------------------------
   augment communication plan with new datums of uniform size
   called after create_procs() created initial plan
   n = # of datums to send
   proclist = proc to send each datum to, can include self
   return total # of datums I will recv
------------------------------------------------------------------------- */

int IrregularKokkos::augment_data_uniform(int n, int *proclist)
{
  int i,m,iproc,isend;

  // tally count of datums to each proc in num_send and num_self

  num_self = 0;
  for (i = 0; i < nsend; i++) work2[proc_send[i]] = 0;
  work2[me] = 0;
  for (i = 0; i < n; i++) work2[proclist[i]]++;
  for (i = 0; i < nsend; i++) num_send[i] = work2[proc_send[i]];
  num_self = work2[me];

  // reallocate send and self index lists if necessary
  // could use n-num_self for length of index_send to be more precise

  if (n > indexmax) {
    indexmax = n;
    memoryKK->destroy_kokkos(k_index_send,index_send);
    memoryKK->create_kokkos(k_index_send,index_send,indexmax,"irregular:index_send");
    d_index_send = k_index_send.d_view;
  }

  if (num_self > indexselfmax) {
    indexselfmax = num_self;
    memoryKK->destroy_kokkos(k_index_self,index_self);
    memoryKK->create_kokkos(k_index_self,index_self,indexselfmax,"irregular:index_self");
    d_index_self = k_index_self.d_view;
  }

  // work2 = offsets into index_send for each proc I send to
  // m = ptr into index_self
  // index_send = list of which datums to send to each proc
  //   1st N1 values are datum indices for 1st proc,
  //   next N2 values are datum indices for 2nd proc, etc
  // index_self = list of which datums to copy to self
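  //   note: work1[] still holds the proc -> send-message mapping stored when
  //   the plan was created, which is what makes this augmenting pass valid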

  work2[0] = 0;
  for (i = 1; i < nsend; i++) work2[i] = work2[i-1] + num_send[i-1];

  if (num_self) {
    m = 0;
    for (i = 0; i < n; i++) {
      iproc = proclist[i];
      if (iproc == me) index_self[m++] = i;
      else {
        isend = work1[iproc];
        index_send[work2[isend]++] = i;
      }
    }
  } else {
    for (i = 0; i < n; i++) {
      isend = work1[proclist[i]];
      index_send[work2[isend]++] = i;
    }
  }

  // tell receivers how many datums I send them
  // sendmax = largest # of datums I send in a single message

  sendmax = 0;
  for (i = 0; i < nsend; i++) {
    MPI_Send(&num_send[i],1,MPI_INT,proc_send[i],0,world);
    sendmax = MAX(sendmax,num_send[i]);
  }

  // receive incoming messages
  // num_recv = # of datums each proc sends me
  // nrecvdatum = total # of datums I recv

  nrecvdatum = 0;
  for (i = 0; i < nrecv; i++) {
    MPI_Recv(&m,1,MPI_INT,MPI_ANY_SOURCE,0,world,status);
    iproc = status->MPI_SOURCE;
    num_recv[proc2recv[iproc]] = m;
    nrecvdatum += m;
  }
  nrecvdatum += num_self;

  // barrier to ensure all MPI_ANY_SOURCE messages are received
  // else another proc could proceed to exchange_uniform() and send to me

  MPI_Barrier(world);

  // return # of datums I will receive

  return nrecvdatum;
}

/* ----------------------------------------------------------------------
   communicate uniform-size datums via existing plan
   sendbuf = list of datums to send
   nbytes = size of each datum
   recvbuf = received datums, including copied from me
------------------------------------------------------------------------- */

void IrregularKokkos::exchange_uniform(DAT::t_char_1d d_sendbuf_in, int nbytes_in,
                                       char *d_recvbuf_ptr_in, DAT::t_char_1d d_recvbuf_in)
{
  int offset,count;

  nbytes = nbytes_in;
  d_sendbuf = d_sendbuf_in;
  d_recvbuf_ptr = d_recvbuf_ptr_in;
  d_recvbuf = d_recvbuf_in;

  if (!sparta->kokkos->gpu_direct_flag &&
      h_recvbuf.extent(0) != d_recvbuf.extent(0)) {
    h_recvbuf = HAT::t_char_1d(Kokkos::view_alloc("irregular:d_recvbuf:mirror",Kokkos::WithoutInitializing),d_recvbuf.extent(0));
  }
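
  // without GPU-direct MPI, receives must target host memory: they are
  // posted into h_recvbuf, then copied to d_recvbuf after the Waitall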

  // post all receives, starting after self copies

  offset = num_self*nbytes;
  for (int irecv = 0; irecv < nrecv; irecv++) {
    if (sparta->kokkos->gpu_direct_flag) {
      MPI_Irecv(&d_recvbuf_ptr[offset],num_recv[irecv]*nbytes,MPI_CHAR,
                proc_recv[irecv],0,world,&request[irecv]);
    } else {
      MPI_Irecv(h_recvbuf.data() + offset,num_recv[irecv]*nbytes,MPI_CHAR,
                proc_recv[irecv],0,world,&request[irecv]);
    }
    offset += num_recv[irecv]*nbytes;
  }

  // reallocate buf for largest send if necessary
  // (also restores capacity if the exact-size allocations below shrank it)

  if (sendmax*nbytes > bufmax) bufmax = sendmax*nbytes;
  if (d_buf.extent(0) < bufmax)
    d_buf = DAT::t_char_1d("Irregular:buf",bufmax);

  // send each message
  // pack buf with list of datums
  // m = index of datum in sendbuf

  k_index_send.modify_host();
  k_index_send.sync_device();

  k_index_self.modify_host();
  k_index_self.sync_device();

  k_n.h_view() = 0;
  k_n.modify_host();
  k_n.sync_device();
  d_n = k_n.d_view;
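
  // d_n is a device-resident running offset into index_send, shared across
  // all outgoing messages so each pack resumes where the previous one ended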

  for (int isend = 0; isend < nsend; isend++) {
    count = num_send[isend];

    if (!sparta->kokkos->gpu_direct_flag) {

      // allocate exact buffer size to reduce GPU <--> CPU memory transfer

      d_buf = DAT::t_char_1d(Kokkos::view_alloc("irregular:buf",Kokkos::WithoutInitializing),count*nbytes);
      h_buf = HAT::t_char_1d(Kokkos::view_alloc("irregular:buf:mirror",Kokkos::WithoutInitializing),count*nbytes);
    }

    copymode = 1;
    if (sparta->kokkos->need_atomics)
      Kokkos::parallel_for(Kokkos::RangePolicy<DeviceType, TagIrregularPackBuffer<1> >(0,count),*this);
    else
      Kokkos::parallel_for(Kokkos::RangePolicy<DeviceType, TagIrregularPackBuffer<0> >(0,count),*this);
    DeviceType().fence();
    //pack_buffer_serial(0,count);
    copymode = 0;

    if (sparta->kokkos->gpu_direct_flag)
      MPI_Send(d_buf.data(),count*nbytes,MPI_CHAR,proc_send[isend],0,world);
    else {
      Kokkos::deep_copy(h_buf,d_buf);
      MPI_Send(h_buf.data(),count*nbytes,MPI_CHAR,proc_send[isend],0,world);
    }
  }

  // copy datums to self, put at beginning of recvbuf

  copymode = 1;
  Kokkos::parallel_for(Kokkos::RangePolicy<DeviceType, TagIrregularUnpackBuffer>(0,num_self),*this);
  DeviceType().fence();
  copymode = 0;

  // wait on all incoming messages

  if (nrecv) {
    MPI_Waitall(nrecv,request,status);

    if (!sparta->kokkos->gpu_direct_flag)
      Kokkos::deep_copy(d_recvbuf,h_recvbuf);
  }
}

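/* ----------------------------------------------------------------------
   pack functor: each thread claims the next slot n of index_send
   (atomically when threads may run concurrently) and copies datum
   index_send[n] into slot i of d_buf; with atomics the datum order within
   a message may be permuted, but each message still carries exactly the
   set of datums destined for its receiver
------------------------------------------------------------------------- */
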
template<int NEED_ATOMICS>
KOKKOS_INLINE_FUNCTION
void IrregularKokkos::operator()(TagIrregularPackBuffer<NEED_ATOMICS>, const int &i) const {
  int n;
  if (NEED_ATOMICS)
    n = Kokkos::atomic_fetch_add(&d_n(),1);
  else {
    n = d_n();
    d_n()++;
  }
  const int m = d_index_send[n];
  memcpy(&d_buf[i*nbytes],&d_sendbuf[m*nbytes],nbytes);
}

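/* ----------------------------------------------------------------------
   self-copy functor: copy datum index_self[i] directly into slot i at the
   front of recvbuf, bypassing MPI for datums this proc sends to itself
------------------------------------------------------------------------- */
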
KOKKOS_INLINE_FUNCTION
void IrregularKokkos::operator()(TagIrregularUnpackBuffer, const int &i) const {
  const int m = d_index_self[i];
  memcpy(&d_recvbuf[i*nbytes],&d_sendbuf[m*nbytes],nbytes);
}

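/* ----------------------------------------------------------------------
   serial debugging variant of the pack loop (see the commented-out call in
   exchange_uniform); note that n restarts at 0, so it matches the parallel
   path only for the first outgoing message
------------------------------------------------------------------------- */
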
inline
void IrregularKokkos::pack_buffer_serial(const int start, const int end) const {
  int n = 0;
  for (int i = start; i < end; i++) {
    const int m = d_index_send[n++];
    memcpy(&d_buf[i*nbytes],&d_sendbuf[m*nbytes],nbytes);
  }
}