1 /*****************************************************************************/
2 /* XDMF */
3 /* eXtensible Data Model and Format */
4 /* */
5 /* Id : XdmfDSMCommMPI.cpp */
6 /* */
7 /* Author: */
8 /* Andrew Burns */
9 /* andrew.j.burns2@us.army.mil */
10 /* US Army Research Laboratory */
11 /* Aberdeen Proving Ground, MD */
12 /* */
13 /* Copyright @ 2013 US Army Research Laboratory */
14 /* All Rights Reserved */
15 /* See Copyright.txt for details */
16 /* */
17 /* This software is distributed WITHOUT ANY WARRANTY; without */
18 /* even the implied warranty of MERCHANTABILITY or FITNESS */
19 /* FOR A PARTICULAR PURPOSE. See the above copyright notice */
20 /* for more information. */
21 /* */
22 /*****************************************************************************/
23
24 /*=========================================================================
25 This code is derived from an earlier work and is distributed
26 with permission from, and thanks to ...
27 =========================================================================*/
28
29 /*============================================================================
30
31 Project : H5FDdsm
32 Module : H5FDdsmCommMpi.cxx
33
34 Authors:
35 John Biddiscombe Jerome Soumagne
36 biddisco@cscs.ch soumagne@cscs.ch
37
38 Copyright (C) CSCS - Swiss National Supercomputing Centre.
39 You may use modify and and distribute this code freely providing
40 1) This copyright notice appears on all copies of source code
41 2) An acknowledgment appears with any substantial usage of the code
42 3) If this code is contributed to any other open source project, it
43 must not be reformatted such that the indentation, bracketing or
44 overall style is modified significantly.
45
46 This software is distributed WITHOUT ANY WARRANTY; without even the
47 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
48
49 This work has received funding from the European Community's Seventh
Framework Programme (FP7/2007-2013) under grant agreement 225967 "NextMuSE"
51
52 ============================================================================*/
53
#include <XdmfDSMCommMPI.hpp>
#include <XdmfError.hpp>
#include <mpi.h>
#include <string.h>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
61
62 bool XdmfDSMCommMPI::UseEnvFileName = false;
63
XdmfDSMCommMPI()64 XdmfDSMCommMPI::XdmfDSMCommMPI()
65 {
66 IntraComm = MPI_COMM_NULL;
67 Id = -1;
68 IntraSize = -1;
69 InterComm = MPI_COMM_NULL;
70 InterId = -1;
71 InterSize = -1;
72 SetDsmPortName("");
73 // This is the default file name for the config file.
74 DsmFileName = "dsmconnect.cfg";
75 if (XdmfDSMCommMPI::UseEnvFileName)
76 {
77 // Grab from ENV
78 if (std::getenv("XDMFDSM_CONFIG_FILE") != NULL)
79 {
80 DsmFileName = std::getenv("XDMFDSM_CONFIG_FILE");
81 }
82 }
83 InterCommType = XDMF_DSM_COMM_MPI;
84 HasOpenedPort = false;
85 ApplicationName = "Application";
86 }
87
~XdmfDSMCommMPI()88 XdmfDSMCommMPI::~XdmfDSMCommMPI()
89 {
90 #ifndef OPEN_MPI
91 if (InterComm != MPI_COMM_NULL) {
92 int status = MPI_Comm_free(&InterComm);
93 if (status != MPI_SUCCESS) {
94 try {
95 XdmfError::message(XdmfError::FATAL, "Failed to free intercomm Comm");
96 }
97 catch (XdmfError & e) {
98 throw e;
99 }
100 }
101 }
102 if (IntraComm != MPI_COMM_NULL) {
103 int status = MPI_Comm_free(&IntraComm);
104 if (status != MPI_SUCCESS) {
105 try {
106 XdmfError::message(XdmfError::FATAL, "Failed to free intercomm Comm");
107 }
108 catch (XdmfError & e) {
109 throw e;
110 }
111 }
112 }
113 #endif
114 }
115
116 void
Accept(unsigned int numConnections)117 XdmfDSMCommMPI::Accept(unsigned int numConnections)
118 {
119 #ifndef XDMF_DSM_IS_CRAY
120 int status;
121 int acceptingLeadId;
122 while (numConnections > 0) {
123 if (InterComm == MPI_COMM_NULL) {
124 acceptingLeadId = this->IntraSize;
125 // If there is no InterComm, then accept from IntraComm and merge into InterComm
126 MPI_Comm tempComm;
127 int * portCheck = new int[GetInterSize()]();
128 int portStatus;
129 portStatus = 0;
130 if (HasOpenedPort) {
131 portStatus = 1;
132 }
133 MPI_Allgather(&portStatus, 1, MPI_INT, &(portCheck[0]), 1, MPI_INT, InterComm);
134 int index = 0;
135 for (index = 0; index < GetInterSize(); ++index) {
136 if (portCheck[index] == 1) {
137 break;
138 }
139 }
140 int status = MPI_Comm_accept(DsmPortName, MPI_INFO_NULL, index, IntraComm, &tempComm);
141 if (status != MPI_SUCCESS) {
142 try {
143 std::string message = "Failed to accept port ";
144 message = message + DsmPortName;
145 XdmfError::message(XdmfError::FATAL, message);
146 }
147 catch (XdmfError & e) {
148 throw e;
149 }
150 }
151 // False is specified for high so that the index of the cores doesn't change
152 MPI_Comm mergedComm;
153 status = MPI_Intercomm_merge(tempComm, false, &mergedComm);
154 this->DupInterComm(mergedComm);
155 if (status != MPI_SUCCESS) {
156 try {
157 XdmfError::message(XdmfError::FATAL, "Failed to merge intercomm");
158 }
159 catch (XdmfError & e) {
160 throw e;
161 }
162 }
163 else {
164 MPI_Comm_rank(InterComm, &InterId);
165 MPI_Comm_size(InterComm, &InterSize);
166 }
167 }
168 else {
169 acceptingLeadId = this->InterSize;
170 // If there is an InterComm, accept into the InterComm and merge
171 MPI_Comm tempComm;
172 int * portCheck = new int[GetInterSize()]();
173 int portStatus;
174 portStatus = 0;
175 if (HasOpenedPort) {
176 portStatus = 1;
177 }
178 MPI_Allgather(&portStatus, 1, MPI_INT, &(portCheck[0]), 1, MPI_INT, InterComm);
179 int index = 0;
180 for (index = 0; index < GetInterSize(); ++index) {
181 if (portCheck[index] == 1) {
182 break;
183 }
184 }
185 int status = MPI_Comm_accept(DsmPortName, MPI_INFO_NULL, index, InterComm, &tempComm);
186 if (status != MPI_SUCCESS) {
187 try {
188 std::string message = "Failed to accept port ";
189 message = message + DsmPortName;
190 XdmfError::message(XdmfError::FATAL, message);
191 }
192 catch (XdmfError & e) {
193 throw e;
194 }
195 }
196 // False is specified for high so that the index of the cores doesn't change
197 MPI_Comm mergedComm;
198 status = MPI_Intercomm_merge(tempComm, false, &mergedComm);
199 this->DupInterComm(mergedComm);
200 if (status != MPI_SUCCESS) {
201 try {
202 XdmfError::message(XdmfError::FATAL, "Failed to merge InterComm");
203 }
204 catch (XdmfError & e) {
205 throw e;
206 }
207 }
208 else {
209 MPI_Comm_rank(InterComm, &InterId);
210 MPI_Comm_size(InterComm, &InterSize);
211 }
212 }
213
214 //regen Intra comm from Inter Comm
215 MPI_Group IntraGroup, InterGroup;
216 MPI_Comm_group(InterComm, &InterGroup);
217 int * ServerIds = (int *)calloc(this->IntraSize, sizeof(int));
218 unsigned int index = 0;
219 for(int i=this->InterId - this->Id; i < this->InterId - this->Id + this->IntraSize; ++i)
220 {
221 ServerIds[index++] = i;
222 }
223 MPI_Group_incl(InterGroup, this->IntraSize, ServerIds, &IntraGroup);
224 MPI_Comm_create(InterComm, IntraGroup, &IntraComm);
225 cfree(ServerIds);
226
227 // Since this is accept, we will be recieving the local data from the new core and
228 // sending the overarching data to the connecting cores
229 unsigned int length;
230 char * appname;
231 unsigned int appsize;
232 if (DsmProcessStructure.size() == 0)
233 {
234 DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(this->ApplicationName, this->IntraSize));
235 }
236 int numSections = DsmProcessStructure.size();
237 if (InterId == 0)
238 {
239 // Loop in the applicaiton names of the already existing sections of the DSM
240 // Get the number of application sections
241 MPI_Send(&numSections, 1, MPI_INT, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, this->InterComm);
242 for (unsigned int i = 0; i < numSections; ++i)
243 {
244 length = DsmProcessStructure[i].first.size();
245 // Get the length of the name
246 MPI_Send(&length, 1, MPI_UNSIGNED, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, InterComm);
247 // Get the string of characters
248 appname = new char[length]();
249 strcpy(appname, DsmProcessStructure[i].first.c_str());
250 MPI_Send(appname, length, MPI_CHAR, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, InterComm);
251 delete appname;
252 // Get associated numprocs
253 appsize = DsmProcessStructure[i].second;
254 MPI_Send(&appsize, 1, MPI_UNSIGNED, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, InterComm);
255 }
256 }
257 // Add the information for the newly added application
258 // For each application in the connecting set.
259 MPI_Bcast(&numSections, 1, MPI_UNSIGNED, acceptingLeadId, InterComm);
260 for (unsigned int i = 0; i < numSections; ++i)
261 {
262 MPI_Bcast(&length, 1, MPI_UNSIGNED, acceptingLeadId, InterComm);
263 appname = new char[length+1]();
264 MPI_Bcast(appname, length, MPI_CHAR, acceptingLeadId, InterComm);
265 appname[length] = 0;
266 MPI_Bcast(&appsize, 1, MPI_UNSIGNED, acceptingLeadId, InterComm);
267 DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(std::string(appname), appsize));
268 }
269 --numConnections;
270 MPI_Bcast(&numConnections, 1, MPI_INT, 0, InterComm);
271 }
272 #endif
273 }
274
275 void
AllGather(void * sendbuf,int sendbytes,void * recvbuf,int recvbytes,int comm)276 XdmfDSMCommMPI::AllGather(void *sendbuf,
277 int sendbytes,
278 void *recvbuf,
279 int recvbytes,
280 int comm)
281 {
282 if (comm == XDMF_DSM_INTRA_COMM)
283 {
284 MPI_Allgather(sendbuf,
285 sendbytes,
286 MPI_UNSIGNED_CHAR,
287 recvbuf,
288 recvbytes,
289 MPI_UNSIGNED_CHAR,
290 IntraComm);
291 }
292 else if (comm == XDMF_DSM_INTER_COMM)
293 {
294 MPI_Allgather(sendbuf,
295 sendbytes,
296 MPI_UNSIGNED_CHAR,
297 recvbuf,
298 recvbytes,
299 MPI_UNSIGNED_CHAR,
300 InterComm);
301 }
302 }
303
304 void
Barrier(int comm)305 XdmfDSMCommMPI::Barrier(int comm)
306 {
307 int status;
308
309 if (comm == XDMF_DSM_INTRA_COMM)
310 {
311 status = MPI_Barrier(IntraComm);
312 }
313 else if (comm == XDMF_DSM_INTER_COMM)
314 {
315 status = MPI_Barrier(InterComm);
316 }
317 }
318
319 void
Broadcast(void * pointer,int sizebytes,int root,int comm)320 XdmfDSMCommMPI::Broadcast(void * pointer,
321 int sizebytes,
322 int root,
323 int comm)
324 {
325 int status;
326
327 if (comm == XDMF_DSM_INTRA_COMM)
328 {
329 status = MPI_Bcast(pointer, sizebytes, MPI_UNSIGNED_CHAR, root, IntraComm);
330 }
331 else if (comm == XDMF_DSM_INTER_COMM)
332 {
333 status = MPI_Bcast(pointer, sizebytes, MPI_UNSIGNED_CHAR, root, InterComm);
334 }
335 }
336
337 void
ClosePort()338 XdmfDSMCommMPI::ClosePort()
339 {
340 #ifndef XDMF_DSM_IS_CRAY
341 if (Id == 0) {
342 int status;
343 for (unsigned int i = 0; i < PreviousDsmPortNames.size(); ++i)
344 {
345 status = MPI_Close_port(PreviousDsmPortNames[i]);
346 if (status != MPI_SUCCESS) {
347 try {// OpenMPI iterate through open ports in order to close the multiple needed
348 std::string message = "Failed to close port ";
349 message = message + PreviousDsmPortNames[i];
350 XdmfError::message(XdmfError::FATAL, message);
351 }
352 catch (XdmfError & e) {
353 throw e;
354 }
355 }
356 }
357 }
358 #endif
359 HasOpenedPort = false;
360 }
361
362 int
Connect()363 XdmfDSMCommMPI::Connect()
364 {
365 #ifndef XDMF_DSM_IS_CRAY
366 int status;
367 MPI_Status mpistatus;
368 if (InterComm == MPI_COMM_NULL) {
369 this->DupInterComm(IntraComm);
370 }
371 MPI_Comm tempComm;
372 MPI_Comm tempConnectComm;
373 MPI_Comm_dup(InterComm, &tempConnectComm);
374 MPI_Errhandler_set(InterComm, MPI_ERRORS_RETURN);
375 status = MPI_Comm_connect(DsmPortName, MPI_INFO_NULL, 0, tempConnectComm, &tempComm);
376 MPI_Errhandler_set(InterComm, MPI_ERRORS_ARE_FATAL);
377 if (status != MPI_SUCCESS) {
378 try {
379 std::string message = "Failed to connect to port ";
380 message = message + DsmPortName;
381 XdmfError::message(XdmfError::FATAL, message);
382 }
383 catch (XdmfError & e) {
384 throw e;
385 }
386 }
387 MPI_Comm mergedComm;
388 status = MPI_Intercomm_merge(tempComm, true, &mergedComm);
389 this->DupInterComm(mergedComm);
390 if (status != MPI_SUCCESS) {
391 try {
392 XdmfError::message(XdmfError::FATAL, "Failed to merge InterComm");
393 }
394 catch (XdmfError & e) {
395 throw e;
396 }
397 }
398 else {
399 status = MPI_Comm_rank(InterComm, &InterId);
400 status = MPI_Comm_size(InterComm, &InterSize);
401 }
402
403 //regen Intra comm from Inter Comm
404 MPI_Group IntraGroup, InterGroup;
405 MPI_Comm_group(InterComm, &InterGroup);
406 int * ServerIds = (int *)calloc(this->IntraSize, sizeof(int));
407 unsigned int index = 0;
408 for(int i=this->InterId - this->Id; i < this->InterId - this->Id + this->IntraSize; ++i)
409 {
410 ServerIds[index++] = i;
411 }
412 MPI_Group_incl(InterGroup, this->IntraSize, ServerIds, &IntraGroup);
413 MPI_Comm_create(InterComm, IntraGroup, &IntraComm);
414 cfree(ServerIds);
415
416 // Here the process will send information about itself to the server and
417 // the server will diseminate that info across all the connected processes
418 std::vector<std::pair<std::string, unsigned int> > structureArchive;
419 // Archive old structure if it exists
420 if (DsmProcessStructure.size() > 0)
421 {
422 for (unsigned int i = 0; i < DsmProcessStructure.size(); ++i)
423 {
424 structureArchive.push_back(DsmProcessStructure[i]);
425 }
426 DsmProcessStructure.clear();
427 }
428 // Loop in the applicaiton names of the already existing sections of the DSM
429 // Get the number of application sections
430 int numSections;
431 if (this->Id == 0)
432 {
433 MPI_Recv(&numSections, 1, MPI_INT, 0, XDMF_DSM_EXCHANGE_TAG, this->InterComm, &mpistatus);
434 }
435 MPI_Bcast(&numSections, 1, MPI_INT, 0, IntraComm);
436 unsigned int length;
437 char * appname;
438 unsigned int appsize;
439 for (unsigned int i = 0; i < numSections; ++i)
440 {
441 if (this->Id == 0)
442 {
443 // Get the length of the name
444 MPI_Recv(&length, 1, MPI_UNSIGNED, 0, XDMF_DSM_EXCHANGE_TAG, InterComm, &mpistatus);
445 // Get the string of characters
446 appname = new char[length+1]();
447 MPI_Recv(appname, length, MPI_CHAR, 0, XDMF_DSM_EXCHANGE_TAG, InterComm, &mpistatus);
448 appname[length] = 0;
449 // Get associated numprocs
450 MPI_Recv(&appsize, 1, MPI_UNSIGNED, 0, XDMF_DSM_EXCHANGE_TAG, InterComm, &mpistatus);
451 }
452 // Broadcast to local comm
453 MPI_Bcast(&length, 1, MPI_UNSIGNED, 0, IntraComm);
454 if (this->Id != 0)
455 {
456 appname = new char[length+1]();
457 }
458 MPI_Bcast(appname, length+1, MPI_CHAR, 0, IntraComm);
459 MPI_Bcast(&appsize, 1, MPI_UNSIGNED, 0, IntraComm);
460
461 DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(std::string(appname), appsize));
462 }
463 if (structureArchive.size() == 0)
464 {
465 numSections = 1;
466 MPI_Bcast(&numSections, 1, MPI_INT, InterId-Id, InterComm);
467 length = ApplicationName.size();
468 appsize = this->IntraSize;
469 MPI_Bcast(&length, 1, MPI_UNSIGNED, InterId-Id, InterComm);
470 appname = new char[length]();
471 strcpy(appname, ApplicationName.c_str());
472 MPI_Bcast(appname, length, MPI_CHAR, InterId-Id, InterComm);
473 delete appname;
474 MPI_Bcast(&appsize, 1, MPI_UNSIGNED, InterId-Id, InterComm);
475 DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(ApplicationName, appsize));
476 }
477 else
478 {
479 numSections = structureArchive.size();
480 MPI_Bcast(&numSections, 1, MPI_UNSIGNED, InterId-Id, InterComm);
481 for (unsigned int i = 0; i < numSections; ++i)
482 {
483 length = structureArchive[i].first.size();
484 appsize = structureArchive[i].second;
485 MPI_Bcast(&length, 1, MPI_UNSIGNED, InterId-Id, InterComm);
486 appname = new char[length]();
487 strcpy(appname, structureArchive[i].first.c_str());
488 MPI_Bcast(appname, length, MPI_CHAR, InterId-Id, InterComm);
489 delete appname;
490 MPI_Bcast(&appsize, 1, MPI_UNSIGNED, InterId-Id, InterComm);
491 DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(structureArchive[i].first, appsize));
492 }
493 }
494 int numAccepts = 0;
495 MPI_Bcast(&numAccepts, 1, MPI_INT, 0, InterComm);
496 #ifdef OPEN_MPI
497 if (numAccepts > 0) {
498 MPI_Bcast(DsmPortName, MPI_MAX_PORT_NAME, MPI_CHAR, 0, InterComm);
499 }
500 #endif
501 Accept(numAccepts);
502 return MPI_SUCCESS;
503 #endif
504 return MPI_SUCCESS;
505 }
506
507 void
Disconnect()508 XdmfDSMCommMPI::Disconnect()
509 {
510 #ifndef XDMF_DSM_IS_CRAY
511 #ifndef OPEN_MPI
512 if (InterComm != MPI_COMM_NULL) {
513 int status = MPI_Comm_free(&InterComm);
514 if (status != MPI_SUCCESS) {
515 try {
516 XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
517 }
518 catch (XdmfError & e) {
519 throw e;
520 }
521 }
522 }
523 #endif
524 #endif
525 InterComm = MPI_COMM_NULL;
526 }
527
528 void
DupComm(MPI_Comm comm)529 XdmfDSMCommMPI::DupComm(MPI_Comm comm)
530 {
531 if (IntraComm != comm) {
532 int status;
533 #ifndef OPEN_MPI
534 if (IntraComm != MPI_COMM_NULL) {
535 status = MPI_Comm_free(&IntraComm);
536 if (status != MPI_SUCCESS) {
537 try {
538 XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
539 }
540 catch (XdmfError & e) {
541 throw e;
542 }
543 }
544 }
545 #endif
546 if (comm != MPI_COMM_NULL) {
547 status = MPI_Comm_dup(comm, &IntraComm);
548 if (status != MPI_SUCCESS) {
549 try {
550 XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
551 }
552 catch (XdmfError & e) {
553 throw e;
554 }
555 }
556 else {
557 status = MPI_Comm_size(IntraComm, &IntraSize);
558 status = MPI_Comm_rank(IntraComm, &Id);
559 }
560 }
561 }
562 }
563
564 void
DupInterComm(MPI_Comm comm)565 XdmfDSMCommMPI::DupInterComm(MPI_Comm comm)
566 {
567 if (InterComm != comm) {
568 int status;
569 #ifndef OPEN_MPI
570 if (InterComm != MPI_COMM_NULL) {
571 status = MPI_Comm_free(&InterComm);
572 if (status != MPI_SUCCESS) {
573 try {
574 XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
575 }
576 catch (XdmfError & e) {
577 throw e;
578 }
579 }
580 }
581 #endif
582 if (comm != MPI_COMM_NULL) {
583 status = MPI_Comm_dup(comm, &InterComm);
584 if (status != MPI_SUCCESS) {
585 try {
586 XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
587 }
588 catch (XdmfError & e) {
589 throw e;
590 }
591 }
592 else {
593 status = MPI_Comm_rank(InterComm, &InterId);
594 status = MPI_Comm_size(InterComm, &InterSize);
595 }
596 }
597 else {
598 InterId = -1;
599 InterSize = -1;
600 }
601 }
602 }
603
604 std::string
GetApplicationName()605 XdmfDSMCommMPI::GetApplicationName()
606 {
607 return ApplicationName;
608 }
609
610 std::string
GetDsmFileName()611 XdmfDSMCommMPI::GetDsmFileName()
612 {
613 return DsmFileName;
614 }
615
616 char *
GetDsmPortName()617 XdmfDSMCommMPI::GetDsmPortName()
618 {
619 return DsmPortName;
620 }
621
622 std::vector<std::pair<std::string, unsigned int> >
GetDsmProcessStructure()623 XdmfDSMCommMPI::GetDsmProcessStructure()
624 {
625 return DsmProcessStructure;
626 }
627
628 int
GetId()629 XdmfDSMCommMPI::GetId()
630 {
631 return this->Id;
632 }
633
634 MPI_Comm
GetInterComm()635 XdmfDSMCommMPI::GetInterComm()
636 {
637 return InterComm;
638 }
639
640 int
GetInterCommType()641 XdmfDSMCommMPI::GetInterCommType()
642 {
643 return this->InterCommType;
644 }
645
646 int
GetInterId()647 XdmfDSMCommMPI::GetInterId()
648 {
649 return this->InterId;
650 }
651
652 int
GetInterSize()653 XdmfDSMCommMPI::GetInterSize()
654 {
655 return this->InterSize;
656 }
657
658 MPI_Comm
GetIntraComm()659 XdmfDSMCommMPI::GetIntraComm()
660 {
661 return IntraComm;
662 }
663
664 int
GetIntraSize()665 XdmfDSMCommMPI::GetIntraSize()
666 {
667 return this->IntraSize;
668 }
669
670 bool
GetUseEnvFileName()671 XdmfDSMCommMPI::GetUseEnvFileName()
672 {
673 return XdmfDSMCommMPI::UseEnvFileName;
674 }
675
676 void
Init()677 XdmfDSMCommMPI::Init()
678 {
679 int size, rank;
680 if (MPI_Comm_size(this->IntraComm, &size) != MPI_SUCCESS) {
681 try {
682 XdmfError::message(XdmfError::FATAL, "Failed to initialize size");
683 }
684 catch (XdmfError & e) {
685 throw e;
686 }
687 }
688 if (MPI_Comm_rank(this->IntraComm, &rank) != MPI_SUCCESS) {
689 try {
690 XdmfError::message(XdmfError::FATAL, "Failed to initialize rank");
691 }
692 catch (XdmfError & e) {
693 throw e;
694 }
695 }
696
697 this->Id = rank;
698 this->IntraSize = size;
699 }
700
701 void
OpenPort()702 XdmfDSMCommMPI::OpenPort()
703 {
704 if (Id == 0) {
705 #ifndef XDMF_DSM_IS_CRAY
706 int status = MPI_Open_port(MPI_INFO_NULL, DsmPortName);
707 if (status != MPI_SUCCESS) {
708 try {
709 std::string message = "Failed to open port ";
710 message = message + DsmPortName;
711 XdmfError::message(XdmfError::FATAL, message);
712 }
713 catch (XdmfError & e) {
714 throw e;
715 }
716 }
717 PreviousDsmPortNames.push_back(DsmPortName);
718 #endif
719 std::ofstream connectFile (DsmFileName.c_str());
720 if (connectFile.is_open()) {
721 connectFile << DsmPortName;
722 connectFile.close();
723 }
724 else {
725 try {
726 XdmfError::message(XdmfError::FATAL, "Failed to write port to file");
727 }
728 catch (XdmfError & e) {
729 throw e;
730 }
731 }
732 HasOpenedPort = true;
733 }
734 #ifndef XDMF_DSM_IS_CRAY
735 MPI_Bcast(DsmPortName, MPI_MAX_PORT_NAME, MPI_CHAR, 0, IntraComm);
736 #endif
737 }
738
739 void
Probe(int * comm)740 XdmfDSMCommMPI::Probe(int *comm)
741 {
742 // Used for finding a comm that has a waiting command, then sets the comm
743 int status = XDMF_DSM_FAIL;
744 MPI_Status signalStatus;
745
746 int flag;
747 MPI_Comm probeComm =
748 this->GetIntraComm();
749
750 // Spin until a message is found on one of the communicators
751 while (status != XDMF_DSM_SUCCESS) {
752 status = MPI_Iprobe(XDMF_DSM_ANY_SOURCE,
753 XDMF_DSM_ANY_TAG,
754 probeComm,
755 &flag,
756 &signalStatus);
757 if (status != MPI_SUCCESS)
758 {
759 try {
760 XdmfError::message(XdmfError::FATAL,
761 "Error: Failed to probe for command header");
762 }
763 catch (XdmfError & e) {
764 throw e;
765 }
766 }
767 if (flag) {
768 status = XDMF_DSM_SUCCESS;
769 }
770 else {
771 if (this->GetInterComm() != MPI_COMM_NULL) {
772 if (probeComm == this->GetIntraComm()) {
773 probeComm = this->GetInterComm();
774 }
775 else {
776 probeComm = this->GetIntraComm();
777 }
778 }
779 }
780 }
781 if (probeComm == this->GetInterComm()) {
782 *comm = XDMF_DSM_INTER_COMM;
783 }
784 else
785 {
786 *comm = XDMF_DSM_INTRA_COMM;
787 }
788
789 probeComm = MPI_COMM_NULL;
790 }
791
792 void
ReadDsmPortName()793 XdmfDSMCommMPI::ReadDsmPortName()
794 {
795 #ifndef XDMF_DSM_IS_CRAY
796 std::ifstream connectFile(DsmFileName.c_str());
797 std::string connectLine;
798 if (connectFile.is_open()) {
799 getline(connectFile, connectLine);
800 }
801 strcpy(DsmPortName, connectLine.c_str());
802 #endif
803 }
804
805 void
Send(void * pointer,int sizebytes,int coreTo,int comm,int tag)806 XdmfDSMCommMPI::Send(void * pointer,
807 int sizebytes,
808 int coreTo,
809 int comm,
810 int tag)
811 {
812 int status;
813 if (comm == XDMF_DSM_INTRA_COMM) {
814 status = MPI_Send(pointer,
815 sizebytes,
816 MPI_UNSIGNED_CHAR,
817 coreTo,
818 tag,
819 IntraComm);
820 }
821 else if (comm == XDMF_DSM_INTER_COMM) {
822 status = MPI_Send(pointer,
823 sizebytes,
824 MPI_UNSIGNED_CHAR,
825 coreTo,
826 tag,
827 InterComm);
828 }
829 }
830
831 void
Receive(void * pointer,int sizebytes,int coreFrom,int comm,int tag)832 XdmfDSMCommMPI::Receive(void * pointer,
833 int sizebytes,
834 int coreFrom,
835 int comm,
836 int tag)
837 {
838 int status;
839 MPI_Status signalStatus;
840 if (comm == XDMF_DSM_INTRA_COMM) {
841 status = MPI_Recv(pointer,
842 sizebytes,
843 MPI_UNSIGNED_CHAR,
844 coreFrom,
845 tag,
846 IntraComm,
847 &signalStatus);
848 }
849 else if (comm == XDMF_DSM_INTER_COMM) {
850 status = MPI_Recv(pointer,
851 sizebytes,
852 MPI_UNSIGNED_CHAR,
853 coreFrom,
854 tag,
855 InterComm,
856 &signalStatus);
857 }
858 }
859
860 void
SetApplicationName(std::string newName)861 XdmfDSMCommMPI::SetApplicationName(std::string newName)
862 {
863 ApplicationName = newName;
864 }
865
866 void
SetDsmFileName(std::string filename)867 XdmfDSMCommMPI::SetDsmFileName(std::string filename)
868 {
869 DsmFileName = filename;
870 }
871
872 void
SetDsmPortName(const char * hostName)873 XdmfDSMCommMPI::SetDsmPortName(const char *hostName)
874 {
875 strcpy(DsmPortName, hostName);
876 }
877
878 void
SetDsmProcessStructure(std::vector<std::pair<std::string,unsigned int>> & newStructure)879 XdmfDSMCommMPI::SetDsmProcessStructure(std::vector<std::pair<std::string, unsigned int> > & newStructure)
880 {
881 DsmProcessStructure = newStructure;
882 }
883
884 void
SetUseEnvFileName(bool status)885 XdmfDSMCommMPI::SetUseEnvFileName(bool status)
886 {
887 XdmfDSMCommMPI::UseEnvFileName = status;
888 }
889
890 // C Wrappers
891
XdmfDSMCommMPINew()892 XDMFDSMCOMMMPI * XdmfDSMCommMPINew()
893 {
894 try
895 {
896 return (XDMFDSMCOMMMPI *)((void *)(new XdmfDSMCommMPI()));
897 }
898 catch (...)
899 {
900 return (XDMFDSMCOMMMPI *)((void *)(new XdmfDSMCommMPI()));
901 }
902 }
903
XdmfDSMCommMPIFree(XDMFDSMCOMMMPI * item)904 void XdmfDSMCommMPIFree(XDMFDSMCOMMMPI * item)
905 {
906 if (item != NULL) {
907 delete ((XdmfDSMCommMPI *)item);
908 }
909 item = NULL;
910 }
911
XdmfDSMCommMPIAccept(XDMFDSMCOMMMPI * dsmComm,unsigned int numConnections,int * status)912 void XdmfDSMCommMPIAccept(XDMFDSMCOMMMPI * dsmComm, unsigned int numConnections, int * status)
913 {
914 XDMF_ERROR_WRAP_START(status)
915 ((XdmfDSMCommMPI *)dsmComm)->Accept(numConnections);
916 XDMF_ERROR_WRAP_END(status)
917 }
918
XdmfDSMCommMPIClosePort(XDMFDSMCOMMMPI * dsmComm,int * status)919 void XdmfDSMCommMPIClosePort(XDMFDSMCOMMMPI * dsmComm, int * status)
920 {
921 XDMF_ERROR_WRAP_START(status)
922 ((XdmfDSMCommMPI *)dsmComm)->ClosePort();
923 XDMF_ERROR_WRAP_END(status)
924 }
925
XdmfDSMCommMPIConnect(XDMFDSMCOMMMPI * dsmComm,int * status)926 int XdmfDSMCommMPIConnect(XDMFDSMCOMMMPI * dsmComm, int * status)
927 {
928 XDMF_ERROR_WRAP_START(status)
929 return ((XdmfDSMCommMPI *)dsmComm)->Connect();
930 XDMF_ERROR_WRAP_END(status)
931 return -1;
932 }
933
XdmfDSMCommMPIDisconnect(XDMFDSMCOMMMPI * dsmComm,int * status)934 void XdmfDSMCommMPIDisconnect(XDMFDSMCOMMMPI * dsmComm, int * status)
935 {
936 XDMF_ERROR_WRAP_START(status)
937 ((XdmfDSMCommMPI *)dsmComm)->Disconnect();
938 XDMF_ERROR_WRAP_END(status)
939 }
940
XdmfDSMCommMPIDupComm(XDMFDSMCOMMMPI * dsmComm,MPI_Comm comm,int * status)941 void XdmfDSMCommMPIDupComm(XDMFDSMCOMMMPI * dsmComm, MPI_Comm comm, int * status)
942 {
943 XDMF_ERROR_WRAP_START(status)
944 ((XdmfDSMCommMPI *)dsmComm)->DupComm(comm);
945 XDMF_ERROR_WRAP_END(status)
946 }
947
XdmfDSMCommMPIDupInterComm(XDMFDSMCOMMMPI * dsmComm,MPI_Comm comm,int * status)948 void XdmfDSMCommMPIDupInterComm(XDMFDSMCOMMMPI * dsmComm, MPI_Comm comm, int * status)
949 {
950 XDMF_ERROR_WRAP_START(status)
951 ((XdmfDSMCommMPI *)dsmComm)->DupInterComm(comm);
952 XDMF_ERROR_WRAP_END(status)
953 }
954
XdmfDSMCommMPIGetApplicationName(XDMFDSMCOMMMPI * dsmComm)955 char * XdmfDSMCommMPIGetApplicationName(XDMFDSMCOMMMPI * dsmComm)
956 {
957 try
958 {
959 char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetApplicationName().c_str());
960 return returnPointer;
961 }
962 catch (...)
963 {
964 char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetApplicationName().c_str());
965 return returnPointer;
966 }
967 }
968
XdmfDSMCommMPIGetDsmFileName(XDMFDSMCOMMMPI * dsmComm)969 char * XdmfDSMCommMPIGetDsmFileName(XDMFDSMCOMMMPI * dsmComm)
970 {
971 try
972 {
973 char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetDsmFileName().c_str());
974 return returnPointer;
975 }
976 catch (...)
977 {
978 char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetDsmFileName().c_str());
979 return returnPointer;
980 }
981 }
982
XdmfDSMCommMPIGetDsmPortName(XDMFDSMCOMMMPI * dsmComm)983 char * XdmfDSMCommMPIGetDsmPortName(XDMFDSMCOMMMPI * dsmComm)
984 {
985 try
986 {
987 return ((XdmfDSMCommMPI *)dsmComm)->GetDsmPortName();
988 }
989 catch (...)
990 {
991 return ((XdmfDSMCommMPI *)dsmComm)->GetDsmPortName();
992 }
993 }
994
XdmfDSMCommMPIGetDsmProcessStructure(XDMFDSMCOMMMPI * dsmComm,char ** names,unsigned int * coreCount,int * numApplications)995 void XdmfDSMCommMPIGetDsmProcessStructure(XDMFDSMCOMMMPI * dsmComm,
996 char ** names,
997 unsigned int * coreCount,
998 int * numApplications)
999 {
1000 try
1001 {
1002 std::vector<std::pair<std::string, unsigned int> > structure =
1003 ((XdmfDSMCommMPI *)dsmComm)->GetDsmProcessStructure();
1004 *numApplications = structure.size();
1005 coreCount = new unsigned int[*numApplications]();
1006 names = new char *[*numApplications]();
1007 for (unsigned int i = 0; i < *numApplications; ++i)
1008 {
1009 coreCount[i] = structure[i].second;
1010 names[i] = strdup(structure[i].first.c_str());
1011 }
1012 }
1013 catch (...)
1014 {
1015 std::vector<std::pair<std::string, unsigned int> > structure =
1016 ((XdmfDSMCommMPI *)dsmComm)->GetDsmProcessStructure();
1017 *numApplications = structure.size();
1018 coreCount = new unsigned int[*numApplications]();
1019 names = new char *[*numApplications]();
1020 for (unsigned int i = 0; i < *numApplications; ++i)
1021 {
1022 coreCount[i] = structure[i].second;
1023 names[i] = strdup(structure[i].first.c_str());
1024 }
1025 }
1026 }
1027
XdmfDSMCommMPIGetId(XDMFDSMCOMMMPI * dsmComm)1028 int XdmfDSMCommMPIGetId(XDMFDSMCOMMMPI * dsmComm)
1029 {
1030 try
1031 {
1032 return ((XdmfDSMCommMPI *)dsmComm)->GetId();
1033 }
1034 catch (...)
1035 {
1036 return ((XdmfDSMCommMPI *)dsmComm)->GetId();
1037 }
1038 }
1039
XdmfDSMCommMPIGetInterComm(XDMFDSMCOMMMPI * dsmComm)1040 MPI_Comm XdmfDSMCommMPIGetInterComm(XDMFDSMCOMMMPI * dsmComm)
1041 {
1042 try
1043 {
1044 return ((XdmfDSMCommMPI *)dsmComm)->GetInterComm();
1045 }
1046 catch (...)
1047 {
1048 return ((XdmfDSMCommMPI *)dsmComm)->GetInterComm();
1049 }
1050 }
1051
XdmfDSMCommMPIGetInterCommType(XDMFDSMCOMMMPI * dsmComm)1052 int XdmfDSMCommMPIGetInterCommType(XDMFDSMCOMMMPI * dsmComm)
1053 {
1054 try
1055 {
1056 return ((XdmfDSMCommMPI *)dsmComm)->GetInterCommType();
1057 }
1058 catch (...)
1059 {
1060 return ((XdmfDSMCommMPI *)dsmComm)->GetInterCommType();
1061 }
1062 }
1063
XdmfDSMCommMPIGetInterId(XDMFDSMCOMMMPI * dsmComm)1064 int XdmfDSMCommMPIGetInterId(XDMFDSMCOMMMPI * dsmComm)
1065 {
1066 try
1067 {
1068 return ((XdmfDSMCommMPI *)dsmComm)->GetInterId();
1069 }
1070 catch (...)
1071 {
1072 return ((XdmfDSMCommMPI *)dsmComm)->GetInterId();
1073 }
1074 }
1075
XdmfDSMCommMPIGetInterSize(XDMFDSMCOMMMPI * dsmComm)1076 int XdmfDSMCommMPIGetInterSize(XDMFDSMCOMMMPI * dsmComm)
1077 {
1078 try
1079 {
1080 return ((XdmfDSMCommMPI *)dsmComm)->GetInterSize();
1081 }
1082 catch (...)
1083 {
1084 return ((XdmfDSMCommMPI *)dsmComm)->GetInterSize();
1085 }
1086 }
1087
XdmfDSMCommMPIGetIntraComm(XDMFDSMCOMMMPI * dsmComm)1088 MPI_Comm XdmfDSMCommMPIGetIntraComm(XDMFDSMCOMMMPI * dsmComm)
1089 {
1090 try
1091 {
1092 return ((XdmfDSMCommMPI *)dsmComm)->GetIntraComm();
1093 }
1094 catch (...)
1095 {
1096 return ((XdmfDSMCommMPI *)dsmComm)->GetIntraComm();
1097 }
1098 }
1099
XdmfDSMCommMPIGetIntraSize(XDMFDSMCOMMMPI * dsmComm)1100 int XdmfDSMCommMPIGetIntraSize(XDMFDSMCOMMMPI * dsmComm)
1101 {
1102 try
1103 {
1104 return ((XdmfDSMCommMPI *)dsmComm)->GetIntraSize();
1105 }
1106 catch (...)
1107 {
1108 return ((XdmfDSMCommMPI *)dsmComm)->GetIntraSize();
1109 }
1110 }
1111
XdmfDSMCommMPIGetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm)1112 int XdmfDSMCommMPIGetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm)
1113 {
1114 try
1115 {
1116 return ((XdmfDSMCommMPI *)dsmComm)->GetUseEnvFileName();
1117 }
1118 catch (...)
1119 {
1120 return ((XdmfDSMCommMPI *)dsmComm)->GetUseEnvFileName();
1121 }
1122 }
1123
XdmfDSMCommMPIInit(XDMFDSMCOMMMPI * dsmComm,int * status)1124 void XdmfDSMCommMPIInit(XDMFDSMCOMMMPI * dsmComm, int * status)
1125 {
1126 XDMF_ERROR_WRAP_START(status)
1127 ((XdmfDSMCommMPI *)dsmComm)->Init();
1128 XDMF_ERROR_WRAP_END(status)
1129 }
1130
XdmfDSMCommMPIOpenPort(XDMFDSMCOMMMPI * dsmComm,int * status)1131 void XdmfDSMCommMPIOpenPort(XDMFDSMCOMMMPI * dsmComm, int * status)
1132 {
1133 XDMF_ERROR_WRAP_START(status)
1134 ((XdmfDSMCommMPI *)dsmComm)->OpenPort();
1135 XDMF_ERROR_WRAP_END(status)
1136 }
1137
XdmfDSMCommMPIReadDsmPortName(XDMFDSMCOMMMPI * dsmComm)1138 void XdmfDSMCommMPIReadDsmPortName(XDMFDSMCOMMMPI * dsmComm)
1139 {
1140 try
1141 {
1142 ((XdmfDSMCommMPI *)dsmComm)->ReadDsmPortName();
1143 }
1144 catch (...)
1145 {
1146 ((XdmfDSMCommMPI *)dsmComm)->ReadDsmPortName();
1147 }
1148 }
1149
XdmfDSMCommMPISetApplicationName(XDMFDSMCOMMMPI * dsmComm,char * newName)1150 void XdmfDSMCommMPISetApplicationName(XDMFDSMCOMMMPI * dsmComm, char * newName)
1151 {
1152 try
1153 {
1154 ((XdmfDSMCommMPI *)dsmComm)->SetApplicationName(std::string(newName));
1155 }
1156 catch (...)
1157 {
1158 ((XdmfDSMCommMPI *)dsmComm)->SetApplicationName(std::string(newName));
1159 }
1160 }
1161
XdmfDSMCommMPISetDsmFileName(XDMFDSMCOMMMPI * dsmComm,char * filename)1162 void XdmfDSMCommMPISetDsmFileName(XDMFDSMCOMMMPI * dsmComm, char * filename)
1163 {
1164 try
1165 {
1166 ((XdmfDSMCommMPI *)dsmComm)->SetDsmFileName(std::string(filename));
1167 }
1168 catch (...)
1169 {
1170 ((XdmfDSMCommMPI *)dsmComm)->SetDsmFileName(std::string(filename));
1171 }
1172 }
1173
XdmfDSMCommMPISetDsmPortName(XDMFDSMCOMMMPI * dsmComm,char * hostName)1174 void XdmfDSMCommMPISetDsmPortName(XDMFDSMCOMMMPI * dsmComm, char * hostName)
1175 {
1176 try
1177 {
1178 ((XdmfDSMCommMPI *)dsmComm)->SetDsmPortName(hostName);
1179 }
1180 catch (...)
1181 {
1182 ((XdmfDSMCommMPI *)dsmComm)->SetDsmPortName(hostName);
1183 }
1184 }
1185
XdmfDSMCommMPISetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm,int status)1186 void XdmfDSMCommMPISetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm, int status)
1187 {
1188 try
1189 {
1190 ((XdmfDSMCommMPI *)dsmComm)->SetUseEnvFileName(status);
1191 }
1192 catch (...)
1193 {
1194 ((XdmfDSMCommMPI *)dsmComm)->SetUseEnvFileName(status);
1195 }
1196 }
1197