1 /*****************************************************************************/
2 /*                                    XDMF                                   */
3 /*                       eXtensible Data Model and Format                    */
4 /*                                                                           */
5 /*  Id : XdmfDSMCommMPI.cpp                                                  */
6 /*                                                                           */
7 /*  Author:                                                                  */
8 /*     Andrew Burns                                                          */
9 /*     andrew.j.burns2@us.army.mil                                           */
10 /*     US Army Research Laboratory                                           */
11 /*     Aberdeen Proving Ground, MD                                           */
12 /*                                                                           */
13 /*     Copyright @ 2013 US Army Research Laboratory                          */
14 /*     All Rights Reserved                                                   */
15 /*     See Copyright.txt for details                                         */
16 /*                                                                           */
17 /*     This software is distributed WITHOUT ANY WARRANTY; without            */
18 /*     even the implied warranty of MERCHANTABILITY or FITNESS               */
19 /*     FOR A PARTICULAR PURPOSE.  See the above copyright notice             */
20 /*     for more information.                                                 */
21 /*                                                                           */
22 /*****************************************************************************/
23 
24 /*=========================================================================
25   This code is derived from an earlier work and is distributed
26   with permission from, and thanks to ...
27 =========================================================================*/
28 
29 /*============================================================================
30 
31   Project                 : H5FDdsm
32   Module                  : H5FDdsmCommMpi.cxx
33 
34   Authors:
35      John Biddiscombe     Jerome Soumagne
36      biddisco@cscs.ch     soumagne@cscs.ch
37 
38   Copyright (C) CSCS - Swiss National Supercomputing Centre.
39   You may use modify and and distribute this code freely providing
40   1) This copyright notice appears on all copies of source code
41   2) An acknowledgment appears with any substantial usage of the code
42   3) If this code is contributed to any other open source project, it
43   must not be reformatted such that the indentation, bracketing or
44   overall style is modified significantly.
45 
46   This software is distributed WITHOUT ANY WARRANTY; without even the
47   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
48 
49   This work has received funding from the European Community's Seventh
  Framework Programme (FP7/2007-2013) under grant agreement 225967 "NextMuSE"
51 
52 ============================================================================*/
53 
#include <XdmfDSMCommMPI.hpp>
#include <XdmfError.hpp>
#include <mpi.h>
#include <string.h>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
61 
62 bool XdmfDSMCommMPI::UseEnvFileName = false;
63 
XdmfDSMCommMPI()64 XdmfDSMCommMPI::XdmfDSMCommMPI()
65 {
66   IntraComm = MPI_COMM_NULL;
67   Id = -1;
68   IntraSize = -1;
69   InterComm = MPI_COMM_NULL;
70   InterId = -1;
71   InterSize = -1;
72   SetDsmPortName("");
73   // This is the default file name for the config file.
74   DsmFileName = "dsmconnect.cfg";
75   if (XdmfDSMCommMPI::UseEnvFileName)
76   {
77     // Grab from ENV
78     if (std::getenv("XDMFDSM_CONFIG_FILE") != NULL)
79     {
80       DsmFileName = std::getenv("XDMFDSM_CONFIG_FILE");
81     }
82   }
83   InterCommType = XDMF_DSM_COMM_MPI;
84   HasOpenedPort = false;
85   ApplicationName = "Application";
86 }
87 
~XdmfDSMCommMPI()88 XdmfDSMCommMPI::~XdmfDSMCommMPI()
89 {
90 #ifndef OPEN_MPI
91   if (InterComm != MPI_COMM_NULL) {
92     int status = MPI_Comm_free(&InterComm);
93     if (status != MPI_SUCCESS) {
94       try {
95         XdmfError::message(XdmfError::FATAL, "Failed to free intercomm Comm");
96       }
97       catch (XdmfError & e) {
98         throw e;
99       }
100     }
101   }
102   if (IntraComm != MPI_COMM_NULL) {
103     int status = MPI_Comm_free(&IntraComm);
104     if (status != MPI_SUCCESS) {
105       try {
106         XdmfError::message(XdmfError::FATAL, "Failed to free intercomm Comm");
107       }
108       catch (XdmfError & e) {
109         throw e;
110       }
111     }
112   }
113 #endif
114 }
115 
116 void
Accept(unsigned int numConnections)117 XdmfDSMCommMPI::Accept(unsigned int numConnections)
118 {
119 #ifndef XDMF_DSM_IS_CRAY
120   int status;
121   int acceptingLeadId;
122   while (numConnections > 0) {
123     if (InterComm == MPI_COMM_NULL) {
124       acceptingLeadId = this->IntraSize;
125       // If there is no InterComm, then accept from IntraComm and merge into InterComm
126       MPI_Comm tempComm;
127       int * portCheck = new int[GetInterSize()]();
128       int portStatus;
129       portStatus = 0;
130       if (HasOpenedPort) {
131         portStatus = 1;
132       }
133       MPI_Allgather(&portStatus, 1, MPI_INT, &(portCheck[0]), 1, MPI_INT, InterComm);
134       int index = 0;
135       for (index = 0; index < GetInterSize(); ++index) {
136         if (portCheck[index] == 1) {
137           break;
138         }
139       }
140       int status = MPI_Comm_accept(DsmPortName, MPI_INFO_NULL, index, IntraComm, &tempComm);
141       if (status != MPI_SUCCESS) {
142         try {
143           std::string message = "Failed to accept port ";
144           message = message + DsmPortName;
145           XdmfError::message(XdmfError::FATAL, message);
146         }
147         catch (XdmfError & e) {
148           throw e;
149         }
150       }
151       // False is specified for high so that the index of the cores doesn't change
152       MPI_Comm mergedComm;
153       status = MPI_Intercomm_merge(tempComm, false, &mergedComm);
154       this->DupInterComm(mergedComm);
155       if (status != MPI_SUCCESS) {
156         try {
157           XdmfError::message(XdmfError::FATAL, "Failed to merge intercomm");
158         }
159         catch (XdmfError & e) {
160           throw e;
161         }
162       }
163       else {
164         MPI_Comm_rank(InterComm, &InterId);
165         MPI_Comm_size(InterComm, &InterSize);
166       }
167     }
168     else {
169       acceptingLeadId = this->InterSize;
170       // If there is an InterComm, accept into the InterComm and merge
171       MPI_Comm tempComm;
172       int * portCheck = new int[GetInterSize()]();
173       int portStatus;
174       portStatus = 0;
175       if (HasOpenedPort) {
176         portStatus = 1;
177       }
178       MPI_Allgather(&portStatus, 1, MPI_INT, &(portCheck[0]), 1, MPI_INT, InterComm);
179       int index = 0;
180       for (index = 0; index < GetInterSize(); ++index) {
181         if (portCheck[index] == 1) {
182           break;
183         }
184       }
185       int status = MPI_Comm_accept(DsmPortName, MPI_INFO_NULL, index, InterComm, &tempComm);
186       if (status != MPI_SUCCESS) {
187         try {
188           std::string message = "Failed to accept port ";
189           message = message + DsmPortName;
190           XdmfError::message(XdmfError::FATAL, message);
191         }
192         catch (XdmfError & e) {
193           throw e;
194         }
195       }
196       // False is specified for high so that the index of the cores doesn't change
197       MPI_Comm mergedComm;
198       status = MPI_Intercomm_merge(tempComm, false, &mergedComm);
199       this->DupInterComm(mergedComm);
200       if (status != MPI_SUCCESS) {
201         try {
202           XdmfError::message(XdmfError::FATAL, "Failed to merge InterComm");
203         }
204         catch (XdmfError & e) {
205           throw e;
206         }
207       }
208       else {
209         MPI_Comm_rank(InterComm, &InterId);
210         MPI_Comm_size(InterComm, &InterSize);
211       }
212     }
213 
214     //regen Intra comm from Inter Comm
215     MPI_Group IntraGroup, InterGroup;
216     MPI_Comm_group(InterComm, &InterGroup);
217     int * ServerIds = (int *)calloc(this->IntraSize, sizeof(int));
218     unsigned int index = 0;
219     for(int i=this->InterId - this->Id; i < this->InterId - this->Id + this->IntraSize; ++i)
220     {
221       ServerIds[index++] = i;
222     }
223     MPI_Group_incl(InterGroup, this->IntraSize, ServerIds, &IntraGroup);
224     MPI_Comm_create(InterComm, IntraGroup, &IntraComm);
225     cfree(ServerIds);
226 
227     // Since this is accept, we will be recieving the local data from the new core and
228     // sending the overarching data to the connecting cores
229     unsigned int length;
230     char * appname;
231     unsigned int appsize;
232     if (DsmProcessStructure.size() == 0)
233     {
234       DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(this->ApplicationName, this->IntraSize));
235     }
236     int numSections = DsmProcessStructure.size();
237     if (InterId == 0)
238     {
239       // Loop in the applicaiton names of the already existing sections of the DSM
240       // Get the number of application sections
241       MPI_Send(&numSections, 1, MPI_INT, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, this->InterComm);
242       for (unsigned int i = 0; i < numSections; ++i)
243       {
244         length = DsmProcessStructure[i].first.size();
245         // Get the length of the name
246         MPI_Send(&length, 1, MPI_UNSIGNED, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, InterComm);
247         // Get the string of characters
248         appname = new char[length]();
249         strcpy(appname, DsmProcessStructure[i].first.c_str());
250         MPI_Send(appname, length, MPI_CHAR, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, InterComm);
251         delete appname;
252         // Get associated numprocs
253         appsize = DsmProcessStructure[i].second;
254         MPI_Send(&appsize, 1, MPI_UNSIGNED, acceptingLeadId, XDMF_DSM_EXCHANGE_TAG, InterComm);
255       }
256     }
257     // Add the information for the newly added application
258     // For each application in the connecting set.
259     MPI_Bcast(&numSections, 1, MPI_UNSIGNED, acceptingLeadId, InterComm);
260     for (unsigned int i = 0; i < numSections; ++i)
261     {
262       MPI_Bcast(&length, 1, MPI_UNSIGNED, acceptingLeadId, InterComm);
263       appname = new char[length+1]();
264       MPI_Bcast(appname, length, MPI_CHAR, acceptingLeadId, InterComm);
265       appname[length] = 0;
266       MPI_Bcast(&appsize, 1, MPI_UNSIGNED, acceptingLeadId, InterComm);
267       DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(std::string(appname), appsize));
268     }
269     --numConnections;
270     MPI_Bcast(&numConnections, 1, MPI_INT, 0, InterComm);
271   }
272 #endif
273 }
274 
275 void
AllGather(void * sendbuf,int sendbytes,void * recvbuf,int recvbytes,int comm)276 XdmfDSMCommMPI::AllGather(void *sendbuf,
277                           int sendbytes,
278                           void *recvbuf,
279                           int recvbytes,
280                           int comm)
281 {
282   if (comm == XDMF_DSM_INTRA_COMM)
283   {
284     MPI_Allgather(sendbuf,
285                   sendbytes,
286                   MPI_UNSIGNED_CHAR,
287                   recvbuf,
288                   recvbytes,
289                   MPI_UNSIGNED_CHAR,
290                   IntraComm);
291   }
292   else if (comm == XDMF_DSM_INTER_COMM)
293   {
294     MPI_Allgather(sendbuf,
295                   sendbytes,
296                   MPI_UNSIGNED_CHAR,
297                   recvbuf,
298                   recvbytes,
299                   MPI_UNSIGNED_CHAR,
300                   InterComm);
301   }
302 }
303 
304 void
Barrier(int comm)305 XdmfDSMCommMPI::Barrier(int comm)
306 {
307   int status;
308 
309   if (comm == XDMF_DSM_INTRA_COMM)
310   {
311     status = MPI_Barrier(IntraComm);
312   }
313   else if (comm == XDMF_DSM_INTER_COMM)
314   {
315     status = MPI_Barrier(InterComm);
316   }
317 }
318 
319 void
Broadcast(void * pointer,int sizebytes,int root,int comm)320 XdmfDSMCommMPI::Broadcast(void * pointer,
321                           int sizebytes,
322                           int root,
323                           int comm)
324 {
325   int status;
326 
327   if (comm == XDMF_DSM_INTRA_COMM)
328   {
329     status = MPI_Bcast(pointer, sizebytes, MPI_UNSIGNED_CHAR, root, IntraComm);
330   }
331   else if (comm == XDMF_DSM_INTER_COMM)
332   {
333     status = MPI_Bcast(pointer, sizebytes, MPI_UNSIGNED_CHAR, root, InterComm);
334   }
335 }
336 
337 void
ClosePort()338 XdmfDSMCommMPI::ClosePort()
339 {
340 #ifndef XDMF_DSM_IS_CRAY
341   if (Id == 0) {
342     int status;
343     for (unsigned int i = 0; i < PreviousDsmPortNames.size(); ++i)
344     {
345       status = MPI_Close_port(PreviousDsmPortNames[i]);
346       if (status != MPI_SUCCESS) {
347         try {// OpenMPI iterate through open ports in order to close the multiple needed
348           std::string message = "Failed to close port ";
349           message = message + PreviousDsmPortNames[i];
350           XdmfError::message(XdmfError::FATAL, message);
351         }
352         catch (XdmfError & e) {
353           throw e;
354         }
355       }
356     }
357   }
358 #endif
359   HasOpenedPort = false;
360 }
361 
362 int
Connect()363 XdmfDSMCommMPI::Connect()
364 {
365 #ifndef XDMF_DSM_IS_CRAY
366   int status;
367   MPI_Status mpistatus;
368   if (InterComm == MPI_COMM_NULL) {
369     this->DupInterComm(IntraComm);
370   }
371   MPI_Comm tempComm;
372   MPI_Comm tempConnectComm;
373   MPI_Comm_dup(InterComm, &tempConnectComm);
374   MPI_Errhandler_set(InterComm, MPI_ERRORS_RETURN);
375   status = MPI_Comm_connect(DsmPortName, MPI_INFO_NULL, 0, tempConnectComm, &tempComm);
376   MPI_Errhandler_set(InterComm, MPI_ERRORS_ARE_FATAL);
377   if (status != MPI_SUCCESS) {
378     try {
379       std::string message = "Failed to connect to port ";
380       message = message + DsmPortName;
381       XdmfError::message(XdmfError::FATAL, message);
382     }
383     catch (XdmfError & e) {
384       throw e;
385     }
386   }
387   MPI_Comm mergedComm;
388   status = MPI_Intercomm_merge(tempComm, true, &mergedComm);
389   this->DupInterComm(mergedComm);
390   if (status != MPI_SUCCESS) {
391     try {
392       XdmfError::message(XdmfError::FATAL, "Failed to merge InterComm");
393     }
394     catch (XdmfError & e) {
395       throw e;
396     }
397   }
398   else {
399     status = MPI_Comm_rank(InterComm, &InterId);
400     status = MPI_Comm_size(InterComm, &InterSize);
401   }
402 
403   //regen Intra comm from Inter Comm
404   MPI_Group IntraGroup, InterGroup;
405   MPI_Comm_group(InterComm, &InterGroup);
406   int * ServerIds = (int *)calloc(this->IntraSize, sizeof(int));
407   unsigned int index = 0;
408   for(int i=this->InterId - this->Id; i < this->InterId - this->Id + this->IntraSize; ++i)
409   {
410     ServerIds[index++] = i;
411   }
412   MPI_Group_incl(InterGroup, this->IntraSize, ServerIds, &IntraGroup);
413   MPI_Comm_create(InterComm, IntraGroup, &IntraComm);
414   cfree(ServerIds);
415 
416   // Here the process will send information about itself to the server and
417   // the server will diseminate that info across all the connected processes
418   std::vector<std::pair<std::string, unsigned int> > structureArchive;
419   // Archive old structure if it exists
420   if (DsmProcessStructure.size() > 0)
421   {
422     for (unsigned int i = 0; i < DsmProcessStructure.size(); ++i)
423     {
424       structureArchive.push_back(DsmProcessStructure[i]);
425     }
426     DsmProcessStructure.clear();
427   }
428   // Loop in the applicaiton names of the already existing sections of the DSM
429   // Get the number of application sections
430   int numSections;
431   if (this->Id == 0)
432   {
433     MPI_Recv(&numSections, 1, MPI_INT, 0, XDMF_DSM_EXCHANGE_TAG, this->InterComm, &mpistatus);
434   }
435   MPI_Bcast(&numSections, 1, MPI_INT, 0, IntraComm);
436   unsigned int length;
437   char * appname;
438   unsigned int appsize;
439   for (unsigned int i = 0; i < numSections; ++i)
440   {
441     if (this->Id == 0)
442     {
443       // Get the length of the name
444       MPI_Recv(&length, 1, MPI_UNSIGNED, 0, XDMF_DSM_EXCHANGE_TAG, InterComm, &mpistatus);
445       // Get the string of characters
446       appname = new char[length+1]();
447       MPI_Recv(appname, length, MPI_CHAR, 0, XDMF_DSM_EXCHANGE_TAG, InterComm, &mpistatus);
448       appname[length] = 0;
449       // Get associated numprocs
450       MPI_Recv(&appsize, 1, MPI_UNSIGNED, 0, XDMF_DSM_EXCHANGE_TAG, InterComm, &mpistatus);
451     }
452     // Broadcast to local comm
453     MPI_Bcast(&length, 1, MPI_UNSIGNED, 0, IntraComm);
454     if (this->Id != 0)
455     {
456       appname = new char[length+1]();
457     }
458     MPI_Bcast(appname, length+1, MPI_CHAR, 0, IntraComm);
459     MPI_Bcast(&appsize, 1, MPI_UNSIGNED, 0, IntraComm);
460 
461     DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(std::string(appname), appsize));
462   }
463   if (structureArchive.size() == 0)
464   {
465     numSections = 1;
466     MPI_Bcast(&numSections, 1, MPI_INT, InterId-Id, InterComm);
467     length = ApplicationName.size();
468     appsize = this->IntraSize;
469     MPI_Bcast(&length, 1, MPI_UNSIGNED, InterId-Id, InterComm);
470     appname = new char[length]();
471     strcpy(appname, ApplicationName.c_str());
472     MPI_Bcast(appname, length, MPI_CHAR, InterId-Id, InterComm);
473     delete appname;
474     MPI_Bcast(&appsize, 1, MPI_UNSIGNED, InterId-Id, InterComm);
475     DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(ApplicationName, appsize));
476   }
477   else
478   {
479     numSections = structureArchive.size();
480     MPI_Bcast(&numSections, 1, MPI_UNSIGNED, InterId-Id, InterComm);
481     for (unsigned int i = 0; i < numSections; ++i)
482     {
483       length = structureArchive[i].first.size();
484       appsize = structureArchive[i].second;
485       MPI_Bcast(&length, 1, MPI_UNSIGNED, InterId-Id, InterComm);
486       appname = new char[length]();
487       strcpy(appname, structureArchive[i].first.c_str());
488       MPI_Bcast(appname, length, MPI_CHAR, InterId-Id, InterComm);
489       delete appname;
490       MPI_Bcast(&appsize, 1, MPI_UNSIGNED, InterId-Id, InterComm);
491       DsmProcessStructure.push_back(std::pair<std::string, unsigned int>(structureArchive[i].first, appsize));
492     }
493   }
494   int numAccepts = 0;
495   MPI_Bcast(&numAccepts, 1, MPI_INT, 0, InterComm);
496 #ifdef OPEN_MPI
497     if (numAccepts > 0) {
498       MPI_Bcast(DsmPortName, MPI_MAX_PORT_NAME, MPI_CHAR, 0, InterComm);
499     }
500 #endif
501   Accept(numAccepts);
502   return MPI_SUCCESS;
503 #endif
504   return MPI_SUCCESS;
505 }
506 
507 void
Disconnect()508 XdmfDSMCommMPI::Disconnect()
509 {
510 #ifndef XDMF_DSM_IS_CRAY
511 #ifndef OPEN_MPI
512   if (InterComm != MPI_COMM_NULL) {
513     int status = MPI_Comm_free(&InterComm);
514     if (status != MPI_SUCCESS) {
515       try {
516         XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
517       }
518       catch (XdmfError & e) {
519         throw e;
520       }
521     }
522   }
523 #endif
524 #endif
525   InterComm = MPI_COMM_NULL;
526 }
527 
528 void
DupComm(MPI_Comm comm)529 XdmfDSMCommMPI::DupComm(MPI_Comm comm)
530 {
531   if (IntraComm != comm) {
532     int status;
533 #ifndef OPEN_MPI
534     if (IntraComm != MPI_COMM_NULL) {
535       status = MPI_Comm_free(&IntraComm);
536       if (status != MPI_SUCCESS) {
537         try {
538           XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
539         }
540         catch (XdmfError & e) {
541           throw e;
542         }
543       }
544     }
545 #endif
546     if (comm != MPI_COMM_NULL) {
547       status = MPI_Comm_dup(comm, &IntraComm);
548       if (status != MPI_SUCCESS) {
549         try {
550           XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
551         }
552         catch (XdmfError & e) {
553           throw e;
554         }
555       }
556       else {
557         status = MPI_Comm_size(IntraComm, &IntraSize);
558         status = MPI_Comm_rank(IntraComm, &Id);
559       }
560     }
561   }
562 }
563 
564 void
DupInterComm(MPI_Comm comm)565 XdmfDSMCommMPI::DupInterComm(MPI_Comm comm)
566 {
567   if (InterComm != comm) {
568     int status;
569 #ifndef OPEN_MPI
570     if (InterComm != MPI_COMM_NULL) {
571       status = MPI_Comm_free(&InterComm);
572       if (status != MPI_SUCCESS) {
573         try {
574           XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
575         }
576         catch (XdmfError & e) {
577           throw e;
578         }
579       }
580     }
581 #endif
582     if (comm != MPI_COMM_NULL) {
583       status = MPI_Comm_dup(comm, &InterComm);
584       if (status != MPI_SUCCESS) {
585         try {
586           XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
587         }
588         catch (XdmfError & e) {
589           throw e;
590         }
591       }
592       else {
593         status = MPI_Comm_rank(InterComm, &InterId);
594         status = MPI_Comm_size(InterComm, &InterSize);
595       }
596     }
597     else {
598       InterId = -1;
599       InterSize = -1;
600     }
601   }
602 }
603 
604 std::string
GetApplicationName()605 XdmfDSMCommMPI::GetApplicationName()
606 {
607   return ApplicationName;
608 }
609 
610 std::string
GetDsmFileName()611 XdmfDSMCommMPI::GetDsmFileName()
612 {
613   return DsmFileName;
614 }
615 
616 char *
GetDsmPortName()617 XdmfDSMCommMPI::GetDsmPortName()
618 {
619   return DsmPortName;
620 }
621 
622 std::vector<std::pair<std::string, unsigned int> >
GetDsmProcessStructure()623 XdmfDSMCommMPI::GetDsmProcessStructure()
624 {
625   return DsmProcessStructure;
626 }
627 
628 int
GetId()629 XdmfDSMCommMPI::GetId()
630 {
631   return this->Id;
632 }
633 
634 MPI_Comm
GetInterComm()635 XdmfDSMCommMPI::GetInterComm()
636 {
637   return InterComm;
638 }
639 
640 int
GetInterCommType()641 XdmfDSMCommMPI::GetInterCommType()
642 {
643   return this->InterCommType;
644 }
645 
646 int
GetInterId()647 XdmfDSMCommMPI::GetInterId()
648 {
649   return this->InterId;
650 }
651 
652 int
GetInterSize()653 XdmfDSMCommMPI::GetInterSize()
654 {
655   return this->InterSize;
656 }
657 
658 MPI_Comm
GetIntraComm()659 XdmfDSMCommMPI::GetIntraComm()
660 {
661   return IntraComm;
662 }
663 
664 int
GetIntraSize()665 XdmfDSMCommMPI::GetIntraSize()
666 {
667   return this->IntraSize;
668 }
669 
670 bool
GetUseEnvFileName()671 XdmfDSMCommMPI::GetUseEnvFileName()
672 {
673   return XdmfDSMCommMPI::UseEnvFileName;
674 }
675 
676 void
Init()677 XdmfDSMCommMPI::Init()
678 {
679   int size, rank;
680   if (MPI_Comm_size(this->IntraComm, &size) != MPI_SUCCESS) {
681     try {
682       XdmfError::message(XdmfError::FATAL, "Failed to initialize size");
683     }
684     catch (XdmfError & e) {
685       throw e;
686     }
687   }
688   if (MPI_Comm_rank(this->IntraComm, &rank) != MPI_SUCCESS) {
689     try {
690       XdmfError::message(XdmfError::FATAL, "Failed to initialize rank");
691     }
692     catch (XdmfError & e) {
693       throw e;
694     }
695   }
696 
697   this->Id = rank;
698   this->IntraSize = size;
699 }
700 
701 void
OpenPort()702 XdmfDSMCommMPI::OpenPort()
703 {
704   if (Id == 0) {
705 #ifndef XDMF_DSM_IS_CRAY
706     int status = MPI_Open_port(MPI_INFO_NULL, DsmPortName);
707     if (status != MPI_SUCCESS) {
708       try {
709         std::string message = "Failed to open port ";
710         message = message + DsmPortName;
711         XdmfError::message(XdmfError::FATAL, message);
712       }
713       catch (XdmfError & e) {
714         throw e;
715       }
716     }
717     PreviousDsmPortNames.push_back(DsmPortName);
718 #endif
719     std::ofstream connectFile (DsmFileName.c_str());
720     if (connectFile.is_open()) {
721       connectFile << DsmPortName;
722       connectFile.close();
723     }
724     else {
725       try {
726         XdmfError::message(XdmfError::FATAL, "Failed to write port to file");
727       }
728       catch (XdmfError & e) {
729         throw e;
730       }
731     }
732     HasOpenedPort = true;
733   }
734 #ifndef XDMF_DSM_IS_CRAY
735   MPI_Bcast(DsmPortName, MPI_MAX_PORT_NAME, MPI_CHAR, 0, IntraComm);
736 #endif
737 }
738 
739 void
Probe(int * comm)740 XdmfDSMCommMPI::Probe(int *comm)
741 {
742   // Used for finding a comm that has a waiting command, then sets the comm
743   int status = XDMF_DSM_FAIL;
744   MPI_Status signalStatus;
745 
746   int flag;
747   MPI_Comm probeComm =
748     this->GetIntraComm();
749 
750   // Spin until a message is found on one of the communicators
751   while (status != XDMF_DSM_SUCCESS) {
752     status = MPI_Iprobe(XDMF_DSM_ANY_SOURCE,
753                         XDMF_DSM_ANY_TAG,
754                         probeComm,
755                         &flag,
756                         &signalStatus);
757     if (status != MPI_SUCCESS)
758     {
759        try {
760          XdmfError::message(XdmfError::FATAL,
761                             "Error: Failed to probe for command header");
762        }
763        catch (XdmfError & e) {
764          throw e;
765        }
766     }
767     if (flag) {
768       status = XDMF_DSM_SUCCESS;
769     }
770     else {
771       if (this->GetInterComm() != MPI_COMM_NULL) {
772         if (probeComm == this->GetIntraComm()) {
773           probeComm = this->GetInterComm();
774         }
775         else {
776           probeComm = this->GetIntraComm();
777         }
778       }
779     }
780   }
781   if (probeComm == this->GetInterComm()) {
782     *comm = XDMF_DSM_INTER_COMM;
783   }
784   else
785   {
786     *comm = XDMF_DSM_INTRA_COMM;
787   }
788 
789   probeComm = MPI_COMM_NULL;
790 }
791 
792 void
ReadDsmPortName()793 XdmfDSMCommMPI::ReadDsmPortName()
794 {
795 #ifndef XDMF_DSM_IS_CRAY
796   std::ifstream connectFile(DsmFileName.c_str());
797   std::string connectLine;
798   if (connectFile.is_open()) {
799     getline(connectFile, connectLine);
800   }
801   strcpy(DsmPortName, connectLine.c_str());
802 #endif
803 }
804 
805 void
Send(void * pointer,int sizebytes,int coreTo,int comm,int tag)806 XdmfDSMCommMPI::Send(void * pointer,
807                      int sizebytes,
808                      int coreTo,
809                      int comm,
810                      int tag)
811 {
812   int status;
813   if (comm == XDMF_DSM_INTRA_COMM) {
814     status = MPI_Send(pointer,
815                       sizebytes,
816                       MPI_UNSIGNED_CHAR,
817                       coreTo,
818                       tag,
819                       IntraComm);
820   }
821   else if (comm == XDMF_DSM_INTER_COMM) {
822     status = MPI_Send(pointer,
823                       sizebytes,
824                       MPI_UNSIGNED_CHAR,
825                       coreTo,
826                       tag,
827                       InterComm);
828   }
829 }
830 
831 void
Receive(void * pointer,int sizebytes,int coreFrom,int comm,int tag)832 XdmfDSMCommMPI::Receive(void * pointer,
833                         int sizebytes,
834                         int coreFrom,
835                         int comm,
836                         int tag)
837 {
838   int status;
839   MPI_Status signalStatus;
840   if (comm == XDMF_DSM_INTRA_COMM) {
841     status = MPI_Recv(pointer,
842                       sizebytes,
843                       MPI_UNSIGNED_CHAR,
844                       coreFrom,
845                       tag,
846                       IntraComm,
847                       &signalStatus);
848   }
849   else if (comm == XDMF_DSM_INTER_COMM) {
850     status = MPI_Recv(pointer,
851                       sizebytes,
852                       MPI_UNSIGNED_CHAR,
853                       coreFrom,
854                       tag,
855                       InterComm,
856                       &signalStatus);
857   }
858 }
859 
860 void
SetApplicationName(std::string newName)861 XdmfDSMCommMPI::SetApplicationName(std::string newName)
862 {
863   ApplicationName = newName;
864 }
865 
866 void
SetDsmFileName(std::string filename)867 XdmfDSMCommMPI::SetDsmFileName(std::string filename)
868 {
869   DsmFileName = filename;
870 }
871 
872 void
SetDsmPortName(const char * hostName)873 XdmfDSMCommMPI::SetDsmPortName(const char *hostName)
874 {
875   strcpy(DsmPortName, hostName);
876 }
877 
878 void
SetDsmProcessStructure(std::vector<std::pair<std::string,unsigned int>> & newStructure)879 XdmfDSMCommMPI::SetDsmProcessStructure(std::vector<std::pair<std::string, unsigned int> > & newStructure)
880 {
881   DsmProcessStructure = newStructure;
882 }
883 
884 void
SetUseEnvFileName(bool status)885 XdmfDSMCommMPI::SetUseEnvFileName(bool status)
886 {
887   XdmfDSMCommMPI::UseEnvFileName = status;
888 }
889 
890 // C Wrappers
891 
XdmfDSMCommMPINew()892 XDMFDSMCOMMMPI * XdmfDSMCommMPINew()
893 {
894   try
895   {
896     return (XDMFDSMCOMMMPI *)((void *)(new XdmfDSMCommMPI()));
897   }
898   catch (...)
899   {
900     return (XDMFDSMCOMMMPI *)((void *)(new XdmfDSMCommMPI()));
901   }
902 }
903 
XdmfDSMCommMPIFree(XDMFDSMCOMMMPI * item)904 void XdmfDSMCommMPIFree(XDMFDSMCOMMMPI * item)
905 {
906   if (item != NULL) {
907     delete ((XdmfDSMCommMPI *)item);
908   }
909   item = NULL;
910 }
911 
XdmfDSMCommMPIAccept(XDMFDSMCOMMMPI * dsmComm,unsigned int numConnections,int * status)912 void XdmfDSMCommMPIAccept(XDMFDSMCOMMMPI * dsmComm, unsigned int numConnections, int * status)
913 {
914   XDMF_ERROR_WRAP_START(status)
915   ((XdmfDSMCommMPI *)dsmComm)->Accept(numConnections);
916   XDMF_ERROR_WRAP_END(status)
917 }
918 
XdmfDSMCommMPIClosePort(XDMFDSMCOMMMPI * dsmComm,int * status)919 void XdmfDSMCommMPIClosePort(XDMFDSMCOMMMPI * dsmComm, int * status)
920 {
921   XDMF_ERROR_WRAP_START(status)
922   ((XdmfDSMCommMPI *)dsmComm)->ClosePort();
923   XDMF_ERROR_WRAP_END(status)
924 }
925 
XdmfDSMCommMPIConnect(XDMFDSMCOMMMPI * dsmComm,int * status)926 int XdmfDSMCommMPIConnect(XDMFDSMCOMMMPI * dsmComm, int * status)
927 {
928   XDMF_ERROR_WRAP_START(status)
929   return ((XdmfDSMCommMPI *)dsmComm)->Connect();
930   XDMF_ERROR_WRAP_END(status)
931   return -1;
932 }
933 
XdmfDSMCommMPIDisconnect(XDMFDSMCOMMMPI * dsmComm,int * status)934 void XdmfDSMCommMPIDisconnect(XDMFDSMCOMMMPI * dsmComm, int * status)
935 {
936   XDMF_ERROR_WRAP_START(status)
937   ((XdmfDSMCommMPI *)dsmComm)->Disconnect();
938   XDMF_ERROR_WRAP_END(status)
939 }
940 
XdmfDSMCommMPIDupComm(XDMFDSMCOMMMPI * dsmComm,MPI_Comm comm,int * status)941 void XdmfDSMCommMPIDupComm(XDMFDSMCOMMMPI * dsmComm, MPI_Comm comm, int * status)
942 {
943   XDMF_ERROR_WRAP_START(status)
944   ((XdmfDSMCommMPI *)dsmComm)->DupComm(comm);
945   XDMF_ERROR_WRAP_END(status)
946 }
947 
XdmfDSMCommMPIDupInterComm(XDMFDSMCOMMMPI * dsmComm,MPI_Comm comm,int * status)948 void XdmfDSMCommMPIDupInterComm(XDMFDSMCOMMMPI * dsmComm, MPI_Comm comm, int * status)
949 {
950   XDMF_ERROR_WRAP_START(status)
951   ((XdmfDSMCommMPI *)dsmComm)->DupInterComm(comm);
952   XDMF_ERROR_WRAP_END(status)
953 }
954 
XdmfDSMCommMPIGetApplicationName(XDMFDSMCOMMMPI * dsmComm)955 char * XdmfDSMCommMPIGetApplicationName(XDMFDSMCOMMMPI * dsmComm)
956 {
957   try
958   {
959     char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetApplicationName().c_str());
960     return returnPointer;
961   }
962   catch (...)
963   {
964     char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetApplicationName().c_str());
965     return returnPointer;
966   }
967 }
968 
XdmfDSMCommMPIGetDsmFileName(XDMFDSMCOMMMPI * dsmComm)969 char * XdmfDSMCommMPIGetDsmFileName(XDMFDSMCOMMMPI * dsmComm)
970 {
971   try
972   {
973     char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetDsmFileName().c_str());
974     return returnPointer;
975   }
976   catch (...)
977   {
978     char * returnPointer = strdup(((XdmfDSMCommMPI *)dsmComm)->GetDsmFileName().c_str());
979     return returnPointer;
980   }
981 }
982 
XdmfDSMCommMPIGetDsmPortName(XDMFDSMCOMMMPI * dsmComm)983 char * XdmfDSMCommMPIGetDsmPortName(XDMFDSMCOMMMPI * dsmComm)
984 {
985   try
986   {
987     return ((XdmfDSMCommMPI *)dsmComm)->GetDsmPortName();
988   }
989   catch (...)
990   {
991     return ((XdmfDSMCommMPI *)dsmComm)->GetDsmPortName();
992   }
993 }
994 
XdmfDSMCommMPIGetDsmProcessStructure(XDMFDSMCOMMMPI * dsmComm,char ** names,unsigned int * coreCount,int * numApplications)995 void XdmfDSMCommMPIGetDsmProcessStructure(XDMFDSMCOMMMPI * dsmComm,
996                                           char ** names,
997                                           unsigned int * coreCount,
998                                           int * numApplications)
999 {
1000   try
1001   {
1002     std::vector<std::pair<std::string, unsigned int> > structure =
1003       ((XdmfDSMCommMPI *)dsmComm)->GetDsmProcessStructure();
1004     *numApplications = structure.size();
1005     coreCount = new unsigned int[*numApplications]();
1006     names = new char *[*numApplications]();
1007     for (unsigned int i = 0; i < *numApplications; ++i)
1008     {
1009       coreCount[i] = structure[i].second;
1010       names[i] = strdup(structure[i].first.c_str());
1011     }
1012   }
1013   catch (...)
1014   {
1015     std::vector<std::pair<std::string, unsigned int> > structure =
1016       ((XdmfDSMCommMPI *)dsmComm)->GetDsmProcessStructure();
1017     *numApplications = structure.size();
1018     coreCount = new unsigned int[*numApplications]();
1019     names = new char *[*numApplications]();
1020     for (unsigned int i = 0; i < *numApplications; ++i)
1021     {
1022       coreCount[i] = structure[i].second;
1023       names[i] = strdup(structure[i].first.c_str());
1024     }
1025   }
1026 }
1027 
XdmfDSMCommMPIGetId(XDMFDSMCOMMMPI * dsmComm)1028 int XdmfDSMCommMPIGetId(XDMFDSMCOMMMPI * dsmComm)
1029 {
1030   try
1031   {
1032     return ((XdmfDSMCommMPI *)dsmComm)->GetId();
1033   }
1034   catch (...)
1035   {
1036     return ((XdmfDSMCommMPI *)dsmComm)->GetId();
1037   }
1038 }
1039 
XdmfDSMCommMPIGetInterComm(XDMFDSMCOMMMPI * dsmComm)1040 MPI_Comm XdmfDSMCommMPIGetInterComm(XDMFDSMCOMMMPI * dsmComm)
1041 {
1042   try
1043   {
1044     return ((XdmfDSMCommMPI *)dsmComm)->GetInterComm();
1045   }
1046   catch (...)
1047   {
1048     return ((XdmfDSMCommMPI *)dsmComm)->GetInterComm();
1049   }
1050 }
1051 
XdmfDSMCommMPIGetInterCommType(XDMFDSMCOMMMPI * dsmComm)1052 int XdmfDSMCommMPIGetInterCommType(XDMFDSMCOMMMPI * dsmComm)
1053 {
1054   try
1055   {
1056     return ((XdmfDSMCommMPI *)dsmComm)->GetInterCommType();
1057   }
1058   catch (...)
1059   {
1060     return ((XdmfDSMCommMPI *)dsmComm)->GetInterCommType();
1061   }
1062 }
1063 
XdmfDSMCommMPIGetInterId(XDMFDSMCOMMMPI * dsmComm)1064 int XdmfDSMCommMPIGetInterId(XDMFDSMCOMMMPI * dsmComm)
1065 {
1066   try
1067   {
1068     return ((XdmfDSMCommMPI *)dsmComm)->GetInterId();
1069   }
1070   catch (...)
1071   {
1072     return ((XdmfDSMCommMPI *)dsmComm)->GetInterId();
1073   }
1074 }
1075 
XdmfDSMCommMPIGetInterSize(XDMFDSMCOMMMPI * dsmComm)1076 int XdmfDSMCommMPIGetInterSize(XDMFDSMCOMMMPI * dsmComm)
1077 {
1078   try
1079   {
1080     return ((XdmfDSMCommMPI *)dsmComm)->GetInterSize();
1081   }
1082   catch (...)
1083   {
1084     return ((XdmfDSMCommMPI *)dsmComm)->GetInterSize();
1085   }
1086 }
1087 
XdmfDSMCommMPIGetIntraComm(XDMFDSMCOMMMPI * dsmComm)1088 MPI_Comm XdmfDSMCommMPIGetIntraComm(XDMFDSMCOMMMPI * dsmComm)
1089 {
1090   try
1091   {
1092     return ((XdmfDSMCommMPI *)dsmComm)->GetIntraComm();
1093   }
1094   catch (...)
1095   {
1096     return ((XdmfDSMCommMPI *)dsmComm)->GetIntraComm();
1097   }
1098 }
1099 
XdmfDSMCommMPIGetIntraSize(XDMFDSMCOMMMPI * dsmComm)1100 int XdmfDSMCommMPIGetIntraSize(XDMFDSMCOMMMPI * dsmComm)
1101 {
1102   try
1103   {
1104     return ((XdmfDSMCommMPI *)dsmComm)->GetIntraSize();
1105   }
1106   catch (...)
1107   {
1108     return ((XdmfDSMCommMPI *)dsmComm)->GetIntraSize();
1109   }
1110 }
1111 
XdmfDSMCommMPIGetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm)1112 int XdmfDSMCommMPIGetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm)
1113 {
1114   try
1115   {
1116     return ((XdmfDSMCommMPI *)dsmComm)->GetUseEnvFileName();
1117   }
1118   catch (...)
1119   {
1120     return ((XdmfDSMCommMPI *)dsmComm)->GetUseEnvFileName();
1121   }
1122 }
1123 
XdmfDSMCommMPIInit(XDMFDSMCOMMMPI * dsmComm,int * status)1124 void XdmfDSMCommMPIInit(XDMFDSMCOMMMPI * dsmComm, int * status)
1125 {
1126   XDMF_ERROR_WRAP_START(status)
1127   ((XdmfDSMCommMPI *)dsmComm)->Init();
1128   XDMF_ERROR_WRAP_END(status)
1129 }
1130 
XdmfDSMCommMPIOpenPort(XDMFDSMCOMMMPI * dsmComm,int * status)1131 void XdmfDSMCommMPIOpenPort(XDMFDSMCOMMMPI * dsmComm, int * status)
1132 {
1133   XDMF_ERROR_WRAP_START(status)
1134   ((XdmfDSMCommMPI *)dsmComm)->OpenPort();
1135   XDMF_ERROR_WRAP_END(status)
1136 }
1137 
XdmfDSMCommMPIReadDsmPortName(XDMFDSMCOMMMPI * dsmComm)1138 void XdmfDSMCommMPIReadDsmPortName(XDMFDSMCOMMMPI * dsmComm)
1139 {
1140   try
1141   {
1142     ((XdmfDSMCommMPI *)dsmComm)->ReadDsmPortName();
1143   }
1144   catch (...)
1145   {
1146     ((XdmfDSMCommMPI *)dsmComm)->ReadDsmPortName();
1147   }
1148 }
1149 
XdmfDSMCommMPISetApplicationName(XDMFDSMCOMMMPI * dsmComm,char * newName)1150 void XdmfDSMCommMPISetApplicationName(XDMFDSMCOMMMPI * dsmComm, char * newName)
1151 {
1152   try
1153   {
1154     ((XdmfDSMCommMPI *)dsmComm)->SetApplicationName(std::string(newName));
1155   }
1156   catch (...)
1157   {
1158     ((XdmfDSMCommMPI *)dsmComm)->SetApplicationName(std::string(newName));
1159   }
1160 }
1161 
XdmfDSMCommMPISetDsmFileName(XDMFDSMCOMMMPI * dsmComm,char * filename)1162 void XdmfDSMCommMPISetDsmFileName(XDMFDSMCOMMMPI * dsmComm, char * filename)
1163 {
1164   try
1165   {
1166     ((XdmfDSMCommMPI *)dsmComm)->SetDsmFileName(std::string(filename));
1167   }
1168   catch (...)
1169   {
1170     ((XdmfDSMCommMPI *)dsmComm)->SetDsmFileName(std::string(filename));
1171   }
1172 }
1173 
XdmfDSMCommMPISetDsmPortName(XDMFDSMCOMMMPI * dsmComm,char * hostName)1174 void XdmfDSMCommMPISetDsmPortName(XDMFDSMCOMMMPI * dsmComm, char * hostName)
1175 {
1176   try
1177   {
1178     ((XdmfDSMCommMPI *)dsmComm)->SetDsmPortName(hostName);
1179   }
1180   catch (...)
1181   {
1182     ((XdmfDSMCommMPI *)dsmComm)->SetDsmPortName(hostName);
1183   }
1184 }
1185 
XdmfDSMCommMPISetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm,int status)1186 void XdmfDSMCommMPISetUseEnvFileName(XDMFDSMCOMMMPI * dsmComm, int status)
1187 {
1188   try
1189   {
1190     ((XdmfDSMCommMPI *)dsmComm)->SetUseEnvFileName(status);
1191   }
1192   catch (...)
1193   {
1194     ((XdmfDSMCommMPI *)dsmComm)->SetUseEnvFileName(status);
1195   }
1196 }
1197