1 /*****************************************************************************/
2 /*                                    XDMF                                   */
3 /*                       eXtensible Data Model and Format                    */
4 /*                                                                           */
5 /*  Id : XdmfHDF5WriterDSM.cpp                                               */
6 /*                                                                           */
7 /*  Author:                                                                  */
8 /*     Kenneth Leiter                                                        */
9 /*     kenneth.leiter@arl.army.mil                                           */
10 /*     US Army Research Laboratory                                           */
11 /*     Aberdeen Proving Ground, MD                                           */
12 /*                                                                           */
13 /*     Copyright @ 2011 US Army Research Laboratory                          */
14 /*     All Rights Reserved                                                   */
15 /*     See Copyright.txt for details                                         */
16 /*                                                                           */
17 /*     This software is distributed WITHOUT ANY WARRANTY; without            */
18 /*     even the implied warranty of MERCHANTABILITY or FITNESS               */
19 /*     FOR A PARTICULAR PURPOSE.  See the above copyright notice             */
20 /*     for more information.                                                 */
21 /*                                                                           */
22 /*****************************************************************************/
23 
24 #ifdef XDMF_BUILD_DSM_THREADS
25   #include <H5FDdsm.h>
26   #include <H5FDdsmManager.h>
27   #include <H5FDdsmBuffer.h>
28   #include <H5FDdsmBufferService.h>
29   #include <H5FDdsmComm.h>
30 #endif
31 #include <H5public.h>
32 #include <hdf5.h>
33 #include <XdmfDSMCommMPI.hpp>
34 #include <XdmfDSMBuffer.hpp>
35 #include <XdmfDSMManager.hpp>
36 #include <XdmfDSMDriver.hpp>
37 #include "XdmfHDF5ControllerDSM.hpp"
38 #include "XdmfHDF5WriterDSM.hpp"
39 #include "XdmfError.hpp"
40 
41 #ifdef XDMF_BUILD_DSM_THREADS
42 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,H5FDdsmBuffer * const dsmBuffer)43 XdmfHDF5WriterDSM::New(const std::string & filePath,
44                        H5FDdsmBuffer * const dsmBuffer)
45 {
46   shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
47                                                         dsmBuffer));
48   return p;
49 }
50 #endif
51 
52 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,XdmfDSMBuffer * const dsmBuffer)53 XdmfHDF5WriterDSM::New(const std::string & filePath,
54                        XdmfDSMBuffer * const dsmBuffer)
55 {
56   shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
57                                                         dsmBuffer));
58   return p;
59 }
60 
61 #ifdef XDMF_BUILD_DSM_THREADS
62 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize)63 XdmfHDF5WriterDSM::New(const std::string & filePath,
64                        MPI_Comm comm,
65                        unsigned int bufferSize)
66 {
67   shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
68                                                         comm,
69                                                         bufferSize));
70   return p;
71 }
72 #endif
73 
74 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize,int startCoreIndex,int endCoreIndex)75 XdmfHDF5WriterDSM::New(const std::string & filePath,
76                        MPI_Comm comm,
77                        unsigned int bufferSize,
78                        int startCoreIndex,
79                        int endCoreIndex)
80 {
81   shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
82                                                         comm,
83                                                         bufferSize,
84                                                         startCoreIndex,
85                                                         endCoreIndex));
86   return p;
87 }
88 
89 #ifdef XDMF_BUILD_DSM_THREADS
XdmfHDF5WriterDSM(const std::string & filePath,H5FDdsmBuffer * const dsmBuffer)90 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
91                                      H5FDdsmBuffer * const dsmBuffer) :
92   XdmfHDF5Writer(filePath),
93   mDSMManager(NULL),
94   mDSMBuffer(dsmBuffer),
95   mFAPL(-1),
96   mDSMServerManager(NULL),
97   mDSMServerBuffer(NULL),
98   mWorkerComm(MPI_COMM_NULL),
99   mServerMode(false)
100 {
101 }
102 
XdmfHDF5WriterDSM(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize)103 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
104                                      MPI_Comm comm,
105                                      unsigned int bufferSize) :
106   XdmfHDF5Writer(filePath),
107   mFAPL(-1),
108   mDSMServerManager(NULL),
109   mDSMServerBuffer(NULL),
110   mWorkerComm(MPI_COMM_NULL),
111   mServerMode(false)
112 {
113   H5FDdsmManager * newManager = new H5FDdsmManager();
114   newManager->SetMpiComm(comm);
115   newManager->SetLocalBufferSizeMBytes(bufferSize);
116   newManager->SetIsStandAlone(H5FD_DSM_TRUE);
117   newManager->Create();
118 
119   H5FD_dsm_set_manager(newManager);
120 
121   H5FD_dsm_set_options(H5FD_DSM_LOCK_ASYNCHRONOUS);
122 
123   H5FDdsmBuffer * newBuffer = newManager->GetDsmBuffer();
124   mDSMManager = newManager;
125   mDSMBuffer = newBuffer;
126 }
127 #endif
128 
129 // The database/nonthreaded version
XdmfHDF5WriterDSM(const std::string & filePath,XdmfDSMBuffer * const dsmBuffer)130 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
131                                      XdmfDSMBuffer * const dsmBuffer) :
132   XdmfHDF5Writer(filePath),
133 #ifdef XDMF_BUILD_DSM_THREADS
134   mDSMManager(NULL),
135   mDSMBuffer(NULL),
136 #endif
137   mFAPL(-1),
138   mDSMServerManager(NULL),
139   mDSMServerBuffer(dsmBuffer),
140   mServerMode(true)
141 {
142   mWorkerComm = mDSMServerBuffer->GetComm()->GetIntraComm();
143   if (xdmf_dsm_get_manager() == NULL) {
144     mDSMServerManager = new XdmfDSMManager();
145     mDSMServerManager->SetLocalBufferSizeMBytes(mDSMServerBuffer->GetLength());
146     mDSMServerManager->SetInterCommType(XDMF_DSM_COMM_MPI);
147     mDSMServerManager->SetIsServer(false);
148     mDSMServerManager->SetMpiComm(mDSMServerBuffer->GetComm()->GetIntraComm());
149     mDSMServerManager->SetDsmBuffer(mDSMServerBuffer);
150     XDMF_dsm_set_manager(mDSMServerManager);
151   }
152   else {
153     static_cast<XdmfDSMManager *>(xdmf_dsm_get_manager())->SetDsmBuffer(mDSMServerBuffer);
154   }
155 }
156 
XdmfHDF5WriterDSM(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize,int startCoreIndex,int endCoreIndex)157 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
158                                      MPI_Comm comm,
159                                      unsigned int bufferSize,
160                                      int startCoreIndex,
161                                      int endCoreIndex) :
162   XdmfHDF5Writer(filePath),
163   mFAPL(-1),
164 #ifdef XDMF_BUILD_DSM_THREADS
165   mDSMManager(NULL),
166   mDSMBuffer(NULL),
167 #endif
168   mServerMode(true)
169 {
170   int rank, size;
171 
172   MPI_Comm_size(comm, &size);
173   MPI_Comm_rank(comm, &rank);
174 
175   // Negative values will be changed to maximum range
176   if (startCoreIndex < 0) {
177     startCoreIndex = 0;
178   }
179   if (endCoreIndex < 0) {
180     endCoreIndex = size - 1;
181   }
182 
183   // Ensure start index is less than end index
184   if (startCoreIndex > endCoreIndex) {
185     int tempholder = startCoreIndex;
186     startCoreIndex = endCoreIndex;
187     endCoreIndex = tempholder;
188   }
189 
190   MPI_Comm serverComm;
191 
192   MPI_Group workers, dsmgroup, serversplit, servergroup;
193 
194   int * ServerIds = (int *)calloc((endCoreIndex - startCoreIndex + 1), sizeof(int));
195   unsigned int index = 0;
196   for(int i=startCoreIndex ; i <= endCoreIndex ; ++i) {
197     ServerIds[index++] = i;
198   }
199 
200   MPI_Comm_group(comm, &serversplit);
201   MPI_Group_incl(serversplit, index, ServerIds, &servergroup);
202   MPI_Comm_create(comm, servergroup, &serverComm);
203   MPI_Comm_group(comm, &dsmgroup);
204   MPI_Group_excl(dsmgroup, index, ServerIds, &workers);
205   MPI_Comm_create(comm, workers, &mWorkerComm);
206   cfree(ServerIds);
207 
208   // Create the manager
209 
210   mDSMServerManager = new XdmfDSMManager();
211 
212   mDSMServerManager->SetLocalBufferSizeMBytes(bufferSize);
213   mDSMServerManager->SetInterCommType(XDMF_DSM_COMM_MPI);
214 
215   MPI_Barrier(comm);
216 
217   if (rank >= startCoreIndex && rank <= endCoreIndex) {
218     mDSMServerManager->SetMpiComm(serverComm);
219     mDSMServerManager->Create();
220   }
221   else {
222     mDSMServerManager->SetMpiComm(mWorkerComm);
223     mDSMServerManager->SetIsServer(false);
224     mDSMServerManager->Create(startCoreIndex, endCoreIndex);
225   }
226 
227   XDMF_dsm_set_manager(mDSMServerManager);
228 
229   mDSMServerBuffer = mDSMServerManager->GetDsmBuffer();
230 
231   mDSMServerBuffer->GetComm()->DupInterComm(comm);
232   mDSMServerBuffer->SetIsConnected(true);
233 
234   if (startCoreIndex < size) {
235     if (rank >= startCoreIndex && rank <= endCoreIndex) {
236       mDSMServerManager->GetDsmBuffer()->ReceiveInfo();
237     }
238     else {
239       mDSMServerManager->GetDsmBuffer()->SendInfo();
240     }
241   }
242 
243   MPI_Barrier(comm);
244 
245   // Loop needs to be started before anything can be done to the file
246   // since the service is what sets up the file
247 
248   if (rank < startCoreIndex || rank > endCoreIndex) {
249     // Turn off the server designation
250     mDSMServerBuffer->SetIsServer(false);
251     // If this is set to false then the buffer will attempt to connect
252     // to the intercomm for DSM communications
253     mDSMServerManager->SetIsServer(false);
254   }
255   else {
256     // On cores where memory is set up, start the service loop
257     // This should iterate infinitely until a value to end the loop is passed
258     int returnOpCode;
259     mDSMServerBuffer->BufferServiceLoop(&returnOpCode);
260   }
261 }
262 
~XdmfHDF5WriterDSM()263 XdmfHDF5WriterDSM::~XdmfHDF5WriterDSM()
264 {
265 
266 }
267 
268 shared_ptr<XdmfHeavyDataController>
createController(const std::string & hdf5FilePath,const std::string & dataSetPath,const shared_ptr<const XdmfArrayType> type,const std::vector<unsigned int> & start,const std::vector<unsigned int> & stride,const std::vector<unsigned int> & dimensions,const std::vector<unsigned int> & dataspaceDimensions)269 XdmfHDF5WriterDSM::createController(const std::string & hdf5FilePath,
270                                     const std::string & dataSetPath,
271                                     const shared_ptr<const XdmfArrayType> type,
272                                     const std::vector<unsigned int> & start,
273                                     const std::vector<unsigned int> & stride,
274                                     const std::vector<unsigned int> & dimensions,
275                                     const std::vector<unsigned int> & dataspaceDimensions)
276 {
277   if (mDSMServerBuffer != NULL) {
278         return XdmfHDF5ControllerDSM::New(hdf5FilePath,
279                                       dataSetPath,
280                                       type,
281                                       start,
282                                       stride,
283                                       dimensions,
284                                       dataspaceDimensions,
285                                       mDSMServerBuffer);
286   }
287 #ifdef XDMF_BUILD_DSM_THREADS
288   else if (mDSMBuffer != NULL) {
289     return XdmfHDF5ControllerDSM::New(hdf5FilePath,
290                                       dataSetPath,
291                                       type,
292                                       start,
293                                       stride,
294                                       dimensions,
295                                       dataspaceDimensions,
296                                       mDSMBuffer);
297   }
298 #endif
299   else {
300     return shared_ptr<XdmfHDF5ControllerDSM>();
301   }
302 }
303 
deleteManager()304 void XdmfHDF5WriterDSM::deleteManager()
305 {
306 #ifdef XDMF_BUILD_DSM_THREADS
307   if (mDSMManager != NULL)
308   {
309     delete mDSMManager;
310   }
311 #endif
312   if (mDSMServerManager != NULL)
313   {
314     closeFile();
315     delete mDSMServerManager;
316   }
317 }
318 
319 void
closeFile()320 XdmfHDF5WriterDSM::closeFile()
321 {
322   if(mFAPL >= 0) {
323     H5Pclose(mFAPL);
324     mFAPL = -1;
325   }
326   XdmfHDF5Writer::closeFile();
327 }
328 
329 #ifdef XDMF_BUILD_DSM_THREADS
getBuffer()330 H5FDdsmBuffer * XdmfHDF5WriterDSM::getBuffer()
331 {
332   return mDSMBuffer;
333 }
334 
getManager()335 H5FDdsmManager * XdmfHDF5WriterDSM::getManager()
336 {
337   return mDSMManager;
338 }
339 #endif
340 
getServerBuffer()341 XdmfDSMBuffer * XdmfHDF5WriterDSM::getServerBuffer()
342 {
343   return mDSMServerBuffer;
344 }
345 
getServerManager()346 XdmfDSMManager * XdmfHDF5WriterDSM::getServerManager()
347 {
348   return mDSMServerManager;
349 }
350 
getServerMode()351 bool XdmfHDF5WriterDSM::getServerMode()
352 {
353   return mServerMode;
354 }
355 
getWorkerComm()356 MPI_Comm XdmfHDF5WriterDSM::getWorkerComm()
357 {
358   MPI_Comm returnComm = MPI_COMM_NULL;
359   if (mWorkerComm != MPI_COMM_NULL) {
360     MPI_Comm_dup(mWorkerComm, &returnComm);
361   }
362   return returnComm;
363 }
364 
setAllowSetSplitting(bool newAllow)365 void XdmfHDF5WriterDSM::setAllowSetSplitting(bool newAllow)
366 {
367   //overrides to disable the parent version
368   XdmfHDF5Writer::setAllowSetSplitting(false);
369 }
370 
371 #ifdef XDMF_BUILD_DSM_THREADS
setBuffer(H5FDdsmBuffer * newBuffer)372 void XdmfHDF5WriterDSM::setBuffer(H5FDdsmBuffer * newBuffer)
373 {
374   mDSMBuffer = newBuffer;
375 }
376 #endif
377 
setBuffer(XdmfDSMBuffer * newBuffer)378 void XdmfHDF5WriterDSM::setBuffer(XdmfDSMBuffer * newBuffer)
379 {
380   mDSMServerBuffer = newBuffer;
381 }
382 
383 #ifdef XDMF_BUILD_DSM_THREADS
setManager(H5FDdsmManager * newManager)384 void XdmfHDF5WriterDSM::setManager(H5FDdsmManager * newManager)
385 {
386   H5FDdsmBuffer * newBuffer = newManager->GetDsmBuffer();
387   mDSMManager = newManager;
388   mDSMBuffer = newBuffer;
389 }
390 #endif
391 
setManager(XdmfDSMManager * newManager)392 void XdmfHDF5WriterDSM::setManager(XdmfDSMManager * newManager)
393 {
394   XdmfDSMBuffer * newBuffer = newManager->GetDsmBuffer();
395   mDSMServerManager = newManager;
396   mDSMServerBuffer = newBuffer;
397 }
398 
setServerMode(bool newMode)399 void XdmfHDF5WriterDSM::setServerMode(bool newMode)
400 {
401   mServerMode = newMode;
402 }
403 
setWorkerComm(MPI_Comm comm)404 void XdmfHDF5WriterDSM::setWorkerComm(MPI_Comm comm)
405 {
406   int status;
407 #ifndef OPEN_MPI
408   if (mWorkerComm != MPI_COMM_NULL) {
409     status = MPI_Comm_free(&mWorkerComm);
410     if (status != MPI_SUCCESS) {
411       XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
412     }
413   }
414 #endif
415   if (comm != MPI_COMM_NULL) {
416     status = MPI_Comm_dup(comm, &mWorkerComm);
417     if (status != MPI_SUCCESS) {
418       XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
419     }
420   }
421   mDSMServerBuffer->GetComm()->DupComm(comm);
422 }
423 
stopDSM()424 void XdmfHDF5WriterDSM::stopDSM()
425 {
426   if (mServerMode) {
427     // Send manually
428     for (int i = mDSMServerBuffer->GetStartServerId();
429          i <= mDSMServerBuffer->GetEndServerId();
430          ++i) {
431       mDSMServerBuffer->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTER_COMM);
432     }
433   }
434   else {
435     XdmfError::message(XdmfError::FATAL, "Error: Stopping DSM manually only available in server mode.");
436   }
437 }
438 
restartDSM()439 void XdmfHDF5WriterDSM::restartDSM()
440 {
441   if (mServerMode) {
442     if (mDSMServerBuffer->GetComm()->GetInterId() >=
443           mDSMServerBuffer->GetStartServerId() &&
444         mDSMServerBuffer->GetComm()->GetInterId() <=
445           mDSMServerBuffer->GetEndServerId()) {
446       int returnOpCode;
447       mDSMServerBuffer->BufferServiceLoop(&returnOpCode);
448     }
449   }
450   else {
451     XdmfError::message(XdmfError::FATAL, "Error: Restarting DSM only available in server mode.");
452   }
453 }
454 
455 void
openFile()456 XdmfHDF5WriterDSM::openFile()
457 {
458   if(mFAPL >= 0) {
459     this->closeFile();
460   }
461 
462   // Set file access property list for DSM
463   mFAPL = H5Pcreate(H5P_FILE_ACCESS);
464 
465   if (mServerMode) {
466     if (mWorkerComm != MPI_COMM_NULL) {
467       XDMFH5Pset_fapl_dsm(mFAPL, mWorkerComm, mDSMServerBuffer, 0);
468     }
469   }
470   else {
471 #ifdef XDMF_BUILD_DSM_THREADS
472     H5Pset_fapl_dsm(mFAPL, MPI_COMM_WORLD, mDSMBuffer, 0);
473 #else
474     XdmfError::message(XdmfError::FATAL, "Error: Threaded DSM not enabled.");
475 #endif
476   }
477   XdmfHDF5Writer::openFile(mFAPL);
478 }
479 
visit(XdmfArray & array,const shared_ptr<XdmfBaseVisitor>)480 void XdmfHDF5WriterDSM::visit(XdmfArray & array,
481                               const shared_ptr<XdmfBaseVisitor>)
482 {
483   bool closeFAPL = false;
484 
485   if(mFAPL < 0) {
486     // Set file access property list for DSM
487     mFAPL = H5Pcreate(H5P_FILE_ACCESS);
488     // Use DSM driver
489     if (mServerMode) {
490       if (mWorkerComm != MPI_COMM_NULL) {
491         XDMFH5Pset_fapl_dsm(mFAPL, mWorkerComm, mDSMServerBuffer, 0);
492       }
493     }
494     else {
495 #ifdef XDMF_BUILD_DSM_THREADS
496       H5Pset_fapl_dsm(mFAPL, MPI_COMM_WORLD, mDSMBuffer, 0);
497 #else
498       XdmfError::message(XdmfError::FATAL, "Error: Threaded DSM not enabled.");
499 #endif
500     }
501 
502     closeFAPL = true;
503   }
504 
505   // Write to DSM Buffer
506   this->write(array, mFAPL);
507 
508   if(closeFAPL) {
509     // Close file access property list
510     H5Pclose(mFAPL);
511     mFAPL = -1;
512   }
513 
514 }
515