#include <Kripke/ParallelComm.h>

#include <Kripke/Grid.h>
#include <Kripke/Subdomain.h>
#include <Kripke/SubTVec.h>

#include <mpi.h>
#include <cstdio>
#include <vector>

static int outgoingRequests = 0;
static int incomingRequests = 0;

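/*
  How these pieces fit together (a minimal sketch, not the actual driver:
  the real sweep loop lives elsewhere in Kripke and may differ; comm,
  grid_data and src_buffers are illustrative placeholders):

    // comm is a ParallelComm-based communicator built over grid_data
    for each subdomain sdom_id participating in the sweep:
      comm.postRecvs(sdom_id, grid_data->subdomains[sdom_id]);

    while(comm.workRemaining()){
      comm.testRecieves();                      // retire any completed Irecvs
      std::vector<int> ready = comm.getReadyList();
      for(size_t i = 0;i < ready.size();++ i){
        Subdomain *sdom = comm.dequeueSubdomain(ready[i]);
        // ... sweep sdom, filling its three downwind plane buffers ...
        comm.postSends(sdom, src_buffers);      // src_buffers: the 3 downwind buffers
      }
    }
    comm.waitAllSends();
*/
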
ParallelComm::ParallelComm(Grid_Data *grid_data_ptr) :
  grid_data(grid_data_ptr)
{
}

ParallelComm::~ParallelComm(){

}

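/**
  Maps a (rank, subdomain) pair to a unique MPI tag:
  tag = mpi_rank + mpi_size*sdom_id.
  For example, with mpi_size = 4, rank 2 and subdomain 5 give tag 2 + 4*5 = 22.
  computeRankSdom() below inverts this mapping.
*/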
int ParallelComm::computeTag(int mpi_rank, int sdom_id){
  int mpi_size;
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

  int tag = mpi_rank + mpi_size*sdom_id;

  return tag;
}

void ParallelComm::computeRankSdom(int tag, int &mpi_rank, int &sdom_id){
  int mpi_size;
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

  mpi_rank = tag % mpi_size;
  sdom_id = tag / mpi_size;
}

/**
  Finds subdomain in the queue by its subdomain id.
*/
int ParallelComm::findSubdomain(int sdom_id){

  // find subdomain in queue
  int index;
  for(index = 0;index < queue_sdom_ids.size();++ index){
    if(queue_sdom_ids[index] == sdom_id){
      break;
    }
  }
  if(index == queue_sdom_ids.size()){
    printf("Cannot find subdomain id %d in work queue\n", sdom_id);
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  return index;
}


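/**
  Removes a subdomain from the work queue and returns a pointer to it.
  Typically called for ids returned by getReadyList(), i.e. subdomains with
  no remaining upwind dependencies.
*/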
Subdomain *ParallelComm::dequeueSubdomain(int sdom_id){
  int index = findSubdomain(sdom_id);

  // Get subdomain pointer before removing it from queue
  Subdomain *sdom = queue_subdomains[index];

  // remove subdomain from queue
  queue_sdom_ids.erase(queue_sdom_ids.begin()+index);
  queue_subdomains.erase(queue_subdomains.begin()+index);
  queue_depends.erase(queue_depends.begin()+index);

  return sdom;
}

/**
  Adds a subdomain to the work queue.
  Determines if upwind dependencies require communication, and posts the appropriate Irecvs.
  All receives use the plane_data[] arrays as receive buffers.
*/
void ParallelComm::postRecvs(int sdom_id, Subdomain &sdom){
  int mpi_rank, mpi_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

  // go through each dimension's upwind neighbors, and add the dependencies
  int num_depends = 0;
  for(int dim = 0;dim < 3;++ dim){
    // If it's a boundary condition, skip it
    if(sdom.upwind[dim].mpi_rank < 0){
      continue;
    }

    // If it's an on-rank communication (from another subdomain)
    if(sdom.upwind[dim].mpi_rank == mpi_rank){
      // skip it, but track the dependency
      num_depends ++;
      continue;
    }

    // Add request to pending list
    recv_requests.push_back(MPI_Request());
    recv_subdomains.push_back(sdom_id);
    incomingRequests++;
    // compute the tag id of THIS subdomain (tags are always based on destination)
    int tag = computeTag(sdom.upwind[dim].mpi_rank, sdom_id);

    // Post the receive
    MPI_Irecv(sdom.plane_data[dim]->ptr(), sdom.plane_data[dim]->elements, MPI_DOUBLE, sdom.upwind[dim].mpi_rank,
      tag, MPI_COMM_WORLD, &recv_requests[recv_requests.size()-1]);

    // increment number of dependencies
    num_depends ++;
  }

  // add subdomain to queue
  queue_sdom_ids.push_back(sdom_id);
  queue_subdomains.push_back(&sdom);
  queue_depends.push_back(num_depends);
}

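/**
  Posts sends for a subdomain's downwind dependencies, using src_buffers[]
  as the three outgoing plane buffers.
  On-rank neighbors are handled by copying directly into their plane_data
  and decrementing their dependency count; off-rank neighbors get an MPI_Isend.
*/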
void ParallelComm::postSends(Subdomain *sdom, double *src_buffers[3]){
  // post sends for downwind dependencies
  int mpi_rank, mpi_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
  for(int dim = 0;dim < 3;++ dim){
    // If it's a boundary condition, skip it
    if(sdom->downwind[dim].mpi_rank < 0){
      continue;
    }

    // If it's an on-rank communication (to another subdomain)
    if(sdom->downwind[dim].mpi_rank == mpi_rank){
      // find the local subdomain in the queue, and decrement the counter
      for(int i = 0;i < queue_sdom_ids.size();++ i){
        if(queue_sdom_ids[i] == sdom->downwind[dim].subdomain_id){
          queue_depends[i] --;
          break;
        }
      }

      // copy the boundary condition data into the downwind subdomain's plane data
      Subdomain &sdom_downwind = grid_data->subdomains[sdom->downwind[dim].subdomain_id];
      sdom_downwind.plane_data[dim]->copy(*sdom->plane_data[dim]);
      int num_elem = sdom_downwind.plane_data[dim]->elements;
      //double const * KRESTRICT src_ptr = sdom->plane_data[dim]->ptr();
      double * KRESTRICT dst_ptr = sdom_downwind.plane_data[dim]->ptr();
      for(int i = 0;i < num_elem;++ i){
        dst_ptr[i] = src_buffers[dim][i];
      }
      continue;
    }

    // At this point, we know that we have to send an MPI message
    // Add request to send queue
    send_requests.push_back(MPI_Request());
    outgoingRequests++;
    // compute the tag id of TARGET subdomain (tags are always based on destination)
    int tag = computeTag(mpi_rank, sdom->downwind[dim].subdomain_id);

    // Post the send
    MPI_Isend(src_buffers[dim], sdom->plane_data[dim]->elements, MPI_DOUBLE, sdom->downwind[dim].mpi_rank,
      tag, MPI_COMM_WORLD, &send_requests[send_requests.size()-1]);
  }
}


// Checks if there are any outstanding subdomains to complete
bool ParallelComm::workRemaining(void){
  return (recv_requests.size() > 0 || queue_subdomains.size() > 0);
}


// Blocks until all sends have completed, and flushes the send queue
void ParallelComm::waitAllSends(void){
  // Wait for all remaining sends to complete
  int num_sends = send_requests.size();
  if(num_sends > 0){
    std::vector<MPI_Status> status(num_sends);
    MPI_Waitall(num_sends, &send_requests[0], &status[0]);
    send_requests.clear();
  }
}

/**
  Checks for incoming messages, and does the relevant bookkeeping.
*/
void ParallelComm::testRecieves(void){

  // Check for any recv requests that have completed
  int num_requests = recv_requests.size();
  bool done = false;
  while(!done && num_requests > 0){
    // Create array of status variables
    std::vector<MPI_Status> recv_status(num_requests);

    // Check whether any one of the pending recvs has completed
    int index; // this will be the index of the request that completed
    int complete_flag; // this is set to TRUE if something completed
    MPI_Testany(num_requests, &recv_requests[0], &index, &complete_flag, &recv_status[0]);

    if(complete_flag != 0){

      // get the subdomain that this completed for
      int sdom_id = recv_subdomains[index];

      // remove the request from the list
      recv_requests.erase(recv_requests.begin()+index);
      recv_subdomains.erase(recv_subdomains.begin()+index);
      num_requests --;

      // decrement the dependency count for that subdomain
      for(int i = 0;i < queue_sdom_ids.size();++ i){
        if(queue_sdom_ids[i] == sdom_id){
          queue_depends[i] --;
          break;
        }
      }
    }
    else{
      done = true;
    }
  }
}


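/**
  Returns the ids of all queued subdomains whose dependency count has
  reached zero, i.e. those that are ready to be processed.
*/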
std::vector<int> ParallelComm::getReadyList(void){
  // build up a list of ready subdomains
  std::vector<int> ready;
  for(int i = 0;i < queue_depends.size();++ i){
    if(queue_depends[i] == 0){
      ready.push_back(queue_sdom_ids[i]);
    }
  }
  return ready;
}


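/**
  Accessors for the file-scope request counters: incomingRequests counts the
  Irecvs posted in postRecvs(), outgoingRequests counts the Isends posted in
  postSends(), both since the last call to resetRequests().
*/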
int ParallelComm::getIncomingRequests()
{
  return incomingRequests;
}

int ParallelComm::getOutgoingRequests()
{
  return outgoingRequests;
}

void ParallelComm::resetRequests()
{
  incomingRequests = 0;
  outgoingRequests = 0;
}