1'''
2Unit test for the low level vds interface for eiger
3https://support.hdfgroup.org/HDF5/docNewFeatures/VDS/HDF5-VDS-requirements-use-cases-2014-12-10.pdf
4'''
5
6
import os
import shutil
import tempfile

import h5py as h5
import numpy as np

from ..common import ut
11
12
@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 9, 233),
               'VDS requires HDF5 >= 1.9.233')
class TestEigerLowLevel(ut.TestCase):
    """Low-level VDS test for the Eiger use case.

    Four source files holding stacks of 200x200 frames are concatenated
    along the first axis into a single fixed-size virtual dataset.
    """

    def setUp(self):
        """Create a scratch directory with four raw source files.

        Files 1-3 hold 20 frames each and file 4 holds 18 frames; every
        frame in the n-th file (counting from zero) is filled with n.
        """
        self.working_dir = tempfile.mkdtemp()
        frame_counts = (20, 20, 20, 18)
        # mkdtemp() returns a path without a trailing separator, so the
        # file names must be joined, not concatenated, to land inside it.
        self.fname = [
            os.path.join(self.working_dir, 'raw_file_%d.h5' % (index + 1))
            for index in range(len(frame_counts))
        ]
        for value, (filename, nframes) in enumerate(
                zip(self.fname, frame_counts)):
            with h5.File(filename, 'w') as f:
                f['data'] = np.ones((nframes, 200, 200)) * value

    def test_eiger_low_level(self):
        """Stack the source files end to end and spot-check each segment."""
        self.outfile = os.path.join(self.working_dir, 'eiger.h5')
        with h5.File(self.outfile, 'w', libver='latest') as f:
            vdset_shape = (78, 200, 200)
            vdset_max_shape = vdset_shape
            virt_dspace = h5.h5s.create_simple(vdset_shape, vdset_max_shape)
            dcpl = h5.h5p.create(h5.h5p.DATASET_CREATE)
            dcpl.set_fill_value(np.array([-1]))

            # Offset along the first axis at which the next source starts.
            frame_offset = 0
            for src_name in self.fname:
                with h5.File(src_name, 'r') as src:
                    src_shape = src['data'].shape
                max_src_shape = src_shape
                # Create the source dataset dataspace
                src_dspace = h5.h5s.create_simple(src_shape, max_src_shape)
                # Select the whole source dataset as a single block
                src_dspace.select_hyperslab(start=(0, 0, 0),
                                            stride=(1, 1, 1),
                                            count=(1, 1, 1),
                                            block=src_shape)
                # Select the matching region of the virtual dataset
                virt_dspace.select_hyperslab(start=(frame_offset, 0, 0),
                                             stride=(1, 1, 1),
                                             count=(1, 1, 1),
                                             block=src_shape)

                dcpl.set_virtual(virt_dspace, src_name.encode('utf-8'),
                                 b'data', src_dspace)
                frame_offset += src_shape[0]

            # Create the virtual dataset
            h5.h5d.create(f.id, name=b"data", tid=h5.h5t.NATIVE_INT16,
                          space=virt_dspace, dcpl=dcpl)

        # The context manager closes the file even if an assertion fails.
        with h5.File(self.outfile, 'r') as f:
            data = f['data']
            self.assertEqual(data[10, 100, 10], 0.0)
            self.assertEqual(data[30, 100, 100], 1.0)
            self.assertEqual(data[50, 100, 100], 2.0)
            self.assertEqual(data[70, 100, 100], 3.0)

    def tearDown(self):
        """Remove the scratch directory and every file created in it."""
        shutil.rmtree(self.working_dir)
80
81
# NOTE(review): this entry-point guard sits before further test classes that
# are defined later in this file. Presumably ut.main() behaves like
# unittest.main() and exits once the run finishes, in which case the
# definitions below are never reached when the file is executed as a script
# -- confirm, and consider moving the guard to the end of the module.
if __name__ == "__main__":
    ut.main()
84'''
85Unit test for the low level vds interface for excalibur
86https://support.hdfgroup.org/HDF5/docNewFeatures/VDS/HDF5-VDS-requirements-use-cases-2014-12-10.pdf
87'''
88
89
class ExcaliburData:
    """Detector geometry constants plus constant-valued test image factories.

    All dimensions are reported as ``(rows, columns)`` tuples, i.e. the
    y extent first and the x extent second.
    """

    FEM_PIXELS_PER_CHIP_X = 256
    FEM_PIXELS_PER_CHIP_Y = 256
    FEM_CHIPS_PER_STRIPE_X = 8
    FEM_CHIPS_PER_STRIPE_Y = 1
    FEM_STRIPES_PER_MODULE = 2

    @property
    def sensor_module_dimensions(self):
        """Return ``(rows, columns)`` of a full sensor module."""
        rows = (self.FEM_PIXELS_PER_CHIP_Y
                * self.FEM_CHIPS_PER_STRIPE_Y
                * self.FEM_STRIPES_PER_MODULE)
        columns = self.FEM_PIXELS_PER_CHIP_X * self.FEM_CHIPS_PER_STRIPE_X
        return (rows, columns)

    @property
    def fem_stripe_dimensions(self):
        """Return ``(rows, columns)`` of a single FEM stripe."""
        rows = self.FEM_PIXELS_PER_CHIP_Y * self.FEM_CHIPS_PER_STRIPE_Y
        columns = self.FEM_PIXELS_PER_CHIP_X * self.FEM_CHIPS_PER_STRIPE_X
        return (rows, columns)

    def _constant_image(self, shape, value, dtype):
        """Allocate an array of ``shape``/``dtype`` with every element ``value``."""
        image = np.empty(shape=shape, dtype=dtype)
        image.fill(value)
        return image

    def generate_sensor_module_image(self, value, dtype='uint16'):
        """Return a sensor-module-sized image filled with ``value``."""
        return self._constant_image(self.sensor_module_dimensions,
                                    value, dtype)

    def generate_fem_stripe_image(self, value, dtype='uint16'):
        """Return a FEM-stripe-sized image filled with ``value``."""
        return self._constant_image(self.fem_stripe_dimensions, value, dtype)
118
119
@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 9, 233),
               'VDS requires HDF5 >= 1.9.233')
class TestExcaliburLowLevel(ut.TestCase):
    """Low-level VDS test for the Excalibur use case.

    Six FEM stripe files are tiled along the second axis of a virtual
    dataset, separated by 10-row gaps that read back as the fill value.
    """

    def create_excalibur_fem_stripe_datafile(self, fname, nframes, excalibur_data, scale):
        """Write ``nframes`` constant stripe images to dataset 'data' in ``fname``.

        Frame ``i`` is filled with the value ``i * scale``. The dataset is
        uint16 and chunked one frame per chunk.
        """
        shape = (nframes,) + excalibur_data.fem_stripe_dimensions
        max_shape = (nframes,) + excalibur_data.fem_stripe_dimensions
        chunk = (1,) + excalibur_data.fem_stripe_dimensions
        with h5.File(fname, 'w', libver='latest') as f:
            dset = f.create_dataset('data', shape=shape, maxshape=max_shape,
                                    chunks=chunk, dtype='uint16')
            for data_value_index in np.arange(nframes):
                dset[data_value_index] = excalibur_data.generate_fem_stripe_image(
                    data_value_index * scale)

    def setUp(self):
        """Create a scratch directory with six 5-frame stripe files.

        The n-th file (counting from zero) uses scale n, so frame i of
        that file holds the value i * n.
        """
        self.working_dir = tempfile.mkdtemp()
        # mkdtemp() returns a path without a trailing separator, so the
        # file names must be joined, not concatenated, to land inside it.
        self.fname = [
            os.path.join(self.working_dir, "stripe_%d.h5" % stripe)
            for stripe in range(1, 7)
        ]
        nframes = 5
        self.edata = ExcaliburData()
        for scale, raw_file in enumerate(self.fname):
            self.create_excalibur_fem_stripe_datafile(
                raw_file, nframes, self.edata, scale)

    def test_excalibur_low_level(self):
        """Tile the stripes with gaps and check stripe and gap values."""
        excalibur_data = self.edata
        self.outfile = os.path.join(self.working_dir, 'excalibur.h5')
        vdset_stripe_shape = (1,) + excalibur_data.fem_stripe_dimensions
        vdset_stripe_max_shape = (5, ) + excalibur_data.fem_stripe_dimensions
        # Total height: one stripe per file plus a 10-row gap between
        # neighbouring stripes.
        vdset_shape = (5,
                       excalibur_data.fem_stripe_dimensions[0] * len(self.fname) + (10 * (len(self.fname)-1)),
                       excalibur_data.fem_stripe_dimensions[1])
        vdset_max_shape = (5,
                           excalibur_data.fem_stripe_dimensions[0] * len(self.fname) + (10 * (len(self.fname)-1)),
                           excalibur_data.fem_stripe_dimensions[1])
        vdset_y_offset = 0

        # Create the virtual dataset file
        with h5.File(self.outfile, 'w', libver='latest') as f:

            # Create the source dataset dataspace
            src_dspace = h5.h5s.create_simple(vdset_stripe_shape, vdset_stripe_max_shape)
            # Create the virtual dataset dataspace
            virt_dspace = h5.h5s.create_simple(vdset_shape, vdset_max_shape)

            # Create the virtual dataset property list
            dcpl = h5.h5p.create(h5.h5p.DATASET_CREATE)
            dcpl.set_fill_value(np.array([0x01]))

            # Select the source dataset hyperslab
            src_dspace.select_hyperslab(start=(0, 0, 0), count=(1, 1, 1), block=vdset_stripe_max_shape)

            for raw_file in self.fname:
                # Select the virtual dataset hyperslab (for the source dataset)
                virt_dspace.select_hyperslab(start=(0, vdset_y_offset, 0),
                                             count=(1, 1, 1),
                                             block=vdset_stripe_max_shape)
                # Map this region of the virtual dataset onto the source file
                dcpl.set_virtual(virt_dspace, raw_file.encode('utf-8'),
                                 b"/data", src_dspace)
                # Advance past this stripe and the 10-row gap after it
                vdset_y_offset += vdset_stripe_shape[1] + 10

            # Create the virtual dataset
            h5.h5d.create(f.id, name=b"data",
                          tid=h5.h5t.NATIVE_INT16, space=virt_dspace, dcpl=dcpl)
            # assertEqual (not a bare assert) so the check survives -O
            self.assertEqual(f['data'].fillvalue, 0x01)

        # The context manager closes the file even if an assertion fails.
        with h5.File(self.outfile, 'r') as f:
            data = f['data']
            self.assertEqual(data[3, 100, 0], 0.0)
            # Row 260 lies in the gap after the first stripe -> fill value
            self.assertEqual(data[3, 260, 0], 1.0)
            self.assertEqual(data[3, 350, 0], 3.0)
            self.assertEqual(data[3, 650, 0], 6.0)
            self.assertEqual(data[3, 900, 0], 9.0)
            self.assertEqual(data[3, 1150, 0], 12.0)
            self.assertEqual(data[3, 1450, 0], 15.0)

    def tearDown(self):
        """Remove the scratch directory and every file created in it."""
        shutil.rmtree(self.working_dir)
202
203'''
204Unit test for the low level vds interface for percival
205https://support.hdfgroup.org/HDF5/docNewFeatures/VDS/HDF5-VDS-requirements-use-cases-2014-12-10.pdf
206'''
207
208
@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 9, 233),
               'VDS requires HDF5 >= 1.9.233')
class TestPercivalLowLevel(ut.TestCase):
    """Low-level VDS test for the Percival use case.

    Frames from four source files are interleaved along the first axis
    of an unlimited virtual dataset (source n supplies frames n, n+4, ...).
    """

    def setUp(self):
        """Create a scratch directory with four raw source files.

        Files 1-3 hold 20 frames each and file 4 holds 19 frames; every
        frame in the n-th file (counting from zero) is filled with n.
        """
        self.working_dir = tempfile.mkdtemp()
        frame_counts = (20, 20, 20, 19)
        # mkdtemp() returns a path without a trailing separator, so the
        # file names must be joined, not concatenated, to land inside it.
        self.fname = [
            os.path.join(self.working_dir, 'raw_file_%d.h5' % (index + 1))
            for index in range(len(frame_counts))
        ]
        for value, (filename, nframes) in enumerate(
                zip(self.fname, frame_counts)):
            with h5.File(filename, 'w') as f:
                f['data'] = np.ones((nframes, 200, 200)) * value

    def test_percival_low_level(self):
        """Interleave the sources and check the resulting shape and order."""
        self.outfile = os.path.join(self.working_dir, 'percival.h5')
        with h5.File(self.outfile, 'w', libver='latest') as f:
            vdset_shape = (1, 200, 200)
            unlimited = h5.h5s.UNLIMITED
            vdset_max_shape = (unlimited,) + vdset_shape[1:]
            virt_dspace = h5.h5s.create_simple(vdset_shape, vdset_max_shape)
            dcpl = h5.h5p.create(h5.h5p.DATASET_CREATE)
            dcpl.set_fill_value(np.array([-1]))

            for first_frame, src_name in enumerate(self.fname):
                with h5.File(src_name, 'r') as src:
                    src_shape = src['data'].shape
                max_src_shape = (unlimited,) + src_shape[1:]
                # Create the source dataset dataspace
                src_dspace = h5.h5s.create_simple(src_shape, max_src_shape)
                # Select every frame of the source, one frame per block
                src_dspace.select_hyperslab(start=(0, 0, 0),
                                            stride=(1, 1, 1),
                                            count=(unlimited, 1, 1),
                                            block=(1,) + src_shape[1:])
                # Place those frames at every 4th slot of the virtual dataset
                virt_dspace.select_hyperslab(start=(first_frame, 0, 0),
                                             stride=(4, 1, 1),
                                             count=(unlimited, 1, 1),
                                             block=(1,) + src_shape[1:])

                dcpl.set_virtual(virt_dspace, src_name.encode('utf-8'),
                                 b'data', src_dspace)

            # Create the virtual dataset
            h5.h5d.create(f.id, name=b"data", tid=h5.h5t.NATIVE_INT16,
                          space=virt_dspace, dcpl=dcpl)

        # Verify from a fresh read-only handle once the writer is closed,
        # as the sibling VDS tests do.
        with h5.File(self.outfile, 'r') as f:
            data = f['data']
            shape = data.shape
            line = data[:8, 100, 100]

        expected_line = np.array(2 * list(range(4)))
        self.assertEqual(shape, (79, 200, 200))
        np.testing.assert_array_equal(line, expected_line)

    def tearDown(self):
        """Remove the scratch directory and every file created in it."""
        shutil.rmtree(self.working_dir)
277
278
@ut.skipUnless(h5.version.hdf5_version_tuple >= (1, 10, 2),
               'get_ / set_virtual_prefix requires HDF5 >= 1.10.2')
def test_virtual_prefix(tmp_path):
    """Check that a virtual prefix set on the access plist locates the source.

    The source file lives in ``tmp_path/a`` and the virtual dataset file in
    ``tmp_path/b``; the VDS refers to the source by bare file name only.
    """
    (tmp_path / 'a').mkdir()
    (tmp_path / 'b').mkdir()
    # Context managers guarantee both files are closed even when an
    # assertion fails, so the temporary directory can always be cleaned up.
    with h5.File(tmp_path / 'a' / 'src.h5', 'w') as src_file, \
            h5.File(tmp_path / 'b' / 'vds.h5', 'w') as vds_file:
        src_file['data'] = np.arange(10)

        layout = h5.VirtualLayout(shape=(10,), dtype=np.int64)
        layout[:] = h5.VirtualSource('src.h5', 'data', shape=(10,))
        vds_file.create_virtual_dataset('data', layout, fillvalue=-1)

        # Path doesn't resolve
        np.testing.assert_array_equal(vds_file['data'],
                                      np.full(10, fill_value=-1))

        path_a = bytes(tmp_path / 'a')
        dapl = h5.h5p.create(h5.h5p.DATASET_ACCESS)
        dapl.set_virtual_prefix(path_a)
        vds_id = h5.h5d.open(vds_file.id, b'data', dapl=dapl)
        vds = h5.Dataset(vds_id)

        # Now it should find the source file and read the data correctly
        np.testing.assert_array_equal(vds[:], np.arange(10))
        # Check that get_virtual_prefix gives back what we put in
        assert vds.id.get_access_plist().get_virtual_prefix() == path_a
305