1import numpy as np
2import pytest
3from astropy.io import ascii
4from astropy.io.ascii.qdp import _read_table_qdp, _write_table_qdp
5from astropy.io.ascii.qdp import _get_lines_from_file
6from astropy.table import Table, Column, MaskedColumn
7from astropy.utils.exceptions import AstropyUserWarning
8
9
10def test_get_tables_from_qdp_file(tmpdir):
11    example_qdp = """
12    ! Swift/XRT hardness ratio of trigger: XXXX, name: BUBU X-2
13    ! Columns are as labelled
14    READ TERR 1
15    READ SERR 2
16    ! WT -- hard data
17    !MJD            Err (pos)       Err(neg)        Rate            Error
18    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   -0.212439       0.212439
19    55045.099887 1.14467592592593e-05    -1.14467592592593e-05   0.000000        0.000000
20    NO NO NO NO NO
21    ! WT -- soft data
22    !MJD            Err (pos)       Err(neg)        Rate            Error
23    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   0.726155        0.583890
24    55045.099887 1.14467592592593e-05    -1.14467592592593e-05   2.410935        1.393592
25    NO NO NO NO NO
26    ! WT -- hardness ratio
27    !MJD            Err (pos)       Err(neg)        Rate            Error
28    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   -0.292553       -0.374935
29    55045.099887 1.14467592592593e-05    -1.14467592592593e-05   0.000000        -nan
30    """
31
32    path = str(tmpdir.join('test.qdp'))
33
34    with open(path, "w") as fp:
35        print(example_qdp, file=fp)
36
37    table0 = _read_table_qdp(fp.name, names=["MJD", "Rate"], table_id=0)
38    assert table0.meta["initial_comments"][0].startswith("Swift")
39    assert table0.meta["comments"][0].startswith("WT -- hard data")
40    table2 = _read_table_qdp(fp.name, names=["MJD", "Rate"], table_id=2)
41    assert table2.meta["initial_comments"][0].startswith("Swift")
42    assert table2.meta["comments"][0].startswith("WT -- hardness")
43    assert np.isclose(table2["MJD_nerr"][0], -2.37847222222222e-05)
44
45
46def test_roundtrip(tmpdir):
47    example_qdp = """
48    ! Swift/XRT hardness ratio of trigger: XXXX, name: BUBU X-2
49    ! Columns are as labelled
50    READ TERR 1
51    READ SERR 2
52    ! WT -- hard data
53    !MJD            Err (pos)       Err(neg)        Rate            Error
54    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   NO       0.212439
55    55045.099887 1.14467592592593e-05    -1.14467592592593e-05   0.000000        0.000000
56    NO NO NO NO NO
57    ! WT -- soft data
58    !MJD            Err (pos)       Err(neg)        Rate            Error
59    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   0.726155        0.583890
60    55045.099887 1.14467592592593e-05    -1.14467592592593e-05   2.410935        1.393592
61    NO NO NO NO NO
62    ! WT -- hardness ratio
63    !MJD            Err (pos)       Err(neg)        Rate            Error
64    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   -0.292553       -0.374935
65    55045.099887 1.14467592592593e-05    -1.14467592592593e-05   0.000000        NO
66    ! Add command, just to raise the warning.
67    READ TERR 1
68    ! WT -- whatever
69    !MJD            Err (pos)       Err(neg)        Rate            Error
70    53000.123456 2.37847222222222e-05    -2.37847222222222e-05   -0.292553       -0.374935
71    NO 1.14467592592593e-05    -1.14467592592593e-05   0.000000        NO
72    """
73
74    path = str(tmpdir.join('test.qdp'))
75    path2 = str(tmpdir.join('test2.qdp'))
76
77    with open(path, "w") as fp:
78        print(example_qdp, file=fp)
79    with pytest.warns(AstropyUserWarning) as record:
80        table = _read_table_qdp(path, names=["MJD", "Rate"],
81                                table_id=0)
82    assert np.any(["This file contains multiple command blocks"
83                   in r.message.args[0]
84                   for r in record])
85
86    _write_table_qdp(table, path2)
87
88    new_table = _read_table_qdp(path2, names=["MJD", "Rate"], table_id=0)
89
90    for col in new_table.colnames:
91        is_masked = np.array([np.ma.is_masked(val) for val in new_table[col]])
92        if np.any(is_masked):
93            # All NaN values are read as such.
94            assert np.ma.is_masked(table[col][is_masked])
95
96        is_nan = np.array([(not np.ma.is_masked(val) and np.isnan(val))
97                           for val in new_table[col]])
98        # All non-NaN values are the same
99        assert np.allclose(new_table[col][~is_nan], table[col][~is_nan])
100        if np.any(is_nan):
101            # All NaN values are read as such.
102            assert np.isnan(table[col][is_nan])
103    assert np.allclose(new_table['MJD_perr'], [2.378472e-05, 1.1446759e-05])
104
105    for meta_name in ['initial_comments', 'comments']:
106        assert meta_name in new_table.meta
107
108
109def test_read_example(tmpdir):
110    example_qdp = """
111        ! Initial comment line 1
112        ! Initial comment line 2
113        READ TERR 1
114        READ SERR 3
115        ! Table 0 comment
116        !a a(pos) a(neg) b c ce d
117        53000.5   0.25  -0.5   1  1.5  3.5 2
118        54000.5   1.25  -1.5   2  2.5  4.5 3
119        NO NO NO NO NO
120        ! Table 1 comment
121        !a a(pos) a(neg) b c ce d
122        54000.5   2.25  -2.5   NO  3.5  5.5 5
123        55000.5   3.25  -3.5   4  4.5  6.5 nan
124        """
125    dat = ascii.read(example_qdp, format='qdp', table_id=1,
126                     names=['a', 'b', 'c', 'd'])
127    t = Table.read(example_qdp, format='ascii.qdp', table_id=1,
128                   names=['a', 'b', 'c', 'd'])
129
130    assert np.allclose(t['a'], [54000, 55000])
131    assert t['c_err'][0] == 5.5
132    assert np.ma.is_masked(t['b'][0])
133    assert np.isnan(t['d'][1])
134
135    for col1, col2 in zip(t.itercols(), dat.itercols()):
136        assert np.allclose(col1, col2, equal_nan=True)
137
138
139def test_roundtrip_example(tmpdir):
140    example_qdp = """
141        ! Initial comment line 1
142        ! Initial comment line 2
143        READ TERR 1
144        READ SERR 3
145        ! Table 0 comment
146        !a a(pos) a(neg) b c ce d
147        53000.5   0.25  -0.5   1  1.5  3.5 2
148        54000.5   1.25  -1.5   2  2.5  4.5 3
149        NO NO NO NO NO
150        ! Table 1 comment
151        !a a(pos) a(neg) b c ce d
152        54000.5   2.25  -2.5   NO  3.5  5.5 5
153        55000.5   3.25  -3.5   4  4.5  6.5 nan
154        """
155    test_file = str(tmpdir.join('test.qdp'))
156
157    t = Table.read(example_qdp, format='ascii.qdp', table_id=1,
158                   names=['a', 'b', 'c', 'd'])
159    t.write(test_file, err_specs={'terr': [1], 'serr': [3]})
160    t2 = Table.read(test_file, names=['a', 'b', 'c', 'd'], table_id=0)
161
162    for col1, col2 in zip(t.itercols(), t2.itercols()):
163        assert np.allclose(col1, col2, equal_nan=True)
164
165
166def test_roundtrip_example_comma(tmpdir):
167    example_qdp = """
168        ! Initial comment line 1
169        ! Initial comment line 2
170        READ TERR 1
171        READ SERR 3
172        ! Table 0 comment
173        !a,a(pos),a(neg),b,c,ce,d
174        53000.5,0.25,-0.5,1,1.5,3.5,2
175        54000.5,1.25,-1.5,2,2.5,4.5,3
176        NO,NO,NO,NO,NO
177        ! Table 1 comment
178        !a,a(pos),a(neg),b,c,ce,d
179        54000.5,2.25,-2.5,NO,3.5,5.5,5
180        55000.5,3.25,-3.5,4,4.5,6.5,nan
181        """
182    test_file = str(tmpdir.join('test.qdp'))
183
184    t = Table.read(example_qdp, format='ascii.qdp', table_id=1,
185                   names=['a', 'b', 'c', 'd'], sep=',')
186    t.write(test_file, err_specs={'terr': [1], 'serr': [3]})
187    t2 = Table.read(test_file, names=['a', 'b', 'c', 'd'], table_id=0)
188
189    # t.values_equal(t2)
190    for col1, col2 in zip(t.itercols(), t2.itercols()):
191        assert np.allclose(col1, col2, equal_nan=True)
192
193
194def test_read_write_simple(tmpdir):
195    test_file = str(tmpdir.join('test.qdp'))
196    t1 = Table()
197    t1.add_column(Column(name='a', data=[1, 2, 3, 4]))
198    t1.add_column(MaskedColumn(data=[4., np.nan, 3., 1.], name='b',
199                               mask=[False, False, False, True]))
200    t1.write(test_file, format='ascii.qdp')
201    with pytest.warns(UserWarning) as record:
202        t2 = Table.read(test_file, format='ascii.qdp')
203    assert np.any(["table_id not specified. Reading the first available table"
204                   in r.message.args[0]
205                   for r in record])
206
207    assert np.allclose(t2['col1'], t1['a'])
208    assert np.all(t2['col1'] == t1['a'])
209
210    good = ~np.isnan(t1['b'])
211    assert np.allclose(t2['col2'][good], t1['b'][good])
212
213
214def test_read_write_simple_specify_name(tmpdir):
215    test_file = str(tmpdir.join('test.qdp'))
216    t1 = Table()
217    t1.add_column(Column(name='a', data=[1, 2, 3]))
218    # Give a non-None err_specs
219    t1.write(test_file, format='ascii.qdp')
220    t2 = Table.read(test_file, table_id=0, format='ascii.qdp', names=['a'])
221    assert np.all(t2['a'] == t1['a'])
222
223
224def test_get_lines_from_qdp(tmpdir):
225    test_file = str(tmpdir.join('test.qdp'))
226    text_string = "A\nB"
227    text_output = _get_lines_from_file(text_string)
228    with open(test_file, "w") as fobj:
229        print(text_string, file=fobj)
230    file_output = _get_lines_from_file(test_file)
231    list_output = _get_lines_from_file(["A", "B"])
232    for i, line in enumerate(["A", "B"]):
233        assert file_output[i] == line
234        assert list_output[i] == line
235        assert text_output[i] == line
236