1''' 2Unit test for the low level vds interface for eiger 3https://support.hdfgroup.org/HDF5/docNewFeatures/VDS/HDF5-VDS-requirements-use-cases-2014-12-10.pdf 4''' 5 6 7from ..common import ut 8import numpy as np 9import h5py as h5 10import tempfile 11 12 13@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 9, 233), 14 'VDS requires HDF5 >= 1.9.233') 15class TestEigerLowLevel(ut.TestCase): 16 def setUp(self): 17 self.working_dir = tempfile.mkdtemp() 18 self.fname = ['raw_file_1.h5', 'raw_file_2.h5', 'raw_file_3.h5'] 19 k = 0 20 for outfile in self.fname: 21 filename = self.working_dir + outfile 22 f = h5.File(filename, 'w') 23 f['data'] = np.ones((20, 200, 200))*k 24 k += 1 25 f.close() 26 27 f = h5.File(self.working_dir+'raw_file_4.h5', 'w') 28 f['data'] = np.ones((18, 200, 200))*3 29 self.fname.append('raw_file_4.h5') 30 self.fname = [self.working_dir+ix for ix in self.fname] 31 f.close() 32 33 def test_eiger_low_level(self): 34 self.outfile = self.working_dir + 'eiger.h5' 35 with h5.File(self.outfile, 'w', libver='latest') as f: 36 vdset_shape = (78, 200, 200) 37 vdset_max_shape = vdset_shape 38 virt_dspace = h5.h5s.create_simple(vdset_shape, vdset_max_shape) 39 dcpl = h5.h5p.create(h5.h5p.DATASET_CREATE) 40 dcpl.set_fill_value(np.array([-1])) 41 # Create the source dataset dataspace 42 k = 0 43 for foo in self.fname: 44 in_data = h5.File(foo, 'r')['data'] 45 src_shape = in_data.shape 46 max_src_shape = src_shape 47 in_data.file.close() 48 src_dspace = h5.h5s.create_simple(src_shape, max_src_shape) 49 # Select the source dataset hyperslab 50 src_dspace.select_hyperslab(start=(0, 0, 0), 51 stride=(1, 1, 1), 52 count=(1, 1, 1), 53 block=src_shape) 54 55 virt_dspace.select_hyperslab(start=(k, 0, 0), 56 stride=(1, 1, 1), 57 count=(1, 1, 1), 58 block=src_shape) 59 60 dcpl.set_virtual(virt_dspace, foo.encode('utf-8'), 61 b'data', src_dspace) 62 k += src_shape[0] 63 64 # Create the virtual dataset 65 h5.h5d.create(f.id, name=b"data", tid=h5.h5t.NATIVE_INT16, 66 space=virt_dspace, dcpl=dcpl) 67 68 f = h5.File(self.outfile, 'r')['data'] 69 self.assertEqual(f[10, 100, 10], 0.0) 70 self.assertEqual(f[30, 100, 100], 1.0) 71 self.assertEqual(f[50, 100, 100], 2.0) 72 self.assertEqual(f[70, 100, 100], 3.0) 73 f.file.close() 74 75 def tearDown(self): 76 import os 77 for f in self.fname: 78 os.remove(f) 79 os.remove(self.outfile) 80 81 82if __name__ == "__main__": 83 ut.main() 84''' 85Unit test for the low level vds interface for excalibur 86https://support.hdfgroup.org/HDF5/docNewFeatures/VDS/HDF5-VDS-requirements-use-cases-2014-12-10.pdf 87''' 88 89 90class ExcaliburData(object): 91 FEM_PIXELS_PER_CHIP_X = 256 92 FEM_PIXELS_PER_CHIP_Y = 256 93 FEM_CHIPS_PER_STRIPE_X = 8 94 FEM_CHIPS_PER_STRIPE_Y = 1 95 FEM_STRIPES_PER_MODULE = 2 96 97 @property 98 def sensor_module_dimensions(self): 99 x_pixels = self.FEM_PIXELS_PER_CHIP_X * self.FEM_CHIPS_PER_STRIPE_X 100 y_pixels = self.FEM_PIXELS_PER_CHIP_Y * self.FEM_CHIPS_PER_STRIPE_Y * self.FEM_STRIPES_PER_MODULE 101 return y_pixels, x_pixels, 102 103 @property 104 def fem_stripe_dimensions(self): 105 x_pixels = self.FEM_PIXELS_PER_CHIP_X * self.FEM_CHIPS_PER_STRIPE_X 106 y_pixels = self.FEM_PIXELS_PER_CHIP_Y * self.FEM_CHIPS_PER_STRIPE_Y 107 return y_pixels, x_pixels, 108 109 def generate_sensor_module_image(self, value, dtype='uint16'): 110 dset = np.empty(shape=self.sensor_module_dimensions, dtype=dtype) 111 dset.fill(value) 112 return dset 113 114 def generate_fem_stripe_image(self, value, dtype='uint16'): 115 dset = np.empty(shape=self.fem_stripe_dimensions, dtype=dtype) 116 dset.fill(value) 117 return dset 118 119 120@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 9, 233), 121 'VDS requires HDF5 >= 1.9.233') 122class TestExcaliburLowLevel(ut.TestCase): 123 def create_excalibur_fem_stripe_datafile(self, fname, nframes, excalibur_data,scale): 124 shape = (nframes,) + excalibur_data.fem_stripe_dimensions 125 max_shape = (nframes,) + excalibur_data.fem_stripe_dimensions 126 chunk = (1,) + excalibur_data.fem_stripe_dimensions 127 with h5.File(fname, 'w', libver='latest') as f: 128 dset = f.create_dataset('data', shape=shape, maxshape=max_shape, chunks=chunk, dtype='uint16') 129 for data_value_index in np.arange(nframes): 130 dset[data_value_index] = excalibur_data.generate_fem_stripe_image(data_value_index*scale) 131 132 def setUp(self): 133 self.working_dir = tempfile.mkdtemp() 134 self.fname = ["stripe_%d.h5" % stripe for stripe in range(1,7)] 135 self.fname = [self.working_dir+ix for ix in self.fname] 136 nframes = 5 137 self.edata = ExcaliburData() 138 k=0 139 for raw_file in self.fname: 140 self.create_excalibur_fem_stripe_datafile(raw_file, nframes, self.edata,k) 141 k+=1 142 143 def test_excalibur_low_level(self): 144 145 excalibur_data = self.edata 146 self.outfile = self.working_dir+'excalibur.h5' 147 vdset_stripe_shape = (1,) + excalibur_data.fem_stripe_dimensions 148 vdset_stripe_max_shape = (5, ) + excalibur_data.fem_stripe_dimensions 149 vdset_shape = (5, 150 excalibur_data.fem_stripe_dimensions[0] * len(self.fname) + (10 * (len(self.fname)-1)), 151 excalibur_data.fem_stripe_dimensions[1]) 152 vdset_max_shape = (5, 153 excalibur_data.fem_stripe_dimensions[0] * len(self.fname) + (10 * (len(self.fname)-1)), 154 excalibur_data.fem_stripe_dimensions[1]) 155 vdset_y_offset = 0 156 157 # Create the virtual dataset file 158 with h5.File(self.outfile, 'w', libver='latest') as f: 159 160 # Create the source dataset dataspace 161 src_dspace = h5.h5s.create_simple(vdset_stripe_shape, vdset_stripe_max_shape) 162 # Create the virtual dataset dataspace 163 virt_dspace = h5.h5s.create_simple(vdset_shape, vdset_max_shape) 164 165 # Create the virtual dataset property list 166 dcpl = h5.h5p.create(h5.h5p.DATASET_CREATE) 167 dcpl.set_fill_value(np.array([0x01])) 168 169 # Select the source dataset hyperslab 170 src_dspace.select_hyperslab(start=(0, 0, 0), count=(1, 1, 1), block=vdset_stripe_max_shape) 171 172 for raw_file in self.fname: 173 # Select the virtual dataset hyperslab (for the source dataset) 174 virt_dspace.select_hyperslab(start=(0, vdset_y_offset, 0), 175 count=(1, 1, 1), 176 block=vdset_stripe_max_shape) 177 # Set the virtual dataset hyperslab to point to the real first dataset 178 dcpl.set_virtual(virt_dspace, raw_file.encode('utf-8'), 179 b"/data", src_dspace) 180 vdset_y_offset += vdset_stripe_shape[1] + 10 181 182 # Create the virtual dataset 183 dset = h5.h5d.create(f.id, name=b"data", 184 tid=h5.h5t.NATIVE_INT16, space=virt_dspace, dcpl=dcpl) 185 assert(f['data'].fillvalue == 0x01) 186 187 f = h5.File(self.outfile,'r')['data'] 188 self.assertEqual(f[3,100,0], 0.0) 189 self.assertEqual(f[3,260,0], 1.0) 190 self.assertEqual(f[3,350,0], 3.0) 191 self.assertEqual(f[3,650,0], 6.0) 192 self.assertEqual(f[3,900,0], 9.0) 193 self.assertEqual(f[3,1150,0], 12.0) 194 self.assertEqual(f[3,1450,0], 15.0) 195 f.file.close() 196 197 def tearDown(self): 198 import os 199 for f in self.fname: 200 os.remove(f) 201 os.remove(self.outfile) 202 203''' 204Unit test for the low level vds interface for percival 205https://support.hdfgroup.org/HDF5/docNewFeatures/VDS/HDF5-VDS-requirements-use-cases-2014-12-10.pdf 206''' 207 208 209@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 9, 233), 210 'VDS requires HDF5 >= 1.9.233') 211class TestPercivalLowLevel(ut.TestCase): 212 213 def setUp(self): 214 self.working_dir = tempfile.mkdtemp() 215 self.fname = ['raw_file_1.h5','raw_file_2.h5','raw_file_3.h5'] 216 k = 0 217 for outfile in self.fname: 218 filename = self.working_dir + outfile 219 f = h5.File(filename,'w') 220 f['data'] = np.ones((20,200,200))*k 221 k +=1 222 f.close() 223 224 f = h5.File(self.working_dir+'raw_file_4.h5','w') 225 f['data'] = np.ones((19,200,200))*3 226 self.fname.append('raw_file_4.h5') 227 self.fname = [self.working_dir+ix for ix in self.fname] 228 f.close() 229 230 def test_percival_low_level(self): 231 self.outfile = self.working_dir + 'percival.h5' 232 with h5.File(self.outfile, 'w', libver='latest') as f: 233 vdset_shape = (1,200,200) 234 num = h5.h5s.UNLIMITED 235 vdset_max_shape = (num,)+vdset_shape[1:] 236 virt_dspace = h5.h5s.create_simple(vdset_shape, vdset_max_shape) 237 dcpl = h5.h5p.create(h5.h5p.DATASET_CREATE) 238 dcpl.set_fill_value(np.array([-1])) 239 # Create the source dataset dataspace 240 k = 0 241 for foo in self.fname: 242 in_data = h5.File(foo, 'r')['data'] 243 src_shape = in_data.shape 244 max_src_shape = (num,)+src_shape[1:] 245 in_data.file.close() 246 src_dspace = h5.h5s.create_simple(src_shape, max_src_shape) 247 # Select the source dataset hyperslab 248 src_dspace.select_hyperslab(start=(0, 0, 0), 249 stride=(1,1,1), 250 count=(num, 1, 1), 251 block=(1,)+src_shape[1:]) 252 253 virt_dspace.select_hyperslab(start=(k, 0, 0), 254 stride=(4,1,1), 255 count=(num, 1, 1), 256 block=(1,)+src_shape[1:]) 257 258 dcpl.set_virtual(virt_dspace, foo.encode('utf-8'), b'data', src_dspace) 259 k+=1 260 261 # Create the virtual dataset 262 dset = h5.h5d.create(f.id, name=b"data", tid=h5.h5t.NATIVE_INT16, space=virt_dspace, dcpl=dcpl) 263 264 f = h5.File(self.outfile,'r') 265 sh = f['data'].shape 266 line = f['data'][:8,100,100] 267 foo = np.array(2*list(range(4))) 268 f.close() 269 self.assertEqual(sh,(79,200,200),) 270 np.testing.assert_array_equal(line,foo) 271 272 def tearDown(self): 273 import os 274 for f in self.fname: 275 os.remove(f) 276 os.remove(self.outfile) 277 278 279@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 10, 2), 280 'get_ / set_virtual_prefix requires HDF5 >= 1.10.2') 281def test_virtual_prefix(tmp_path): 282 (tmp_path / 'a').mkdir() 283 (tmp_path / 'b').mkdir() 284 src_file = h5.File(tmp_path / 'a' / 'src.h5', 'w') 285 src_file['data'] = np.arange(10) 286 287 vds_file = h5.File(tmp_path / 'b' / 'vds.h5', 'w') 288 layout = h5.VirtualLayout(shape=(10,), dtype=np.int64) 289 layout[:] = h5.VirtualSource('src.h5', 'data', shape=(10,)) 290 vds_file.create_virtual_dataset('data', layout, fillvalue=-1) 291 292 # Path doesn't resolve 293 np.testing.assert_array_equal(vds_file['data'], np.full(10, fill_value=-1)) 294 295 path_a = bytes(tmp_path / 'a') 296 dapl = h5.h5p.create(h5.h5p.DATASET_ACCESS) 297 dapl.set_virtual_prefix(path_a) 298 vds_id = h5.h5d.open(vds_file.id, b'data', dapl=dapl) 299 vds = h5.Dataset(vds_id) 300 301 # Now it should find the source file and read the data correctly 302 np.testing.assert_array_equal(vds[:], np.arange(10)) 303 # Check that get_virtual_prefix gives back what we put in 304 assert vds.id.get_access_plist().get_virtual_prefix() == path_a 305