#include <Kripke/ParallelComm.h>

#include <Kripke/Grid.h>
#include <Kripke/Subdomain.h>
#include <Kripke/SubTVec.h>

#include <cstdio>
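
/*
  Illustrative usage sketch (assumed driver pattern, not part of this file):
  call postRecvs() for each subdomain to enqueue it and post its MPI_Irecv's,
  then loop while workRemaining() is true: testRecieves() drains completed
  receives, getReadyList() reports subdomains whose dependencies are met,
  and after sweeping a ready subdomain, dequeueSubdomain() removes it and
  postSends() forwards its downwind plane data.  waitAllSends() flushes any
  outstanding sends at the end.
*/
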
static int outgoingRequests = 0;
static int incomingRequests = 0;

ParallelComm::ParallelComm(Grid_Data *grid_data_ptr) :
  grid_data(grid_data_ptr)
{
}

ParallelComm::~ParallelComm(){

}

int ParallelComm::computeTag(int mpi_rank, int sdom_id){
  int mpi_size;
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

  int tag = mpi_rank + mpi_size*sdom_id;

  return tag;
}

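/**
  Inverts computeTag(): recovers the rank and subdomain id that were encoded
  into a tag.  Since tag = mpi_rank + mpi_size*sdom_id, the rank is
  tag % mpi_size and the subdomain id is tag / mpi_size.  For example, with
  mpi_size = 4, rank 1 and subdomain 3 encode to tag 1 + 4*3 = 13, which
  decodes back to (1, 3).
*/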
void ParallelComm::computeRankSdom(int tag, int &mpi_rank, int &sdom_id){
  int mpi_size;
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

  mpi_rank = tag % mpi_size;
  sdom_id = tag / mpi_size;
}

/**
  Finds subdomain in the queue by its subdomain id.
*/
int ParallelComm::findSubdomain(int sdom_id){

  // find subdomain in queue
  int index;
  for(index = 0;index < queue_sdom_ids.size();++ index){
    if(queue_sdom_ids[index] == sdom_id){
      break;
    }
  }
  if(index == queue_sdom_ids.size()){
    printf("Cannot find subdomain id %d in work queue\n", sdom_id);
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  return index;
}

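/**
  Removes a subdomain from the work queue and returns a pointer to it.
*/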
Subdomain *ParallelComm::dequeueSubdomain(int sdom_id){
  int index = findSubdomain(sdom_id);

  // Get subdomain pointer before removing it from queue
  Subdomain *sdom = queue_subdomains[index];

  // remove subdomain from queue
  queue_sdom_ids.erase(queue_sdom_ids.begin()+index);
  queue_subdomains.erase(queue_subdomains.begin()+index);
  queue_depends.erase(queue_depends.begin()+index);

  return sdom;
}

/**
  Adds a subdomain to the work queue.
  Determines if upwind dependencies require communication, and posts the appropriate Irecv's.
  All receives use the plane_data[] arrays as receive buffers.
*/
void ParallelComm::postRecvs(int sdom_id, Subdomain &sdom){
  int mpi_rank, mpi_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);

  // go through each dimension's upwind neighbors, and add the dependencies
  int num_depends = 0;
  for(int dim = 0;dim < 3;++ dim){
    // If it's a boundary condition, skip it
    if(sdom.upwind[dim].mpi_rank < 0){
      continue;
    }

    // If it's an on-rank communication (from another subdomain)
    if(sdom.upwind[dim].mpi_rank == mpi_rank){
      // skip it, but track the dependency
      num_depends ++;
      continue;
    }

    // Add request to pending list
    recv_requests.push_back(MPI_Request());
    recv_subdomains.push_back(sdom_id);
    incomingRequests++;

    // compute the tag id of THIS subdomain (tags are always based on destination)
    int tag = computeTag(sdom.upwind[dim].mpi_rank, sdom_id);

    // Post the receive
    MPI_Irecv(sdom.plane_data[dim]->ptr(), sdom.plane_data[dim]->elements, MPI_DOUBLE, sdom.upwind[dim].mpi_rank,
      tag, MPI_COMM_WORLD, &recv_requests[recv_requests.size()-1]);

    // increment number of dependencies
    num_depends ++;
  }

  // add subdomain to queue
  queue_sdom_ids.push_back(sdom_id);
  queue_subdomains.push_back(&sdom);
  queue_depends.push_back(num_depends);
}

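/**
  Posts sends for the downwind dependencies of a subdomain.
  On-rank dependencies are satisfied immediately by copying src_buffers[]
  into the downwind subdomain's plane_data[] and decrementing its dependency
  count; off-rank dependencies are sent with MPI_Isend.
*/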
void ParallelComm::postSends(Subdomain *sdom, double *src_buffers[3]){
  // post sends for downwind dependencies
  int mpi_rank, mpi_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
  for(int dim = 0;dim < 3;++ dim){
    // If it's a boundary condition, skip it
    if(sdom->downwind[dim].mpi_rank < 0){
      continue;
    }

    // If it's an on-rank communication (to another subdomain)
    if(sdom->downwind[dim].mpi_rank == mpi_rank){
      // find the local subdomain in the queue, and decrement the counter
      for(int i = 0;i < queue_sdom_ids.size();++ i){
        if(queue_sdom_ids[i] == sdom->downwind[dim].subdomain_id){
          queue_depends[i] --;
          break;
        }
      }

      // copy the boundary condition data into the downwind's plane data
      Subdomain &sdom_downwind = grid_data->subdomains[sdom->downwind[dim].subdomain_id];
      sdom_downwind.plane_data[dim]->copy(*sdom->plane_data[dim]);
      int num_elem = sdom_downwind.plane_data[dim]->elements;
      //double const * KRESTRICT src_ptr = sdom->plane_data[dim]->ptr();
      double * KRESTRICT dst_ptr = sdom_downwind.plane_data[dim]->ptr();
      for(int i = 0;i < num_elem;++ i){
        dst_ptr[i] = src_buffers[dim][i];
      }
      continue;
    }

    // At this point, we know that we have to send an MPI message
    // Add request to send queue
    send_requests.push_back(MPI_Request());
    outgoingRequests++;

    // compute the tag id of TARGET subdomain (tags are always based on destination)
    int tag = computeTag(mpi_rank, sdom->downwind[dim].subdomain_id);

    // Post the send
    MPI_Isend(src_buffers[dim], sdom->plane_data[dim]->elements, MPI_DOUBLE, sdom->downwind[dim].mpi_rank,
      tag, MPI_COMM_WORLD, &send_requests[send_requests.size()-1]);
  }
}


// Checks if there are any outstanding subdomains to complete
bool ParallelComm::workRemaining(void){
  return (recv_requests.size() > 0 || queue_subdomains.size() > 0);
}


// Blocks until all sends have completed, and flushes the send queue
void ParallelComm::waitAllSends(void){
  // Wait for all remaining sends to complete, then clear the request list
  int num_sends = send_requests.size();
  if(num_sends > 0){
    std::vector<MPI_Status> status(num_sends);
    MPI_Waitall(num_sends, &send_requests[0], &status[0]);
    send_requests.clear();
  }
}

/**
  Checks for incoming messages, and does relevant bookkeeping.
*/
void ParallelComm::testRecieves(void){

  // Check for any recv requests that have completed
  int num_requests = recv_requests.size();
  bool done = false;
  while(!done && num_requests > 0){
    // Create array of status variables
    std::vector<MPI_Status> recv_status(num_requests);

    // Ask whether one (or none) of the recvs has completed
    int index; // this will be the index of the request that completed
    int complete_flag; // this is set to TRUE if something completed
    MPI_Testany(num_requests, &recv_requests[0], &index, &complete_flag, &recv_status[0]);

    if(complete_flag != 0){

      // get the subdomain that this completed for
      int sdom_id = recv_subdomains[index];

      // remove the request from the list
      recv_requests.erase(recv_requests.begin()+index);
      recv_subdomains.erase(recv_subdomains.begin()+index);
      num_requests --;

      // decrement the dependency count for that subdomain
      for(int i = 0;i < queue_sdom_ids.size();++ i){
        if(queue_sdom_ids[i] == sdom_id){
          queue_depends[i] --;
          break;
        }
      }
    }
    else{
      done = true;
    }
  }
}

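/**
  Returns the list of queued subdomain ids whose dependency counts have
  reached zero and are therefore ready to be swept.
*/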
std::vector<int> ParallelComm::getReadyList(void){
  // build up a list of ready subdomains
  std::vector<int> ready;
  for(int i = 0;i < queue_depends.size();++ i){
    if(queue_depends[i] == 0){
      ready.push_back(queue_sdom_ids[i]);
    }
  }
  return ready;
}

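// Accessors for the file-scope request counters, which track how many
// MPI_Irecv's and MPI_Isend's have been posted since the last resetRequests().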
int ParallelComm::getIncomingRequests()
{
  return incomingRequests;
}

int ParallelComm::getOutgoingRequests()
{
  return outgoingRequests;
}

void ParallelComm::resetRequests()
{
  incomingRequests = 0;
  outgoingRequests = 0;
}