1 /*****************************************************************************/
2 /* XDMF */
3 /* eXtensible Data Model and Format */
4 /* */
5 /* Id : XdmfHDF5WriterDSM.cpp */
6 /* */
7 /* Author: */
8 /* Kenneth Leiter */
9 /* kenneth.leiter@arl.army.mil */
10 /* US Army Research Laboratory */
11 /* Aberdeen Proving Ground, MD */
12 /* */
13 /* Copyright @ 2011 US Army Research Laboratory */
14 /* All Rights Reserved */
15 /* See Copyright.txt for details */
16 /* */
17 /* This software is distributed WITHOUT ANY WARRANTY; without */
18 /* even the implied warranty of MERCHANTABILITY or FITNESS */
19 /* FOR A PARTICULAR PURPOSE. See the above copyright notice */
20 /* for more information. */
21 /* */
22 /*****************************************************************************/
23
24 #ifdef XDMF_BUILD_DSM_THREADS
25 #include <H5FDdsm.h>
26 #include <H5FDdsmManager.h>
27 #include <H5FDdsmBuffer.h>
28 #include <H5FDdsmBufferService.h>
29 #include <H5FDdsmComm.h>
30 #endif
31 #include <H5public.h>
32 #include <hdf5.h>
33 #include <XdmfDSMCommMPI.hpp>
34 #include <XdmfDSMBuffer.hpp>
35 #include <XdmfDSMManager.hpp>
36 #include <XdmfDSMDriver.hpp>
37 #include "XdmfHDF5ControllerDSM.hpp"
38 #include "XdmfHDF5WriterDSM.hpp"
39 #include "XdmfError.hpp"
40
41 #ifdef XDMF_BUILD_DSM_THREADS
42 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,H5FDdsmBuffer * const dsmBuffer)43 XdmfHDF5WriterDSM::New(const std::string & filePath,
44 H5FDdsmBuffer * const dsmBuffer)
45 {
46 shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
47 dsmBuffer));
48 return p;
49 }
50 #endif
51
52 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,XdmfDSMBuffer * const dsmBuffer)53 XdmfHDF5WriterDSM::New(const std::string & filePath,
54 XdmfDSMBuffer * const dsmBuffer)
55 {
56 shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
57 dsmBuffer));
58 return p;
59 }
60
61 #ifdef XDMF_BUILD_DSM_THREADS
62 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize)63 XdmfHDF5WriterDSM::New(const std::string & filePath,
64 MPI_Comm comm,
65 unsigned int bufferSize)
66 {
67 shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
68 comm,
69 bufferSize));
70 return p;
71 }
72 #endif
73
74 shared_ptr<XdmfHDF5WriterDSM>
New(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize,int startCoreIndex,int endCoreIndex)75 XdmfHDF5WriterDSM::New(const std::string & filePath,
76 MPI_Comm comm,
77 unsigned int bufferSize,
78 int startCoreIndex,
79 int endCoreIndex)
80 {
81 shared_ptr<XdmfHDF5WriterDSM> p(new XdmfHDF5WriterDSM(filePath,
82 comm,
83 bufferSize,
84 startCoreIndex,
85 endCoreIndex));
86 return p;
87 }
88
89 #ifdef XDMF_BUILD_DSM_THREADS
XdmfHDF5WriterDSM(const std::string & filePath,H5FDdsmBuffer * const dsmBuffer)90 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
91 H5FDdsmBuffer * const dsmBuffer) :
92 XdmfHDF5Writer(filePath),
93 mDSMManager(NULL),
94 mDSMBuffer(dsmBuffer),
95 mFAPL(-1),
96 mDSMServerManager(NULL),
97 mDSMServerBuffer(NULL),
98 mWorkerComm(MPI_COMM_NULL),
99 mServerMode(false)
100 {
101 }
102
XdmfHDF5WriterDSM(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize)103 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
104 MPI_Comm comm,
105 unsigned int bufferSize) :
106 XdmfHDF5Writer(filePath),
107 mFAPL(-1),
108 mDSMServerManager(NULL),
109 mDSMServerBuffer(NULL),
110 mWorkerComm(MPI_COMM_NULL),
111 mServerMode(false)
112 {
113 H5FDdsmManager * newManager = new H5FDdsmManager();
114 newManager->SetMpiComm(comm);
115 newManager->SetLocalBufferSizeMBytes(bufferSize);
116 newManager->SetIsStandAlone(H5FD_DSM_TRUE);
117 newManager->Create();
118
119 H5FD_dsm_set_manager(newManager);
120
121 H5FD_dsm_set_options(H5FD_DSM_LOCK_ASYNCHRONOUS);
122
123 H5FDdsmBuffer * newBuffer = newManager->GetDsmBuffer();
124 mDSMManager = newManager;
125 mDSMBuffer = newBuffer;
126 }
127 #endif
128
129 // The database/nonthreaded version
XdmfHDF5WriterDSM(const std::string & filePath,XdmfDSMBuffer * const dsmBuffer)130 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
131 XdmfDSMBuffer * const dsmBuffer) :
132 XdmfHDF5Writer(filePath),
133 #ifdef XDMF_BUILD_DSM_THREADS
134 mDSMManager(NULL),
135 mDSMBuffer(NULL),
136 #endif
137 mFAPL(-1),
138 mDSMServerManager(NULL),
139 mDSMServerBuffer(dsmBuffer),
140 mServerMode(true)
141 {
142 mWorkerComm = mDSMServerBuffer->GetComm()->GetIntraComm();
143 if (xdmf_dsm_get_manager() == NULL) {
144 mDSMServerManager = new XdmfDSMManager();
145 mDSMServerManager->SetLocalBufferSizeMBytes(mDSMServerBuffer->GetLength());
146 mDSMServerManager->SetInterCommType(XDMF_DSM_COMM_MPI);
147 mDSMServerManager->SetIsServer(false);
148 mDSMServerManager->SetMpiComm(mDSMServerBuffer->GetComm()->GetIntraComm());
149 mDSMServerManager->SetDsmBuffer(mDSMServerBuffer);
150 XDMF_dsm_set_manager(mDSMServerManager);
151 }
152 else {
153 static_cast<XdmfDSMManager *>(xdmf_dsm_get_manager())->SetDsmBuffer(mDSMServerBuffer);
154 }
155 }
156
XdmfHDF5WriterDSM(const std::string & filePath,MPI_Comm comm,unsigned int bufferSize,int startCoreIndex,int endCoreIndex)157 XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
158 MPI_Comm comm,
159 unsigned int bufferSize,
160 int startCoreIndex,
161 int endCoreIndex) :
162 XdmfHDF5Writer(filePath),
163 mFAPL(-1),
164 #ifdef XDMF_BUILD_DSM_THREADS
165 mDSMManager(NULL),
166 mDSMBuffer(NULL),
167 #endif
168 mServerMode(true)
169 {
170 int rank, size;
171
172 MPI_Comm_size(comm, &size);
173 MPI_Comm_rank(comm, &rank);
174
175 // Negative values will be changed to maximum range
176 if (startCoreIndex < 0) {
177 startCoreIndex = 0;
178 }
179 if (endCoreIndex < 0) {
180 endCoreIndex = size - 1;
181 }
182
183 // Ensure start index is less than end index
184 if (startCoreIndex > endCoreIndex) {
185 int tempholder = startCoreIndex;
186 startCoreIndex = endCoreIndex;
187 endCoreIndex = tempholder;
188 }
189
190 MPI_Comm serverComm;
191
192 MPI_Group workers, dsmgroup, serversplit, servergroup;
193
194 int * ServerIds = (int *)calloc((endCoreIndex - startCoreIndex + 1), sizeof(int));
195 unsigned int index = 0;
196 for(int i=startCoreIndex ; i <= endCoreIndex ; ++i) {
197 ServerIds[index++] = i;
198 }
199
200 MPI_Comm_group(comm, &serversplit);
201 MPI_Group_incl(serversplit, index, ServerIds, &servergroup);
202 MPI_Comm_create(comm, servergroup, &serverComm);
203 MPI_Comm_group(comm, &dsmgroup);
204 MPI_Group_excl(dsmgroup, index, ServerIds, &workers);
205 MPI_Comm_create(comm, workers, &mWorkerComm);
206 cfree(ServerIds);
207
208 // Create the manager
209
210 mDSMServerManager = new XdmfDSMManager();
211
212 mDSMServerManager->SetLocalBufferSizeMBytes(bufferSize);
213 mDSMServerManager->SetInterCommType(XDMF_DSM_COMM_MPI);
214
215 MPI_Barrier(comm);
216
217 if (rank >= startCoreIndex && rank <= endCoreIndex) {
218 mDSMServerManager->SetMpiComm(serverComm);
219 mDSMServerManager->Create();
220 }
221 else {
222 mDSMServerManager->SetMpiComm(mWorkerComm);
223 mDSMServerManager->SetIsServer(false);
224 mDSMServerManager->Create(startCoreIndex, endCoreIndex);
225 }
226
227 XDMF_dsm_set_manager(mDSMServerManager);
228
229 mDSMServerBuffer = mDSMServerManager->GetDsmBuffer();
230
231 mDSMServerBuffer->GetComm()->DupInterComm(comm);
232 mDSMServerBuffer->SetIsConnected(true);
233
234 if (startCoreIndex < size) {
235 if (rank >= startCoreIndex && rank <= endCoreIndex) {
236 mDSMServerManager->GetDsmBuffer()->ReceiveInfo();
237 }
238 else {
239 mDSMServerManager->GetDsmBuffer()->SendInfo();
240 }
241 }
242
243 MPI_Barrier(comm);
244
245 // Loop needs to be started before anything can be done to the file
246 // since the service is what sets up the file
247
248 if (rank < startCoreIndex || rank > endCoreIndex) {
249 // Turn off the server designation
250 mDSMServerBuffer->SetIsServer(false);
251 // If this is set to false then the buffer will attempt to connect
252 // to the intercomm for DSM communications
253 mDSMServerManager->SetIsServer(false);
254 }
255 else {
256 // On cores where memory is set up, start the service loop
257 // This should iterate infinitely until a value to end the loop is passed
258 int returnOpCode;
259 mDSMServerBuffer->BufferServiceLoop(&returnOpCode);
260 }
261 }
262
~XdmfHDF5WriterDSM()263 XdmfHDF5WriterDSM::~XdmfHDF5WriterDSM()
264 {
265
266 }
267
268 shared_ptr<XdmfHeavyDataController>
createController(const std::string & hdf5FilePath,const std::string & dataSetPath,const shared_ptr<const XdmfArrayType> type,const std::vector<unsigned int> & start,const std::vector<unsigned int> & stride,const std::vector<unsigned int> & dimensions,const std::vector<unsigned int> & dataspaceDimensions)269 XdmfHDF5WriterDSM::createController(const std::string & hdf5FilePath,
270 const std::string & dataSetPath,
271 const shared_ptr<const XdmfArrayType> type,
272 const std::vector<unsigned int> & start,
273 const std::vector<unsigned int> & stride,
274 const std::vector<unsigned int> & dimensions,
275 const std::vector<unsigned int> & dataspaceDimensions)
276 {
277 if (mDSMServerBuffer != NULL) {
278 return XdmfHDF5ControllerDSM::New(hdf5FilePath,
279 dataSetPath,
280 type,
281 start,
282 stride,
283 dimensions,
284 dataspaceDimensions,
285 mDSMServerBuffer);
286 }
287 #ifdef XDMF_BUILD_DSM_THREADS
288 else if (mDSMBuffer != NULL) {
289 return XdmfHDF5ControllerDSM::New(hdf5FilePath,
290 dataSetPath,
291 type,
292 start,
293 stride,
294 dimensions,
295 dataspaceDimensions,
296 mDSMBuffer);
297 }
298 #endif
299 else {
300 return shared_ptr<XdmfHDF5ControllerDSM>();
301 }
302 }
303
deleteManager()304 void XdmfHDF5WriterDSM::deleteManager()
305 {
306 #ifdef XDMF_BUILD_DSM_THREADS
307 if (mDSMManager != NULL)
308 {
309 delete mDSMManager;
310 }
311 #endif
312 if (mDSMServerManager != NULL)
313 {
314 closeFile();
315 delete mDSMServerManager;
316 }
317 }
318
319 void
closeFile()320 XdmfHDF5WriterDSM::closeFile()
321 {
322 if(mFAPL >= 0) {
323 H5Pclose(mFAPL);
324 mFAPL = -1;
325 }
326 XdmfHDF5Writer::closeFile();
327 }
328
329 #ifdef XDMF_BUILD_DSM_THREADS
getBuffer()330 H5FDdsmBuffer * XdmfHDF5WriterDSM::getBuffer()
331 {
332 return mDSMBuffer;
333 }
334
getManager()335 H5FDdsmManager * XdmfHDF5WriterDSM::getManager()
336 {
337 return mDSMManager;
338 }
339 #endif
340
getServerBuffer()341 XdmfDSMBuffer * XdmfHDF5WriterDSM::getServerBuffer()
342 {
343 return mDSMServerBuffer;
344 }
345
getServerManager()346 XdmfDSMManager * XdmfHDF5WriterDSM::getServerManager()
347 {
348 return mDSMServerManager;
349 }
350
getServerMode()351 bool XdmfHDF5WriterDSM::getServerMode()
352 {
353 return mServerMode;
354 }
355
getWorkerComm()356 MPI_Comm XdmfHDF5WriterDSM::getWorkerComm()
357 {
358 MPI_Comm returnComm = MPI_COMM_NULL;
359 if (mWorkerComm != MPI_COMM_NULL) {
360 MPI_Comm_dup(mWorkerComm, &returnComm);
361 }
362 return returnComm;
363 }
364
setAllowSetSplitting(bool newAllow)365 void XdmfHDF5WriterDSM::setAllowSetSplitting(bool newAllow)
366 {
367 //overrides to disable the parent version
368 XdmfHDF5Writer::setAllowSetSplitting(false);
369 }
370
371 #ifdef XDMF_BUILD_DSM_THREADS
setBuffer(H5FDdsmBuffer * newBuffer)372 void XdmfHDF5WriterDSM::setBuffer(H5FDdsmBuffer * newBuffer)
373 {
374 mDSMBuffer = newBuffer;
375 }
376 #endif
377
setBuffer(XdmfDSMBuffer * newBuffer)378 void XdmfHDF5WriterDSM::setBuffer(XdmfDSMBuffer * newBuffer)
379 {
380 mDSMServerBuffer = newBuffer;
381 }
382
383 #ifdef XDMF_BUILD_DSM_THREADS
setManager(H5FDdsmManager * newManager)384 void XdmfHDF5WriterDSM::setManager(H5FDdsmManager * newManager)
385 {
386 H5FDdsmBuffer * newBuffer = newManager->GetDsmBuffer();
387 mDSMManager = newManager;
388 mDSMBuffer = newBuffer;
389 }
390 #endif
391
setManager(XdmfDSMManager * newManager)392 void XdmfHDF5WriterDSM::setManager(XdmfDSMManager * newManager)
393 {
394 XdmfDSMBuffer * newBuffer = newManager->GetDsmBuffer();
395 mDSMServerManager = newManager;
396 mDSMServerBuffer = newBuffer;
397 }
398
setServerMode(bool newMode)399 void XdmfHDF5WriterDSM::setServerMode(bool newMode)
400 {
401 mServerMode = newMode;
402 }
403
setWorkerComm(MPI_Comm comm)404 void XdmfHDF5WriterDSM::setWorkerComm(MPI_Comm comm)
405 {
406 int status;
407 #ifndef OPEN_MPI
408 if (mWorkerComm != MPI_COMM_NULL) {
409 status = MPI_Comm_free(&mWorkerComm);
410 if (status != MPI_SUCCESS) {
411 XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
412 }
413 }
414 #endif
415 if (comm != MPI_COMM_NULL) {
416 status = MPI_Comm_dup(comm, &mWorkerComm);
417 if (status != MPI_SUCCESS) {
418 XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
419 }
420 }
421 mDSMServerBuffer->GetComm()->DupComm(comm);
422 }
423
stopDSM()424 void XdmfHDF5WriterDSM::stopDSM()
425 {
426 if (mServerMode) {
427 // Send manually
428 for (int i = mDSMServerBuffer->GetStartServerId();
429 i <= mDSMServerBuffer->GetEndServerId();
430 ++i) {
431 mDSMServerBuffer->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTER_COMM);
432 }
433 }
434 else {
435 XdmfError::message(XdmfError::FATAL, "Error: Stopping DSM manually only available in server mode.");
436 }
437 }
438
restartDSM()439 void XdmfHDF5WriterDSM::restartDSM()
440 {
441 if (mServerMode) {
442 if (mDSMServerBuffer->GetComm()->GetInterId() >=
443 mDSMServerBuffer->GetStartServerId() &&
444 mDSMServerBuffer->GetComm()->GetInterId() <=
445 mDSMServerBuffer->GetEndServerId()) {
446 int returnOpCode;
447 mDSMServerBuffer->BufferServiceLoop(&returnOpCode);
448 }
449 }
450 else {
451 XdmfError::message(XdmfError::FATAL, "Error: Restarting DSM only available in server mode.");
452 }
453 }
454
455 void
openFile()456 XdmfHDF5WriterDSM::openFile()
457 {
458 if(mFAPL >= 0) {
459 this->closeFile();
460 }
461
462 // Set file access property list for DSM
463 mFAPL = H5Pcreate(H5P_FILE_ACCESS);
464
465 if (mServerMode) {
466 if (mWorkerComm != MPI_COMM_NULL) {
467 XDMFH5Pset_fapl_dsm(mFAPL, mWorkerComm, mDSMServerBuffer, 0);
468 }
469 }
470 else {
471 #ifdef XDMF_BUILD_DSM_THREADS
472 H5Pset_fapl_dsm(mFAPL, MPI_COMM_WORLD, mDSMBuffer, 0);
473 #else
474 XdmfError::message(XdmfError::FATAL, "Error: Threaded DSM not enabled.");
475 #endif
476 }
477 XdmfHDF5Writer::openFile(mFAPL);
478 }
479
visit(XdmfArray & array,const shared_ptr<XdmfBaseVisitor>)480 void XdmfHDF5WriterDSM::visit(XdmfArray & array,
481 const shared_ptr<XdmfBaseVisitor>)
482 {
483 bool closeFAPL = false;
484
485 if(mFAPL < 0) {
486 // Set file access property list for DSM
487 mFAPL = H5Pcreate(H5P_FILE_ACCESS);
488 // Use DSM driver
489 if (mServerMode) {
490 if (mWorkerComm != MPI_COMM_NULL) {
491 XDMFH5Pset_fapl_dsm(mFAPL, mWorkerComm, mDSMServerBuffer, 0);
492 }
493 }
494 else {
495 #ifdef XDMF_BUILD_DSM_THREADS
496 H5Pset_fapl_dsm(mFAPL, MPI_COMM_WORLD, mDSMBuffer, 0);
497 #else
498 XdmfError::message(XdmfError::FATAL, "Error: Threaded DSM not enabled.");
499 #endif
500 }
501
502 closeFAPL = true;
503 }
504
505 // Write to DSM Buffer
506 this->write(array, mFAPL);
507
508 if(closeFAPL) {
509 // Close file access property list
510 H5Pclose(mFAPL);
511 mFAPL = -1;
512 }
513
514 }
515