1import os
2
3import numpy as np
4
5from yt.config import ytcfg
6from yt.fields.derived_field import DerivedField
7from yt.funcs import mylog, only_on_root, parse_h5_attr
8from yt.units.yt_array import YTArray, YTQuantity
9from yt.utilities.cosmology import Cosmology
10from yt.utilities.exceptions import YTException, YTFieldNotFound
11from yt.utilities.linear_interpolators import (
12    BilinearFieldInterpolator,
13    UnilinearFieldInterpolator,
14)
15from yt.utilities.on_demand_imports import _h5py as h5py
16
17data_version = {"cloudy": 2, "apec": 3}
18
19data_url = "http://yt-project.org/data"
20
21
22def _get_data_file(table_type, data_dir=None):
23    data_file = "%s_emissivity_v%d.h5" % (table_type, data_version[table_type])
24    if data_dir is None:
25        supp_data_dir = ytcfg.get("yt", "supp_data_dir")
26        data_dir = supp_data_dir if os.path.exists(supp_data_dir) else "."
27    data_path = os.path.join(data_dir, data_file)
28    if not os.path.exists(data_path):
29        msg = "Failed to find emissivity data file {}! Please download from {}".format(
30            data_file,
31            data_url,
32        )
33        mylog.error(msg)
34        raise OSError(msg)
35    return data_path
36
37
38class EnergyBoundsException(YTException):
39    def __init__(self, lower, upper):
40        self.lower = lower
41        self.upper = upper
42
43    def __str__(self):
44        return f"Energy bounds are {self.lower:e} to {self.upper:e} keV."
45
46
47class ObsoleteDataException(YTException):
48    def __init__(self, table_type):
49        data_file = "%s_emissivity_v%d.h5" % (table_type, data_version[table_type])
50        self.msg = "X-ray emissivity data is out of date.\n"
51        self.msg += f"Download the latest data from {data_url}/{data_file}."
52
53    def __str__(self):
54        return self.msg
55
56
57class XrayEmissivityIntegrator:
58    r"""Class for making X-ray emissivity fields. Uses hdf5 data tables
59    generated from Cloudy and AtomDB/APEC.
60
61    Initialize an XrayEmissivityIntegrator object.
62
63    Parameters
64    ----------
65    table_type : string
66        The type of data to use when computing the emissivity values. If "cloudy",
67        a file called "cloudy_emissivity.h5" is used, for photoionized
68        plasmas. If, "apec", a file called "apec_emissivity.h5" is used for
69        collisionally ionized plasmas. These files contain emissivity tables
70        for primordial elements and for metals at solar metallicity for the
71        energy range 0.1 to 100 keV.
72    redshift : float, optional
73        The cosmological redshift of the source of the field. Default: 0.0.
74    data_dir : string, optional
75        The location to look for the data table in. If not supplied, the file
76        will be looked for in the location of the YT_DEST environment variable
77        or in the current working directory.
78    use_metals : boolean, optional
79        If set to True, the emissivity will include contributions from metals.
80        Default: True
81    """
82
83    def __init__(self, table_type, redshift=0.0, data_dir=None, use_metals=True):
84
85        filename = _get_data_file(table_type, data_dir=data_dir)
86        only_on_root(mylog.info, "Loading emissivity data from %s", filename)
87        in_file = h5py.File(filename, mode="r")
88        if "info" in in_file.attrs:
89            only_on_root(mylog.info, parse_h5_attr(in_file, "info"))
90        if parse_h5_attr(in_file, "version") != data_version[table_type]:
91            raise ObsoleteDataException(table_type)
92        else:
93            only_on_root(
94                mylog.info,
95                "X-ray '%s' emissivity data version: %s."
96                % (table_type, parse_h5_attr(in_file, "version")),
97            )
98
99        self.log_T = in_file["log_T"][:]
100        self.emissivity_primordial = in_file["emissivity_primordial"][:]
101        if "log_nH" in in_file:
102            self.log_nH = in_file["log_nH"][:]
103        if use_metals:
104            self.emissivity_metals = in_file["emissivity_metals"][:]
105        self.ebin = YTArray(in_file["E"], "keV")
106        in_file.close()
107        self.dE = np.diff(self.ebin)
108        self.emid = 0.5 * (self.ebin[1:] + self.ebin[:-1]).to("erg")
109        self.redshift = redshift
110
111    def get_interpolator(self, data_type, e_min, e_max, energy=True):
112        data = getattr(self, f"emissivity_{data_type}")
113        if not energy:
114            data = data[..., :] / self.emid.v
115        e_min = YTQuantity(e_min, "keV") * (1.0 + self.redshift)
116        e_max = YTQuantity(e_max, "keV") * (1.0 + self.redshift)
117        if (e_min - self.ebin[0]) / e_min < -1e-3 or (
118            e_max - self.ebin[-1]
119        ) / e_max > 1e-3:
120            raise EnergyBoundsException(self.ebin[0], self.ebin[-1])
121        e_is, e_ie = np.digitize([e_min, e_max], self.ebin)
122        e_is = np.clip(e_is - 1, 0, self.ebin.size - 1)
123        e_ie = np.clip(e_ie, 0, self.ebin.size - 1)
124
125        my_dE = self.dE[e_is:e_ie].copy()
126        # clip edge bins if the requested range is smaller
127        my_dE[0] -= e_min - self.ebin[e_is]
128        my_dE[-1] -= self.ebin[e_ie] - e_max
129
130        interp_data = (data[..., e_is:e_ie] * my_dE).sum(axis=-1)
131        if data.ndim == 2:
132            emiss = UnilinearFieldInterpolator(
133                np.log10(interp_data),
134                [self.log_T[0], self.log_T[-1]],
135                "log_T",
136                truncate=True,
137            )
138        else:
139            emiss = BilinearFieldInterpolator(
140                np.log10(interp_data),
141                [self.log_nH[0], self.log_nH[-1], self.log_T[0], self.log_T[-1]],
142                ["log_nH", "log_T"],
143                truncate=True,
144            )
145
146        return emiss
147
148
149def add_xray_emissivity_field(
150    ds,
151    e_min,
152    e_max,
153    redshift=0.0,
154    metallicity=("gas", "metallicity"),
155    table_type="cloudy",
156    data_dir=None,
157    cosmology=None,
158    dist=None,
159    ftype="gas",
160):
161    r"""Create X-ray emissivity fields for a given energy range.
162
163    Parameters
164    ----------
165    e_min : float
166        The minimum energy in keV for the energy band.
167    e_min : float
168        The maximum energy in keV for the energy band.
169    redshift : float, optional
170        The cosmological redshift of the source of the field. Default: 0.0.
171    metallicity : str or tuple of str or float, optional
172        Either the name of a metallicity field or a single floating-point
173        number specifying a spatially constant metallicity. Must be in
174        solar units. If set to None, no metals will be assumed. Default:
175        ("gas", "metallicity")
176    table_type : string, optional
177        The type of emissivity table to be used when creating the fields.
178        Options are "cloudy" or "apec". Default: "cloudy"
179    data_dir : string, optional
180        The location to look for the data table in. If not supplied, the file
181        will be looked for in the location of the YT_DEST environment variable
182        or in the current working directory.
183    cosmology : :class:`~yt.utilities.cosmology.Cosmology`, optional
184        If set and redshift > 0.0, this cosmology will be used when computing the
185        cosmological dependence of the emission fields. If not set, yt's default
186        LCDM cosmology will be used.
187    dist : (value, unit) tuple or :class:`~yt.units.yt_array.YTQuantity`, optional
188        The distance to the source, used for making intensity fields. You should
189        only use this if your source is nearby (not cosmological). Default: None
190    ftype : string, optional
191        The field type to use when creating the fields, default "gas"
192
193    This will create at least three fields:
194
195    "xray_emissivity_{e_min}_{e_max}_keV" (erg s^-1 cm^-3)
196    "xray_luminosity_{e_min}_{e_max}_keV" (erg s^-1)
197    "xray_photon_emissivity_{e_min}_{e_max}_keV" (photons s^-1 cm^-3)
198
199    and if a redshift or distance is specified it will create two others:
200
201    "xray_intensity_{e_min}_{e_max}_keV" (erg s^-1 cm^-3 arcsec^-2)
202    "xray_photon_intensity_{e_min}_{e_max}_keV" (photons s^-1 cm^-3 arcsec^-2)
203
204    These latter two are really only useful when making projections.
205
206    Examples
207    --------
208
209    >>> import yt
210    >>> ds = yt.load("sloshing_nomag2_hdf5_plt_cnt_0100")
211    >>> yt.add_xray_emissivity_field(ds, 0.5, 2)
212    >>> p = yt.ProjectionPlot(
213    ...     ds, "x", ("gas", "xray_emissivity_0.5_2_keV"), table_type="apec"
214    ... )
215    >>> p.save()
216    """
217    if not isinstance(metallicity, float) and metallicity is not None:
218        try:
219            metallicity = ds._get_field_info(*metallicity)
220        except YTFieldNotFound as e:
221            raise RuntimeError(
222                f"Your dataset does not have a {metallicity} field! "
223                + "Perhaps you should specify a constant metallicity instead?"
224            ) from e
225
226    if table_type == "cloudy":
227        # Cloudy wants to scale by nH**2
228        other_n = "H_nuclei_density"
229    else:
230        # APEC wants to scale by nH*ne
231        other_n = "El_number_density"
232
233    def _norm_field(field, data):
234        return data[ftype, "H_nuclei_density"] * data[ftype, other_n]
235
236    ds.add_field(
237        (ftype, "norm_field"), _norm_field, units="cm**-6", sampling_type="local"
238    )
239
240    my_si = XrayEmissivityIntegrator(table_type, data_dir=data_dir, redshift=redshift)
241
242    em_0 = my_si.get_interpolator("primordial", e_min, e_max)
243    emp_0 = my_si.get_interpolator("primordial", e_min, e_max, energy=False)
244    if metallicity is not None:
245        em_Z = my_si.get_interpolator("metals", e_min, e_max)
246        emp_Z = my_si.get_interpolator("metals", e_min, e_max, energy=False)
247
248    def _emissivity_field(field, data):
249        with np.errstate(all="ignore"):
250            dd = {
251                "log_nH": np.log10(data[ftype, "H_nuclei_density"]),
252                "log_T": np.log10(data[ftype, "temperature"]),
253            }
254
255        my_emissivity = np.power(10, em_0(dd))
256        if metallicity is not None:
257            if isinstance(metallicity, DerivedField):
258                my_Z = data[metallicity.name].to("Zsun")
259            else:
260                my_Z = metallicity
261            my_emissivity += my_Z * np.power(10, em_Z(dd))
262
263        my_emissivity[np.isnan(my_emissivity)] = 0
264
265        return data[ftype, "norm_field"] * YTArray(my_emissivity, "erg*cm**3/s")
266
267    emiss_name = (ftype, f"xray_emissivity_{e_min}_{e_max}_keV")
268    ds.add_field(
269        emiss_name,
270        function=_emissivity_field,
271        display_name=fr"\epsilon_{{X}} ({e_min}-{e_max} keV)",
272        sampling_type="local",
273        units="erg/cm**3/s",
274    )
275
276    def _luminosity_field(field, data):
277        return data[emiss_name] * data[ftype, "mass"] / data[ftype, "density"]
278
279    lum_name = (ftype, f"xray_luminosity_{e_min}_{e_max}_keV")
280    ds.add_field(
281        lum_name,
282        function=_luminosity_field,
283        display_name=fr"\rm{{L}}_{{X}} ({e_min}-{e_max} keV)",
284        sampling_type="local",
285        units="erg/s",
286    )
287
288    def _photon_emissivity_field(field, data):
289        dd = {
290            "log_nH": np.log10(data[ftype, "H_nuclei_density"]),
291            "log_T": np.log10(data[ftype, "temperature"]),
292        }
293
294        my_emissivity = np.power(10, emp_0(dd))
295        if metallicity is not None:
296            if isinstance(metallicity, DerivedField):
297                my_Z = data[metallicity.name].to("Zsun")
298            else:
299                my_Z = metallicity
300            my_emissivity += my_Z * np.power(10, emp_Z(dd))
301
302        return data[ftype, "norm_field"] * YTArray(my_emissivity, "photons*cm**3/s")
303
304    phot_name = (ftype, f"xray_photon_emissivity_{e_min}_{e_max}_keV")
305    ds.add_field(
306        phot_name,
307        function=_photon_emissivity_field,
308        display_name=fr"\epsilon_{{X}} ({e_min}-{e_max} keV)",
309        sampling_type="local",
310        units="photons/cm**3/s",
311    )
312
313    fields = [emiss_name, lum_name, phot_name]
314
315    if redshift > 0.0 or dist is not None:
316
317        if dist is None:
318            if cosmology is None:
319                if hasattr(ds, "cosmology"):
320                    cosmology = ds.cosmology
321                else:
322                    cosmology = Cosmology()
323            D_L = cosmology.luminosity_distance(0.0, redshift)
324            angular_scale = 1.0 / cosmology.angular_scale(0.0, redshift)
325            dist_fac = ds.quan(
326                1.0 / (4.0 * np.pi * D_L * D_L * angular_scale * angular_scale).v,
327                "rad**-2",
328            )
329        else:
330            redshift = 0.0  # Only for local sources!
331            try:
332                # normal behaviour, if dist is a YTQuantity
333                dist = ds.quan(dist.value, dist.units)
334            except AttributeError as e:
335                try:
336                    dist = ds.quan(*dist)
337                except (RuntimeError, TypeError):
338                    raise TypeError(
339                        "dist should be a YTQuantity or a (value, unit) tuple!"
340                    ) from e
341
342            angular_scale = dist / ds.quan(1.0, "radian")
343            dist_fac = ds.quan(
344                1.0 / (4.0 * np.pi * dist * dist * angular_scale * angular_scale).v,
345                "rad**-2",
346            )
347
348        ei_name = (ftype, f"xray_intensity_{e_min}_{e_max}_keV")
349
350        def _intensity_field(field, data):
351            I = dist_fac * data[emiss_name]
352            return I.in_units("erg/cm**3/s/arcsec**2")
353
354        ds.add_field(
355            ei_name,
356            function=_intensity_field,
357            display_name=fr"I_{{X}} ({e_min}-{e_max} keV)",
358            sampling_type="local",
359            units="erg/cm**3/s/arcsec**2",
360        )
361
362        i_name = (ftype, f"xray_photon_intensity_{e_min}_{e_max}_keV")
363
364        def _photon_intensity_field(field, data):
365            I = (1.0 + redshift) * dist_fac * data[phot_name]
366            return I.in_units("photons/cm**3/s/arcsec**2")
367
368        ds.add_field(
369            i_name,
370            function=_photon_intensity_field,
371            display_name=fr"I_{{X}} ({e_min}-{e_max} keV)",
372            sampling_type="local",
373            units="photons/cm**3/s/arcsec**2",
374        )
375
376        fields += [ei_name, i_name]
377
378    for field in fields:
379        mylog.info("Adding ('%s','%s') field.", field[0], field[1])
380
381    return fields
382