/* ----------------------------------------------------------------------
   SPARTA - Stochastic PArallel Rarefied-gas Time-accurate Analyzer
   http://sparta.sandia.gov
   Steve Plimpton, sjplimp@sandia.gov, Michael Gallis, magalli@sandia.gov
   Sandia National Laboratories

   Copyright (2014) Sandia Corporation.  Under the terms of Contract
   DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
   certain rights in this software.  This software is distributed under
   the GNU General Public License.

   See the README file in the top-level SPARTA directory.
------------------------------------------------------------------------- */
14
15 #include "spatype.h"
16 #include "mpi.h"
17 #include "stdlib.h"
18 #include "string.h"
19 #include "irregular_kokkos.h"
20 #include "particle.h"
21 #include "domain.h"
22 #include "comm.h"
23 #include "memory_kokkos.h"
24 #include "error.h"
25 #include "kokkos.h"
26
27
28
29 // DEBUG
30 #include "update.h"
31 #include "grid.h"
32
33 using namespace SPARTA_NS;
34
35 // allocate space for static class variable
36 // prototype for non-class function
37
38 //int *IrregularKokkos::proc_recv_copy;
39 int compare_standalone(const void *, const void *);
40
41 #define BUFFACTOR 1.5
42 #define BUFMIN 1000
43 #define BUFEXTRA 1000
44
45 /* ---------------------------------------------------------------------- */
46
IrregularKokkos(SPARTA * sparta)47 IrregularKokkos::IrregularKokkos(SPARTA *sparta) : Irregular(sparta)
48 {
49 k_n = DAT::tdual_int_scalar("comm:n");
50 }
51
52 /* ---------------------------------------------------------------------- */
53
~IrregularKokkos()54 IrregularKokkos::~IrregularKokkos()
55 {
56 if (copymode) return;
57
58 memoryKK->destroy_kokkos(k_index_send,index_send);
59 index_send = NULL;
60
61 memoryKK->destroy_kokkos(k_index_self,index_self);
62 index_self = NULL;
63 }
64
65 /* ----------------------------------------------------------------------
66 create communication plan based on list of datums of uniform size
67 n = # of datums to send
68 proclist = proc to send each datum to, can include self
69 sort = flag for sorting order of received datums by proc ID
70 return total # of datums I will recv, including any to self
71 ------------------------------------------------------------------------- */
72
create_data_uniform(int n,int * proclist,int sort)73 int IrregularKokkos::create_data_uniform(int n, int *proclist, int sort)
74 {
75 int i,m;
76
77 // setup for collective comm
78 // work1 = # of datums I send to each proc, set self to 0
79 // work2 = 1 for all procs, used for ReduceScatter
80
81 for (i = 0; i < nprocs; i++) {
82 work1[i] = 0;
83 work2[i] = 1;
84 }
85 for (i = 0; i < n; i++) work1[proclist[i]] = 1;
86 work1[me] = 0;
87
88 // nrecv = # of procs I receive messages from, not including self
89 // options for performing ReduceScatter operation
90 // some are more efficient on some machines at big sizes
91
92 #ifdef SPARTA_RS_ALLREDUCE_INPLACE
93 MPI_Allreduce(MPI_IN_PLACE,work1,nprocs,MPI_INT,MPI_SUM,world);
94 nrecv = work1[me];
95 #else
96 #ifdef SPARTA_RS_ALLREDUCE
97 MPI_Allreduce(work1,work2,nprocs,MPI_INT,MPI_SUM,world);
98 nrecv = work2[me];
99 #else
100 MPI_Reduce_scatter(work1,&nrecv,work2,MPI_INT,MPI_SUM,world);
101 #endif
102 #endif
103
104 // work1 = # of datums I send to each proc, including self
105 // nsend = # of procs I send messages to, not including self
106
107 for (i = 0; i < nprocs; i++) work1[i] = 0;
108 for (i = 0; i < n; i++) work1[proclist[i]]++;
109
110 nsend = 0;
111 for (i = 0; i < nprocs; i++)
112 if (work1[i]) nsend++;
113 if (work1[me]) nsend--;
114
115 // reallocate send and self index lists if necessary
116 // could use n-work1[me] for length of index_send to be more precise
117
118 if (n > indexmax) {
119 indexmax = n;
120 memoryKK->destroy_kokkos(k_index_send,index_send);
121 memoryKK->create_kokkos(k_index_send,index_send,indexmax,"irregular:index_send");
122 d_index_send = k_index_send.d_view;
123 }
124
125 if (work1[me] > indexselfmax) {
126 indexselfmax = work1[me];
127 memoryKK->destroy_kokkos(k_index_self,index_self);
128 memoryKK->create_kokkos(k_index_self,index_self,indexselfmax,"irregular:index_self");
129 d_index_self = k_index_self.d_view;
130 }
131
132 // proc_send = procs I send to
133 // num_send = # of datums I send to each proc
134 // num_self = # of datums I copy to self
135 // to balance pattern of send messages:
136 // each proc starts with iproc > me, continues until iproc = me
137 // reset work1 to store which send message each proc corresponds to
138
139 int iproc = me;
140 int isend = 0;
141 for (i = 0; i < nprocs; i++) {
142 iproc++;
143 if (iproc == nprocs) iproc = 0;
144 if (iproc == me) {
145 num_self = work1[iproc];
146 work1[iproc] = 0;
147 } else if (work1[iproc]) {
148 proc_send[isend] = iproc;
149 num_send[isend] = work1[iproc];
150 work1[iproc] = isend;
151 isend++;
152 }
153 }
154
155 // work2 = offsets into index_send for each proc I send to
156 // m = ptr into index_self
157 // index_send = list of which datums to send to each proc
158 // 1st N1 values are datum indices for 1st proc,
159 // next N2 values are datum indices for 2nd proc, etc
160 // index_self = list of which datums to copy to self
161
162 work2[0] = 0;
163 for (i = 1; i < nsend; i++) work2[i] = work2[i-1] + num_send[i-1];
164
165 m = 0;
166 for (i = 0; i < n; i++) {
167 iproc = proclist[i];
168 if (iproc == me) index_self[m++] = i;
169 else {
170 isend = work1[iproc];
171 index_send[work2[isend]++] = i;
172 }
173 }
174
175 // tell receivers how many datums I send them
176 // sendmax = largest # of datums I send in a single message
177
178 sendmax = 0;
179 for (i = 0; i < nsend; i++) {
180 MPI_Send(&num_send[i],1,MPI_INT,proc_send[i],0,world);
181 sendmax = MAX(sendmax,num_send[i]);
182 }
183
184 // receive incoming messages
185 // proc_recv = procs I recv from
186 // num_recv = # of datums each proc sends me
187 // nrecvdatum = total # of datums I recv
188
189 nrecvdatum = 0;
190 for (i = 0; i < nrecv; i++) {
191 MPI_Recv(&num_recv[i],1,MPI_INT,MPI_ANY_SOURCE,0,world,status);
192 proc_recv[i] = status->MPI_SOURCE;
193 nrecvdatum += num_recv[i];
194 }
195 nrecvdatum += num_self;
196
197 // sort proc_recv and num_recv by proc ID if requested
198 // useful for debugging to insure reproducible ordering of received datums
199
200 if (sort) {
201 int *order = new int[nrecv];
202 int *proc_recv_ordered = new int[nrecv];
203 int *num_recv_ordered = new int[nrecv];
204
205 for (i = 0; i < nrecv; i++) order[i] = i;
206 proc_recv_copy = proc_recv;
207 qsort(order,nrecv,sizeof(int),compare_standalone);
208
209 int j;
210 for (i = 0; i < nrecv; i++) {
211 j = order[i];
212 proc_recv_ordered[i] = proc_recv[j];
213 num_recv_ordered[i] = num_recv[j];
214 }
215
216 memcpy(proc_recv,proc_recv_ordered,nrecv*sizeof(int));
217 memcpy(num_recv,num_recv_ordered,nrecv*sizeof(int));
218 delete [] order;
219 delete [] proc_recv_ordered;
220 delete [] num_recv_ordered;
221 }
222
223 // proc2recv[I] = which recv the Ith proc ID is
224 // will only be accessed by procs I actually receive from
225
226 for (i = 0; i < nrecv; i++) proc2recv[proc_recv[i]] = i;
227
228 // barrier to insure all MPI_ANY_SOURCE messages are received
229 // else another proc could proceed to exchange_data() and send to me
230
231 MPI_Barrier(world);
232
233 // return # of datums I will receive
234
235 return nrecvdatum;
236 }
237
238 /* ----------------------------------------------------------------------
239 augment communication plan with new datums of uniform size
240 called after create_procs() created initial plan
241 n = # of datums to send
242 proclist = proc to send each datum to, can include self
243 return total # of datums I will recv
244 ------------------------------------------------------------------------- */
245
augment_data_uniform(int n,int * proclist)246 int IrregularKokkos::augment_data_uniform(int n, int *proclist)
247 {
248 int i,m,iproc,isend;
249
250 // tally count of messages to each proc in num_send and num_self
251
252 num_self = 0;
253 for (i = 0; i < nsend; i++) work2[proc_send[i]] = 0;
254 work2[me] = 0;
255 for (i = 0; i < n; i++) work2[proclist[i]]++;
256 for (i = 0; i < nsend; i++) num_send[i] = work2[proc_send[i]];
257 num_self = work2[me];
258
259 // reallocate send and self index lists if necessary
260 // could use n-num_self for length of index_send to be more precise
261
262 if (n > indexmax) {
263 indexmax = n;
264 memoryKK->destroy_kokkos(k_index_send,index_send);
265 memoryKK->create_kokkos(k_index_send,index_send,indexmax,"irregular:index_send");
266 d_index_send = k_index_send.d_view;
267 }
268
269 if (num_self > indexselfmax) {
270 indexselfmax = num_self;
271 memoryKK->destroy_kokkos(k_index_self,index_self);
272 memoryKK->create_kokkos(k_index_self,index_self,indexselfmax,"irregular:index_self");
273 d_index_self = k_index_self.d_view;
274 }
275
276 // work2 = offsets into index_send for each proc I send to
277 // m = ptr into index_self
278 // index_send = list of which datums to send to each proc
279 // 1st N1 values are datum indices for 1st proc,
280 // next N2 values are datum indices for 2nd proc, etc
281 // index_self = list of which datums to copy to self
282
283 work2[0] = 0;
284 for (i = 1; i < nsend; i++) work2[i] = work2[i-1] + num_send[i-1];
285
286 if (num_self) {
287 m = 0;
288 for (i = 0; i < n; i++) {
289 iproc = proclist[i];
290 if (iproc == me) index_self[m++] = i;
291 else {
292 isend = work1[iproc];
293 index_send[work2[isend]++] = i;
294 }
295 }
296 } else {
297 for (i = 0; i < n; i++) {
298 isend = work1[proclist[i]];
299 index_send[work2[isend]++] = i;
300 }
301 }
302
303 // tell receivers how many datums I send them
304 // sendmax = largest # of datums I send in a single message
305
306 sendmax = 0;
307 for (i = 0; i < nsend; i++) {
308 MPI_Send(&num_send[i],1,MPI_INT,proc_send[i],0,world);
309 sendmax = MAX(sendmax,num_send[i]);
310 }
311
312 // receive incoming messages
313 // num_recv = # of datums each proc sends me
314 // nrecvdatum = total # of datums I recv
315
316 nrecvdatum = 0;
317 for (i = 0; i < nrecv; i++) {
318 MPI_Recv(&m,1,MPI_INT,MPI_ANY_SOURCE,0,world,status);
319 iproc = status->MPI_SOURCE;
320 num_recv[proc2recv[iproc]] = m;
321 nrecvdatum += m;
322 }
323 nrecvdatum += num_self;
324
325 // barrier to insure all MPI_ANY_SOURCE messages are received
326 // else another proc could proceed to exchange_data() and send to me
327
328 MPI_Barrier(world);
329
330 // return # of datums I will receive
331
332 return nrecvdatum;
333 }
334
335 /* ----------------------------------------------------------------------
336 communicate uniform-size datums via existing plan
337 sendbuf = list of datums to send
338 nbytes = size of each datum
339 recvbuf = received datums, including copied from me
340 ------------------------------------------------------------------------- */
341
exchange_uniform(DAT::t_char_1d d_sendbuf_in,int nbytes_in,char * d_recvbuf_ptr_in,DAT::t_char_1d d_recvbuf_in)342 void IrregularKokkos::exchange_uniform(DAT::t_char_1d d_sendbuf_in, int nbytes_in,
343 char* d_recvbuf_ptr_in, DAT::t_char_1d d_recvbuf_in)
344 {
345 int offset,count;
346
347 nbytes = nbytes_in;
348 d_sendbuf = d_sendbuf_in;
349 d_recvbuf_ptr = d_recvbuf_ptr_in;
350 d_recvbuf = d_recvbuf_in;
351
352 if (!sparta->kokkos->gpu_direct_flag &&
353 h_recvbuf.extent(0) != d_recvbuf.extent(0)) {
354 h_recvbuf = HAT::t_char_1d(Kokkos::view_alloc("irregular:d_recvbuf:mirror",Kokkos::WithoutInitializing),d_recvbuf.extent(0));
355 }
356
357 // post all receives, starting after self copies
358
359 offset = num_self*nbytes;
360 for (int irecv = 0; irecv < nrecv; irecv++) {
361 if (sparta->kokkos->gpu_direct_flag) {
362 MPI_Irecv(&d_recvbuf_ptr[offset],num_recv[irecv]*nbytes,MPI_CHAR,
363 proc_recv[irecv],0,world,&request[irecv]);
364 } else {
365 MPI_Irecv(h_recvbuf.data() + offset,num_recv[irecv]*nbytes,MPI_CHAR,
366 proc_recv[irecv],0,world,&request[irecv]);
367 }
368 offset += num_recv[irecv]*nbytes;
369 }
370
371 // reallocate buf for largest send if necessary
372
373 if (sendmax*nbytes > bufmax) {
374 bufmax = sendmax*nbytes;
375 d_buf = DAT::t_char_1d("Irregular:buf",bufmax);
376 } else if (d_buf.extent(0) < bufmax) {
377 d_buf = DAT::t_char_1d("Irregular:buf",bufmax);
378 }
379
380 // send each message
381 // pack buf with list of datums
382 // m = index of datum in sendbuf
383
384 k_index_send.modify_host();
385 k_index_send.sync_device();
386
387 k_index_self.modify_host();
388 k_index_self.sync_device();
389
390 k_n.h_view() = 0;
391 k_n.modify_host();
392 k_n.sync_device();
393 d_n = k_n.d_view;
394
395 for (int isend = 0; isend < nsend; isend++) {
396 count = num_send[isend];
397
398 if (!sparta->kokkos->gpu_direct_flag) {
399
400 // allocate exact buffer size to reduce GPU <--> CPU memory transfer
401
402 d_buf = DAT::t_char_1d(Kokkos::view_alloc("irregular:buf",Kokkos::WithoutInitializing),count*nbytes);
403 h_buf = HAT::t_char_1d(Kokkos::view_alloc("irregular:buf:mirror",Kokkos::WithoutInitializing),count*nbytes);
404 }
405
406 copymode = 1;
407 if (sparta->kokkos->need_atomics)
408 Kokkos::parallel_for(Kokkos::RangePolicy<DeviceType, TagIrregularPackBuffer<1> >(0,count),*this);
409 else
410 Kokkos::parallel_for(Kokkos::RangePolicy<DeviceType, TagIrregularPackBuffer<0> >(0,count),*this);
411 DeviceType().fence();
412 //pack_buffer_serial(0,count);
413 copymode = 0;
414
415 if (sparta->kokkos->gpu_direct_flag)
416 MPI_Send(d_buf.data(),count*nbytes,MPI_CHAR,proc_send[isend],0,world);
417 else {
418 Kokkos::deep_copy(h_buf,d_buf);
419 MPI_Send(h_buf.data(),count*nbytes,MPI_CHAR,proc_send[isend],0,world);
420 }
421 }
422
423 // copy datums to self, put at beginning of recvbuf
424
425 copymode = 1;
426 Kokkos::parallel_for(Kokkos::RangePolicy<DeviceType, TagIrregularUnpackBuffer>(0,num_self),*this);
427 DeviceType().fence();
428 copymode = 0;
429
430 // wait on all incoming messages
431
432 if (nrecv) {
433 MPI_Waitall(nrecv,request,status);
434
435 if (!sparta->kokkos->gpu_direct_flag)
436 Kokkos::deep_copy(d_recvbuf,h_recvbuf);
437 }
438 }
439
440 template<int NEED_ATOMICS>
441 KOKKOS_INLINE_FUNCTION
operator ()(TagIrregularPackBuffer<NEED_ATOMICS>,const int & i) const442 void IrregularKokkos::operator()(TagIrregularPackBuffer<NEED_ATOMICS>, const int &i) const {
443 int n;
444 if (NEED_ATOMICS)
445 n = Kokkos::atomic_fetch_add(&d_n(),1);
446 else {
447 n = d_n();
448 d_n()++;
449 }
450 const int m = d_index_send[n];
451 memcpy(&d_buf[i*nbytes],&d_sendbuf[m*nbytes],nbytes);
452 }
453
454 KOKKOS_INLINE_FUNCTION
operator ()(TagIrregularUnpackBuffer,const int & i) const455 void IrregularKokkos::operator()(TagIrregularUnpackBuffer, const int &i) const {
456 const int m = d_index_self[i];
457 memcpy(&d_recvbuf[i*nbytes],&d_sendbuf[m*nbytes],nbytes);
458 }
459
460 inline
pack_buffer_serial(const int start,const int end) const461 void IrregularKokkos::pack_buffer_serial(const int start, const int end) const {
462 int n = 0;
463 for (int i = start; i < end; i++) {
464 const int m = d_index_send[n++];
465 memcpy(&d_buf[i*nbytes],&d_sendbuf[m*nbytes],nbytes);
466 }
467 }
468