#include <mpi.h>
#include <XdmfArray.h>
#include <XdmfHDF.h>

#include <cstddef>   // std::size_t
#include <cstring>   // memcpy
#include <iostream>  // std::cout

/// Simple memory buffer implementation that keeps track of its own read and
/// write stream pointers.
class Buffer {
private:
  std::size_t m_size;
  char* m_data;
  char* m_put;
  char* m_tell;

public:
  Buffer( std::size_t lsize ) :
    m_size( lsize ),
    m_data( new char[lsize] ),
    m_put( m_data ),
    m_tell( m_data )
  {}

  ~Buffer() {
    delete [] m_data;
  }

  /// Put a single value into the buffer.
  template< typename T >
  void put( const T& t ) {
    std::size_t lsize = sizeof( T );
    memcpy( m_put, &t, lsize );
    m_put += lsize;
  }

  /// Copy a contiguous block into the buffer.
  void put( void* data, std::size_t lsize ) {
    memcpy( m_put, data, lsize );
    m_put += lsize;
  }

  /// Read a single value from the buffer.
  template< typename T >
  T tell() {
    std::size_t tsize = sizeof( T );
    T tmp;
    memcpy( &tmp, m_tell, tsize );
    m_tell += tsize;
    return tmp;
  }

  /// Copy a contiguous block of data from the buffer to an already
  /// allocated location.
  void tell( void* out, std::size_t lsize ) {
    memcpy( out, m_tell, lsize );
    m_tell += lsize;
  }

  std::size_t size() const {
    return m_size;
  }

  char* pointer() {
    return m_data;
  }

  /// Rewind both the read and write pointers to the start of the buffer.
  void reset() {
    m_put = m_data;
    m_tell = m_data;
  }
};
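
/// A minimal usage sketch of Buffer (illustrative; not part of the original
/// test). Values must be read back with tell() in the same order, and with
/// the same types, as they were written with put(): the buffer holds raw
/// bytes with no framing or type information.
static void bufferUsageExample() {
  Buffer buf( sizeof( XdmfInt32 ) + 3 * sizeof( float ) );
  XdmfInt32 n = 3;
  buf.put( n );                           // header: element count
  float values[3] = { 1.0f, 2.0f, 3.0f };
  buf.put( values, sizeof( values ) );    // payload: contiguous block
  buf.reset();                            // rewind both stream pointers
  XdmfInt32 m = buf.tell< XdmfInt32 >();  // read back in write order
  float out[3];
  buf.tell( out, m * sizeof( float ) );
}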

/// Callback that implements parallel IO by funneling all data through rank 0
/// of MPI_COMM_WORLD.
class CommunicationCallback :
  public XdmfOpenCallback,
  public XdmfWriteCallback,
  public XdmfCloseCallback,
  public XdmfReadCallback
{
private:
  int mCommRank;
  int mCommSize;

public:
  CommunicationCallback() {
    MPI_Comm_size( MPI_COMM_WORLD, &mCommSize );
    MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank );
  }

  XdmfInt32 DoOpen(
    XdmfHeavyData* ds,
    XdmfConstString name,
    XdmfConstString access )
  {
    // If HDF5 is compiled with parallel IO, we must use a collective open.
#ifndef H5_HAVE_PARALLEL
    if ( mCommRank == 0 ) {
      return ds->DoOpen( name, access );
    } else {
      return XDMF_SUCCESS;
    }
#else
    return ds->DoOpen( name, access );
#endif
  }

  XdmfInt32 DoClose( XdmfHeavyData* ds )
  {
    // Mirrors DoOpen: collective close with parallel HDF5, rank 0 only
    // otherwise.
#ifndef H5_HAVE_PARALLEL
    if ( mCommRank == 0 ) {
      return ds->DoClose();
    } else {
      return XDMF_SUCCESS;
    }
#else
    return ds->DoClose();
#endif
  }

  XdmfInt32 DoWrite( XdmfHeavyData* ds, XdmfArray* array )
  {
    MPI_Status stat;
    // This is a deliberately naive implementation that assumes rank 0 has
    // the same data size as everyone else; we are just going for a simple
    // example here. A real coalescing implementation would require a few
    // more classes to handle buffering the data cleanly and robustly.

    XdmfInt64 start[1], stride[1], count[1];
    XdmfInt32 slab_rank = ds->GetHyperSlab( start, stride, count );
    std::size_t slab_info_size =
      sizeof( XdmfInt32 )                     // slab rank
      + slab_rank * sizeof( XdmfInt64 ) * 3;  // start, stride, and count
    Buffer buf( slab_info_size + array->GetCoreLength() );
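
    // Buffer layout, as packed by every non-root rank:
    //   [ XdmfInt32 slab_rank ]
    //   [ XdmfInt64 start[i], stride[i], count[i] ]  repeated slab_rank times
    //   [ raw array bytes, array->GetCoreLength() long ]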
    if ( mCommRank != 0 ) {
      // Copy local data into the buffer and send it to rank 0.
      buf.put( slab_rank );
      for ( int i = 0; i < slab_rank; ++i ) {
        buf.put( start[i] );
        buf.put( stride[i] );
        buf.put( count[i] );
      }
      buf.put( array->GetDataPointer(), array->GetCoreLength() );
      MPI_Send(
        buf.pointer(),
        static_cast< int >( buf.size() ),
        MPI_BYTE,
        0,
        0,
        MPI_COMM_WORLD );
    } else {
      // First, write rank 0's own local data.
      ds->DoWrite( array );

      int processes_received = 1; // rank 0's data is already written
      while ( processes_received < mCommSize ) {
        MPI_Recv(
          buf.pointer(),
          static_cast< int >( buf.size() ),
          MPI_BYTE,
          MPI_ANY_SOURCE,
          0,
          MPI_COMM_WORLD,
          &stat );
        processes_received++;

        // Pull the hyperslab information and data back out of the buffer.
        buf.reset();
        slab_rank = buf.tell< XdmfInt32 >();
        for ( int i = 0; i < slab_rank; ++i ) {
          start[i] = buf.tell< XdmfInt64 >();
          stride[i] = buf.tell< XdmfInt64 >();
          count[i] = buf.tell< XdmfInt64 >();
        }
        ds->SelectHyperSlab( start, stride, count );
        XdmfArray* recv = new XdmfArray;
        recv->CopyShape( array );
        buf.tell( recv->GetDataPointer(), recv->GetCoreLength() );
        ds->DoWrite( recv );
        delete recv;
      }
    }

    return XDMF_SUCCESS;
  }

  XdmfArray* DoRead( XdmfHeavyData* ds, XdmfArray* array )
  {
    if ( mCommRank == 0 ) {
      return ds->DoRead( array );
    } else {
      return NULL;
    }
  }
};

// Heavy-data names follow Xdmf's "DOMAIN:FILENAME:PATHNAME" convention:
// here the FILE domain, the HDF5 file TestFile.h5, and dataset /XdmfHDFMPI.
char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI";

int main( int argc, char* argv[] ) {
  MPI_Init( &argc, &argv );

  int rank;
  MPI_Comm_rank( MPI_COMM_WORLD, &rank );

  XdmfHDF* H5 = new XdmfHDF();
  CommunicationCallback* cb = new CommunicationCallback;
  H5->setOpenCallback( cb );
  H5->setWriteCallback( cb );
  H5->setCloseCallback( cb );
  XdmfArray* MyData = new XdmfArray();

  MyData->SetNumberType( XDMF_FLOAT32_TYPE );
  MyData->SetNumberOfElements( 25 );
  MyData->Generate( rank * 25, rank * 25 + 24 );

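  // Each rank fills its 25-element slice with its global indices; with
  // dims[0] = 100 below, this test assumes exactly 4 MPI processes.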
  H5->CopyType( MyData );
  XdmfInt64 dims[1], start[1], stride[1], count[1];
  dims[0] = 100;
  H5->SetShape( 1, dims );
  start[0] = rank * 25;
  stride[0] = 1;
  count[0] = 25;
  H5->SelectHyperSlab( start, stride, count );
  H5->Open( kDatasetName, "w" );
  H5->Write( MyData );
  H5->Close();

  bool failure = false;

  XdmfHDF* H5In = new XdmfHDF();
  H5In->setReadCallback( cb );
  H5In->setOpenCallback( cb );
  H5In->Open( kDatasetName, "r" );
  XdmfArray* result = H5In->Read();
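
  // DoRead returns NULL on every rank but 0, so only the root prints and
  // verifies the result.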
  if ( result ) {
    for ( size_t i = 0; i < 100; ++i ) {
      float value = result->GetValueAsFloat32( i );
      std::cout << i << " " << value << std::endl;
      // Accumulate failures instead of overwriting the flag each iteration.
      if ( value != i ) {
        failure = true;
      }
    }
  }

  delete H5;
  delete cb;
  delete MyData;
  delete H5In;
  delete result;

  MPI_Finalize();

  if ( failure ) {
    return -1;
  } else {
    return 0;
  }
}