#include <mpi.h>
#include <XdmfArray.h>
#include <XdmfHDF.h>

#include <cstddef>
#include <cstring>
#include <iostream>

/// Simple memory buffer implementation that keeps track of its stream pointers.
class Buffer {
private:
  std::size_t m_size;
  char* m_data;
  char* m_put;
  char* m_tell;

public:
  Buffer( std::size_t lsize ) :
    m_size( lsize ),
    m_data( new char[lsize] ),
    m_put( m_data ),
    m_tell( m_data )
  {}

  ~Buffer() {
    delete [] m_data;
  }

  /// put a single value into the buffer
  template< typename T >
  void put( const T& t ) {
    std::size_t lsize = sizeof( T );
    memcpy( m_put, &t, lsize );
    m_put += lsize;
  }

  /// copy a contiguous block into the buffer
  void put( void* data, std::size_t lsize ) {
    memcpy( m_put, data, lsize );
    m_put += lsize;
  }

  /// read a single value from the buffer
  template< typename T >
  T tell() {
    std::size_t tsize = sizeof( T );
    T tmp;
    memcpy( &tmp, m_tell, tsize );
    m_tell += tsize;
    return tmp;
  }

  /// copy a contiguous block of data from the buffer to an already allocated
  /// location
  void tell( void* out, std::size_t lsize ) {
    memcpy( out, m_tell, lsize );
    m_tell += lsize;
  }

  std::size_t size() const {
    return m_size;
  }

  /// raw access to the underlying storage, e.g. for MPI_Send/MPI_Recv
  char* pointer() {
    return m_data;
  }

  /// rewind both stream pointers to the beginning of the buffer
  void reset() {
    m_put = m_data;
    m_tell = m_data;
  }
};
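
// How the Buffer is used below: a sending rank put()s the hyperslab
// description (slab rank, then start/stride/count per dimension) followed by
// the raw array bytes; rank 0 receives the same-sized message directly into
// its own Buffer's storage, reset()s the read pointer, and tell()s the slab
// description and data back out in the same order before writing.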

/// Callback that implements parallel I/O by funneling all data through rank 0
/// of MPI_COMM_WORLD.
class CommunicationCallback :
  public XdmfOpenCallback,
  public XdmfWriteCallback,
  public XdmfCloseCallback,
  public XdmfReadCallback
{
private:
  int mCommRank;
  int mCommSize;
public:

  CommunicationCallback() {
    MPI_Comm_size( MPI_COMM_WORLD, &mCommSize );
    MPI_Comm_rank( MPI_COMM_WORLD, &mCommRank );
  }

  XdmfInt32 DoOpen(
    XdmfHeavyData* ds,
    XdmfConstString name,
    XdmfConstString access )
  {
    // If HDF5 is compiled with Parallel IO, we must use collective open
#ifndef H5_HAVE_PARALLEL
    if ( mCommRank == 0 ) {
      return ds->DoOpen( name, access );
    } else {
      return XDMF_SUCCESS;
    }
#else
    return ds->DoOpen( name, access );
#endif
  }
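
  // Note: H5_HAVE_PARALLEL comes from the HDF5 configuration headers and is
  // defined when HDF5 was built with MPI support.  In that build, file open
  // and close are collective operations and every rank must participate;
  // otherwise only rank 0 touches the file here and in DoClose below.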

  XdmfInt32 DoClose( XdmfHeavyData* ds )
  {
    // As with DoOpen, a parallel HDF5 build requires a collective close.
#ifndef H5_HAVE_PARALLEL
    if ( mCommRank == 0 ) {
      return ds->DoClose();
    } else {
      return XDMF_SUCCESS;
    }
#else
    return ds->DoClose();
#endif
  }

  XdmfInt32 DoWrite( XdmfHeavyData* ds, XdmfArray* array )
  {
    MPI_Status stat;
    // This is a simplistic implementation that assumes every rank writes a
    // block of the same size as rank 0; we're just going for a simple example
    // here.  A real coalescing implementation will require a few more classes
    // to handle buffering the data cleanly and robustly.

    XdmfInt64 start[1], stride[1], count[1];
    XdmfInt32 slab_rank = ds->GetHyperSlab( start, stride, count );
    std::size_t slab_info_size =
      sizeof( XdmfInt32 )                    // slab rank
      + slab_rank * sizeof( XdmfInt64 ) * 3; // start, stride, and count
    Buffer buf( slab_info_size + array->GetCoreLength() );

    if ( mCommRank != 0 ) {
      // copy local data to the buffer for sending
      buf.put( slab_rank );
      for ( int i = 0; i < slab_rank; ++i ) {
        buf.put( start[i] );
        buf.put( stride[i] );
        buf.put( count[i] );
      }
      buf.put( array->GetDataPointer(), array->GetCoreLength() );
      MPI_Send(
        buf.pointer(),
        buf.size(),
        MPI_BYTE,
        0,
        0,
        MPI_COMM_WORLD );
    } else {
      // first, it's easy to write my own data
      ds->DoWrite( array );

      int processes_received = 1; // I've written local data
      while ( processes_received < mCommSize ) {
        MPI_Recv(
          buf.pointer(),
          buf.size(),
          MPI_BYTE,
          MPI_ANY_SOURCE,
          0,
          MPI_COMM_WORLD,
          &stat );
        processes_received++;

        // pull the information from the buffer
        buf.reset();
        slab_rank = buf.tell< XdmfInt32 >();
        for ( int i = 0; i < slab_rank; ++i ) {
          start[i] = buf.tell< XdmfInt64 >();
          stride[i] = buf.tell< XdmfInt64 >();
          count[i] = buf.tell< XdmfInt64 >();
        }
        ds->SelectHyperSlab( start, stride, count );
        XdmfArray* recv = new XdmfArray;
        recv->CopyShape( array );
        buf.tell( recv->GetDataPointer(), recv->GetCoreLength() );
        ds->DoWrite( recv );
        delete recv;
      }
    }

    return XDMF_SUCCESS;
  }

  XdmfArray* DoRead( XdmfHeavyData* ds, XdmfArray* array )
  {
    // Only rank 0 actually reads; the other ranks get no data back in this
    // simple example.
    if ( mCommRank == 0 ) {
      return ds->DoRead( array );
    } else {
      return NULL;
    }
  }
};

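// The heavy-data name below follows Xdmf's "FILE:<hdf5 file>:<dataset path>"
// convention: the data is written to the dataset /XdmfHDFMPI inside
// TestFile.h5 in the working directory.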
char const * const kDatasetName = "FILE:TestFile.h5:/XdmfHDFMPI";

int main( int argc, char* argv[] ) {
  MPI_Init( &argc, &argv );

  int rank;
  MPI_Comm_rank( MPI_COMM_WORLD, &rank );

  XdmfHDF* H5 = new XdmfHDF();
  CommunicationCallback* cb = new CommunicationCallback;
  H5->setOpenCallback( cb );
  H5->setWriteCallback( cb );
  H5->setCloseCallback( cb );
  XdmfArray* MyData = new XdmfArray();

  // Each rank generates 25 floats with the values rank*25 .. rank*25 + 24;
  // with four MPI ranks the 100-element dataset ends up holding 0 through 99.
  MyData->SetNumberType( XDMF_FLOAT32_TYPE );
  MyData->SetNumberOfElements( 25 );
  MyData->Generate( rank * 25, rank * 25 + 24 );

  H5->CopyType( MyData );
  XdmfInt64 dims[1], start[1], stride[1], count[1];
  dims[0] = 100;
  H5->SetShape( 1, dims );
  start[0] = rank * 25;
  stride[0] = 1;
  count[0] = 25;
  H5->SelectHyperSlab( start, stride, count );
  H5->Open( kDatasetName, "w" );
  H5->Write( MyData );
  H5->Close();

  bool failure = false;

  XdmfHDF* H5In = new XdmfHDF();
  H5In->setReadCallback( cb );
  H5In->setOpenCallback( cb );
  H5In->Open( kDatasetName, "r" );
  XdmfArray* result = H5In->Read();

  if ( result ) {
    for ( size_t i = 0; i < 100; ++i ) {
      float value = result->GetValueAsFloat32( i );
      std::cout << i << " " << value << std::endl;
      if ( value != i ) {
        failure = true;
      }
    }
  }

  delete H5;
  delete cb;
  delete MyData;
  delete H5In;
  delete result;

  MPI_Finalize();

  if ( failure ) {
    return -1;
  } else {
    return 0;
  }
}
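
// A minimal sketch of how this example might be run; the executable name
// "XdmfHDFMPI" is an assumption and depends on how the test is built:
//
//   mpirun -np 4 ./XdmfHDFMPI
//
// With four ranks, each rank contributes 25 floats and rank 0 coalesces them
// into the 100-element dataset /XdmfHDFMPI in TestFile.h5; rank 0 then reads
// the dataset back and verifies that element i holds the value i.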