1# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
3import sys
4
5import pytest
6import numpy as np
7from numpy.testing import assert_equal
8
9from astropy import units as u
10from astropy.time import Time
11from astropy.utils.exceptions import AstropyUserWarning
12
13from astropy.timeseries.sampled import TimeSeries
14from astropy.timeseries.downsample import aggregate_downsample, reduceat
15
16
17INPUT_TIME = Time(['2016-03-22T12:30:31', '2016-03-22T12:30:32',
18                   '2016-03-22T12:30:33', '2016-03-22T12:30:34',
19                   '2016-03-22T12:30:35'])
20
21
22def test_reduceat():
23    add_output = np.add.reduceat(np.arange(8),[0, 4, 1, 5, 2, 6, 3, 7])
24    # Similar to np.add for an array input.
25    sum_output = reduceat(np.arange(8), [0, 4, 1, 5, 2, 6, 3, 7], np.sum)
26    assert_equal(sum_output, add_output)
27
28    mean_output = reduceat(np.arange(8), np.arange(8)[::2], np.mean)
29    assert_equal(mean_output, np.array([0.5, 2.5, 4.5, 6.5]))
30    nanmean_output = reduceat(np.arange(8), [0, 4, 1, 5, 2, 6, 3, 7], np.mean)
31    assert_equal(nanmean_output, np.array([1.5, 4, 2.5, 5, 3.5, 6, 4.5, 7.]))
32    assert_equal(reduceat(np.arange(8), np.arange(8)[::2], np.mean),
33                 reduceat(np.arange(8), np.arange(8)[::2], np.nanmean))
34
35
36def test_timeseries_invalid():
37    with pytest.raises(TypeError) as exc:
38        aggregate_downsample(None)
39    assert exc.value.args[0] == ("time_series should be a TimeSeries")
40
41
42def test_time_bin_invalid():
43
44    # Make sure to raise the right exception when time_bin_* is passed incorrectly.
45
46    with pytest.raises(TypeError, match=r"'time_bin_size' should be a Quantity or a TimeDelta"):
47        aggregate_downsample(TimeSeries(), time_bin_size=1)
48
49
50def test_binning_arg_invalid():
51    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
52    with pytest.raises(TypeError, match=r"With single 'time_bin_start' either 'n_bins', "
53                                         "'time_bin_size' or time_bin_end' must be provided"):
54        aggregate_downsample(ts)
55
56
57def test_time_bin_conversion():
58
59    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
60
61    # Make sure time_bin_start and time_bin_end are properly converted to Time
62    down_start = aggregate_downsample(ts, time_bin_start=['2016-03-22T12:30:31'],
63                                      time_bin_size=[1]*u.s)
64    assert_equal(down_start.time_bin_start.isot, ['2016-03-22T12:30:31.000'])
65
66    down_end = aggregate_downsample(ts, time_bin_start=['2016-03-22T12:30:31', '2016-03-22T12:30:33'],
67                                    time_bin_end='2016-03-22T12:30:34')
68    assert_equal(down_end.time_bin_end.isot, ['2016-03-22T12:30:33.000', '2016-03-22T12:30:34.000'])
69
70
71def test_time_bin_end_auto():
72
73    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
74
75    # Interpret `time_bin_end` as the end of timeseries when `time_bin_start` is
76    # an array and `time_bin_size` is not provided
77    down_auto_end = aggregate_downsample(ts, time_bin_start=['2016-03-22T12:30:31', '2016-03-22T12:30:33'])
78    assert_equal(down_auto_end.time_bin_end.isot, ['2016-03-22T12:30:33.000', '2016-03-22T12:30:35.000'])
79
80
81def test_time_bin_start_array():
82
83    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
84
85    # When `time_bin_end` is an array and `time_bin_start` is not provided, `time_bin_start` is converted
86    # to an array with its first element set to the start of the timeseries and rest populated using
87    #`time_bin_end`. This case is separately tested since `BinnedTimeSeries` allows `time_bin_end` to
88    # be an array only if `time_bin_start` is an array.
89    down_start_array = aggregate_downsample(ts, time_bin_end=['2016-03-22T12:30:33', '2016-03-22T12:30:35'])
90    assert_equal(down_start_array.time_bin_start.isot, ['2016-03-22T12:30:31.000', '2016-03-22T12:30:33.000'])
91
92
93def test_nbins():
94
95    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
96
97    # n_bins should default to the number needed to fit all the original points
98    down_nbins = aggregate_downsample(ts, n_bins=2)
99    assert_equal(down_nbins.time_bin_start.isot, ['2016-03-22T12:30:31.000', '2016-03-22T12:30:33.000'])
100
101
102def test_downsample():
103    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
104    ts_units = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5] * u.count], names=['a'])
105
106    # Avoid precision problems with floating-point comparisons on 32bit
107    if sys.maxsize > 2**32:
108        # 64 bit
109        time_bin_incr = 1 * u.s
110        time_bin_start = None
111    else:
112        # 32 bit
113        time_bin_incr = (1 - 1e-6) * u.s
114        time_bin_start = ts.time[0] - 1 * u.ns
115
116    down_1 = aggregate_downsample(ts, time_bin_size=time_bin_incr, time_bin_start=time_bin_start)
117    u.isclose(down_1.time_bin_size, [1, 1, 1, 1, 1]*time_bin_incr)
118    assert_equal(down_1.time_bin_start.isot, Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:32.000',
119                                                   '2016-03-22T12:30:33.000', '2016-03-22T12:30:34.000',
120                                                   '2016-03-22T12:30:35.000']))
121    assert_equal(down_1["a"].data.data, np.array([1, 2, 3, 4, 5]))
122
123    down_2 = aggregate_downsample(ts, time_bin_size=2*time_bin_incr, time_bin_start=time_bin_start)
124    u.isclose(down_2.time_bin_size, [2, 2, 2]*time_bin_incr)
125    assert_equal(down_2.time_bin_start.isot, Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:33.000',
126                                                   '2016-03-22T12:30:35.000']))
127    assert_equal(down_2["a"].data.data, np.array([1, 3, 5]))
128
129    down_3 = aggregate_downsample(ts, time_bin_size=3*time_bin_incr, time_bin_start=time_bin_start)
130    u.isclose(down_3.time_bin_size, [3, 3]*time_bin_incr)
131    assert_equal(down_3.time_bin_start.isot, Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:34.000']))
132    assert_equal(down_3["a"].data.data, np.array([2, 4]))
133
134    down_4 = aggregate_downsample(ts, time_bin_size=4*time_bin_incr, time_bin_start=time_bin_start)
135    u.isclose(down_4.time_bin_size, [4, 4]*time_bin_incr)
136    assert_equal(down_4.time_bin_start.isot, Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:35.000']))
137    assert_equal(down_4["a"].data.data, np.array([2, 5]))
138
139    down_units = aggregate_downsample(ts_units, time_bin_size=4*time_bin_incr, time_bin_start=time_bin_start)
140    u.isclose(down_units.time_bin_size, [4, 4]*time_bin_incr)
141    assert_equal(down_units.time_bin_start.isot, Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:35.000']))
142    assert down_units["a"].unit.name == 'ct'
143    assert_equal(down_units["a"].data, np.array([2.5, 5.0]))
144
145    # Contiguous bins with uneven bin sizes: `time_bin_size` is an array
146    down_uneven_bins = aggregate_downsample(ts, time_bin_size=[2, 1, 1]*time_bin_incr,
147                                            time_bin_start=time_bin_start)
148    u.isclose(down_uneven_bins.time_bin_size, [2, 1, 1]*time_bin_incr)
149    assert_equal(down_uneven_bins.time_bin_start.isot, Time(['2016-03-22T12:30:31.000',
150                                                             '2016-03-22T12:30:33.000',
151                                                             '2016-03-22T12:30:34.000']))
152    assert_equal(down_uneven_bins["a"].data.data, np.array([1, 3, 4]))
153
154    # Uncontiguous bins with even bin sizes: `time_bin_start` and `time_bin_end` are both arrays
155    down_time_array = aggregate_downsample(ts, time_bin_start=Time(['2016-03-22T12:30:31.000',
156                                                                    '2016-03-22T12:30:34.000']),
157                                           time_bin_end=Time(['2016-03-22T12:30:32.000',
158                                                              '2016-03-22T12:30:35.000']))
159    u.isclose(down_time_array.time_bin_size, [1, 1]*u.second)
160    assert_equal(down_time_array.time_bin_start.isot, Time(['2016-03-22T12:30:31.000',
161                                                            '2016-03-22T12:30:34.000']))
162    assert_equal(down_time_array["a"].data.data, np.array([1, 4]))
163
164    # Overlapping bins
165    with pytest.warns(AstropyUserWarning, match="Overlapping bins should be avoided since they "
166                                                "can lead to double-counting of data during binning."):
167        down_overlap_bins = aggregate_downsample(ts, time_bin_start=Time(['2016-03-22T12:30:31.000',
168                                                                          '2016-03-22T12:30:33.000']),
169                                                 time_bin_end=Time(['2016-03-22T12:30:34',
170                                                              '2016-03-22T12:30:36.000']))
171        assert_equal(down_overlap_bins["a"].data, np.array([2, 5]))
172