1from contextlib import contextmanager
2
3from yt.utilities.on_demand_imports import NotAModule, _h5py as h5py
4
5
6def valid_hdf5_signature(fn):
7    signature = b"\x89HDF\r\n\x1a\n"
8    try:
9        with open(fn, "rb") as f:
10            header = f.read(8)
11            return header == signature
12    except Exception:
13        return False
14
15
16def warn_h5py(fn):
17    needs_h5py = valid_hdf5_signature(fn)
18    if needs_h5py and isinstance(h5py.File, NotAModule):
19        raise RuntimeError(
20            "This appears to be an HDF5 file, but h5py is not installed."
21        )
22
23
24class HDF5FileHandler:
25    handle = None
26
27    def __init__(self, filename):
28        self.handle = h5py.File(filename, mode="r")
29
30    def __getitem__(self, key):
31        return self.handle[key]
32
33    def __contains__(self, item):
34        return item in self.handle
35
36    def __len__(self):
37        return len(self.handle)
38
39    @property
40    def attrs(self):
41        return self.handle.attrs
42
43    def keys(self):
44        return list(self.handle.keys())
45
46    def items(self):
47        return list(self.handle.items())
48
49    def close(self):
50        if self.handle is not None:
51            self.handle.close()
52
53
54class FITSFileHandler(HDF5FileHandler):
55    def __init__(self, filename):
56        from yt.utilities.on_demand_imports import _astropy
57
58        if isinstance(filename, _astropy.pyfits.hdu.image._ImageBaseHDU):
59            self.handle = _astropy.pyfits.HDUList(filename)
60        elif isinstance(filename, _astropy.pyfits.HDUList):
61            self.handle = filename
62        else:
63            self.handle = _astropy.pyfits.open(
64                filename, memmap=True, do_not_scale_image_data=True, ignore_blank=True
65            )
66        self._fits_files = []
67
68    def __del__(self):
69        for f in self._fits_files:
70            f.close()
71        del self._fits_files
72        del self.handle
73        self.handle = None
74
75    def close(self):
76        self.handle.close()
77
78
79def valid_netcdf_classic_signature(filename):
80    signature_v1 = b"CDF\x01"
81    signature_v2 = b"CDF\x02"
82    try:
83        with open(filename, "rb") as f:
84            header = f.read(4)
85            return header == signature_v1 or header == signature_v2
86    except Exception:
87        return False
88
89
90def warn_netcdf(fn):
91    # There are a few variants of the netCDF format.
92    classic = valid_netcdf_classic_signature(fn)
93    # NetCDF-4 Classic files are HDF5 files constrained to the Classic
94    # data model used by netCDF-3.
95    netcdf4_classic = valid_hdf5_signature(fn) and fn.endswith((".nc", ".nc4"))
96    needs_netcdf = classic or netcdf4_classic
97    from yt.utilities.on_demand_imports import _netCDF4 as netCDF4
98
99    if needs_netcdf and isinstance(netCDF4.Dataset, NotAModule):
100        raise RuntimeError(
101            "This appears to be a netCDF file, but the "
102            "python bindings for netCDF4 are not installed."
103        )
104
105
106class NetCDF4FileHandler:
107    def __init__(self, filename):
108        self.filename = filename
109
110    @contextmanager
111    def open_ds(self, **kwargs):
112        from yt.utilities.on_demand_imports import _netCDF4 as netCDF4
113
114        ds = netCDF4.Dataset(self.filename, mode="r", **kwargs)
115        yield ds
116        ds.close()
117