1# This file is part of h5py, a Python interface to the HDF5 library.
2#
3# http://www.h5py.org
4#
5# Copyright 2008-2013 Andrew Collette and contributors
6#
7# License:  Standard 3-clause BSD; see "license.txt" for full license terms
8#           and contributor agreement.
9
10"""
11    Tests the h5py._hl.filters module.
12
13"""
14import os
15import numpy as np
16import h5py
17import pytest
18
19from .common import ut, TestCase, insubprocess
20
21
class TestFilters(TestCase):
    """Regression tests for filter combinations used at dataset creation."""

    def setUp(self):
        """Open a fresh writable file and remember its path.

        The path is stored so that a test can close ``self.f`` and reopen
        the very same file in read mode.
        """
        self.path = self.mktemp()
        self.f = h5py.File(self.path, 'w')

    # Gate on the *encoder*: this test writes szip-compressed data, and a
    # filter that is merely present (e.g. a decode-only szip build) would
    # make the write fail instead of skipping the test.
    @ut.skipUnless('szip' in h5py.filters.encode,
                   'szip filter with encoding support required')
    def test_wr_szip_fletcher32_64bit(self):
        """ test combination of szip, fletcher32, and 64bit arrays

        The fletcher32 checksum must be computed after the szip
        compression is applied.

        References:
        - GitHub issue #953
        - https://lists.hdfgroup.org/pipermail/
          hdf-forum_lists.hdfgroup.org/2018-January/010753.html
        """
        self.f.create_dataset("test_data",
                              data=np.zeros(10000, dtype=np.float64),
                              fletcher32=True,
                              compression="szip",
                              )
        # Close the writer so the data is read back from disk below.
        self.f.close()

        with h5py.File(self.path, "r") as h5:
            # Access the data which will compute the fletcher32
            # checksum and raise an OSError if something is wrong.
            # Also verify the value round-trips (the dataset is all zeros).
            self.assertEqual(h5["test_data"][0], 0.0)

    def test_wr_scaleoffset_fletcher32(self):
        """ make sure that scaleoffset + fletcher32 is prevented
        """
        data = np.linspace(0, 1, 100)
        with self.assertRaises(ValueError):
            self.f.create_dataset("test_data",
                                  data=data,
                                  fletcher32=True,
                                  # retain 3 digits after the decimal point
                                  scaleoffset=3,
                                  )
64
65
@ut.skipIf('gzip' not in h5py.filters.encode, "DEFLATE is not installed")
def test_filter_ref_obj(writable_file):
    """A ``Gzip`` filter object works via ``**`` unpacking and as ``compression=``."""
    deflate = h5py.filters.Gzip(level=8)

    # Keyword-argument unpacking must keep working
    # (compatible with earlier h5py versions).
    unpacked = dict(**deflate)
    expected = {
        'compression': h5py.h5z.FILTER_DEFLATE,
        'compression_opts': (8,),
    }
    assert unpacked == expected

    # The filter object itself may be handed over as the compression
    # argument (new in h5py 3.0).
    dset = writable_file.create_dataset(
        'x',
        shape=(100,),
        dtype=np.uint32,
        compression=deflate,
    )
    assert (dset.compression, dset.compression_opts) == ('gzip', 8)
81
82
def test_filter_ref_obj_eq():
    """``Gzip`` filter objects compare equal exactly when their levels match."""
    make_gzip = h5py.filters.Gzip
    reference = make_gzip(level=8)

    same_level = make_gzip(level=8)
    assert reference == same_level

    other_level = make_gzip(level=7)
    assert reference != other_level
88
89
@pytest.mark.mpi_skip
@insubprocess
def test_unregister_filter(request):
    """Unregistering the LZF filter reports success when the filter is available.

    ``request`` is not used in this body; presumably the ``insubprocess``
    decorator relies on it -- confirm against its definition.
    """
    if not h5py.h5z.filter_avail(h5py.h5z.FILTER_LZF):
        # Nothing to unregister; the test passes trivially.
        return

    unregistered = h5py.h5z.unregister_filter(h5py.h5z.FILTER_LZF)
    assert unregistered
96
97
@ut.skipIf(not os.getenv('H5PY_TEST_CHECK_FILTERS'), "H5PY_TEST_CHECK_FILTERS not set")
def test_filters_available():
    """gzip and lzf must each be listed for both decoding and encoding.

    Only runs when the ``H5PY_TEST_CHECK_FILTERS`` environment variable
    is set to a non-empty value.
    """
    for filter_name in ('gzip', 'lzf'):
        assert filter_name in h5py.filters.decode
        assert filter_name in h5py.filters.encode
104