# Licensed under a 3-clause BSD style license - see LICENSE.rst

import sys

import pytest
import numpy as np
from numpy.testing import assert_equal

from astropy import units as u
from astropy.time import Time
from astropy.utils.exceptions import AstropyUserWarning

from astropy.timeseries.sampled import TimeSeries
from astropy.timeseries.downsample import aggregate_downsample, reduceat


# Five samples spaced one second apart; shared by the tests below.
INPUT_TIME = Time(['2016-03-22T12:30:31', '2016-03-22T12:30:32',
                   '2016-03-22T12:30:33', '2016-03-22T12:30:34',
                   '2016-03-22T12:30:35'])


def test_reduceat():
    """Compare ``reduceat`` with ``np.add.reduceat`` and with known means."""
    add_output = np.add.reduceat(np.arange(8), [0, 4, 1, 5, 2, 6, 3, 7])
    # Similar to np.add for an array input.
    sum_output = reduceat(np.arange(8), [0, 4, 1, 5, 2, 6, 3, 7], np.sum)
    assert_equal(sum_output, add_output)

    mean_output = reduceat(np.arange(8), np.arange(8)[::2], np.mean)
    assert_equal(mean_output, np.array([0.5, 2.5, 4.5, 6.5]))

    # Same aggregation function (np.mean), but with non-monotonic indices.
    unsorted_mean_output = reduceat(np.arange(8), [0, 4, 1, 5, 2, 6, 3, 7], np.mean)
    assert_equal(unsorted_mean_output,
                 np.array([1.5, 4, 2.5, 5, 3.5, 6, 4.5, 7.]))

    # Without NaNs in the input, np.mean and np.nanmean must agree.
    assert_equal(reduceat(np.arange(8), np.arange(8)[::2], np.mean),
                 reduceat(np.arange(8), np.arange(8)[::2], np.nanmean))


def test_timeseries_invalid():
    """A non-TimeSeries first argument must raise ``TypeError``."""
    with pytest.raises(TypeError) as exc:
        aggregate_downsample(None)
    # Checked after the ``with`` block: statements following the raising call
    # inside the block would never execute.
    assert exc.value.args[0] == "time_series should be a TimeSeries"


def test_time_bin_invalid():
    """A bare number for ``time_bin_size`` must raise ``TypeError``."""

    # Make sure to raise the right exception when time_bin_* is passed incorrectly.

    with pytest.raises(TypeError, match=r"'time_bin_size' should be a Quantity or a TimeDelta"):
        aggregate_downsample(TimeSeries(), time_bin_size=1)


def test_binning_arg_invalid():
    """Calling without any binning argument must raise ``TypeError``."""
    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
    with pytest.raises(TypeError, match=r"With single 'time_bin_start' either 'n_bins', "
                                        "'time_bin_size' or time_bin_end' must be provided"):
        aggregate_downsample(ts)


def test_time_bin_conversion():
    """String inputs for the bin edges are converted to ``Time``."""

    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])

    # Make sure time_bin_start and time_bin_end are properly converted to Time
    down_start = aggregate_downsample(ts, time_bin_start=['2016-03-22T12:30:31'],
                                      time_bin_size=[1]*u.s)
    assert_equal(down_start.time_bin_start.isot, ['2016-03-22T12:30:31.000'])

    down_end = aggregate_downsample(ts,
                                    time_bin_start=['2016-03-22T12:30:31',
                                                    '2016-03-22T12:30:33'],
                                    time_bin_end='2016-03-22T12:30:34')
    assert_equal(down_end.time_bin_end.isot,
                 ['2016-03-22T12:30:33.000', '2016-03-22T12:30:34.000'])


def test_time_bin_end_auto():
    """With array ``time_bin_start`` only, the last bin ends at the last sample."""

    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])

    # Interpret `time_bin_end` as the end of timeseries when `time_bin_start` is
    # an array and `time_bin_size` is not provided
    down_auto_end = aggregate_downsample(ts,
                                         time_bin_start=['2016-03-22T12:30:31',
                                                         '2016-03-22T12:30:33'])
    assert_equal(down_auto_end.time_bin_end.isot,
                 ['2016-03-22T12:30:33.000', '2016-03-22T12:30:35.000'])


def test_time_bin_start_array():
    """With array ``time_bin_end`` only, bin starts are derived from it."""

    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])

    # When `time_bin_end` is an array and `time_bin_start` is not provided,
    # `time_bin_start` is converted to an array with its first element set to
    # the start of the timeseries and the rest populated using `time_bin_end`.
    # This case is separately tested since `BinnedTimeSeries` allows
    # `time_bin_end` to be an array only if `time_bin_start` is an array.
    down_start_array = aggregate_downsample(ts,
                                            time_bin_end=['2016-03-22T12:30:33',
                                                          '2016-03-22T12:30:35'])
    assert_equal(down_start_array.time_bin_start.isot,
                 ['2016-03-22T12:30:31.000', '2016-03-22T12:30:33.000'])


def test_nbins():
    """Passing only ``n_bins`` yields that many bins from the first sample."""

    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])

    # With only `n_bins=2` given, two bins starting at 12:30:31 and 12:30:33
    # are expected.
    down_nbins = aggregate_downsample(ts, n_bins=2)
    assert_equal(down_nbins.time_bin_start.isot,
                 ['2016-03-22T12:30:31.000', '2016-03-22T12:30:33.000'])


def test_downsample():
    """Exercise ``aggregate_downsample`` over several bin configurations.

    Covers scalar bin sizes of 1-4 increments, a column carrying units,
    uneven contiguous bins, non-contiguous bins and overlapping bins.
    """
    ts = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5]], names=['a'])
    ts_units = TimeSeries(time=INPUT_TIME, data=[[1, 2, 3, 4, 5] * u.count], names=['a'])

    # Avoid precision problems with floating-point comparisons on 32bit
    if sys.maxsize > 2**32:
        # 64 bit
        time_bin_incr = 1 * u.s
        time_bin_start = None
    else:
        # 32 bit
        time_bin_incr = (1 - 1e-6) * u.s
        time_bin_start = ts.time[0] - 1 * u.ns

    # NOTE: the results of `u.isclose` are asserted explicitly; a bare
    # `u.isclose(...)` statement only computes a boolean array and checks
    # nothing.
    down_1 = aggregate_downsample(ts, time_bin_size=time_bin_incr,
                                  time_bin_start=time_bin_start)
    assert np.all(u.isclose(down_1.time_bin_size, [1, 1, 1, 1, 1]*time_bin_incr))
    assert_equal(down_1.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:32.000',
                       '2016-03-22T12:30:33.000', '2016-03-22T12:30:34.000',
                       '2016-03-22T12:30:35.000']))
    assert_equal(down_1["a"].data.data, np.array([1, 2, 3, 4, 5]))

    down_2 = aggregate_downsample(ts, time_bin_size=2*time_bin_incr,
                                  time_bin_start=time_bin_start)
    assert np.all(u.isclose(down_2.time_bin_size, [2, 2, 2]*time_bin_incr))
    assert_equal(down_2.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:33.000',
                       '2016-03-22T12:30:35.000']))
    assert_equal(down_2["a"].data.data, np.array([1, 3, 5]))

    down_3 = aggregate_downsample(ts, time_bin_size=3*time_bin_incr,
                                  time_bin_start=time_bin_start)
    assert np.all(u.isclose(down_3.time_bin_size, [3, 3]*time_bin_incr))
    assert_equal(down_3.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:34.000']))
    assert_equal(down_3["a"].data.data, np.array([2, 4]))

    down_4 = aggregate_downsample(ts, time_bin_size=4*time_bin_incr,
                                  time_bin_start=time_bin_start)
    assert np.all(u.isclose(down_4.time_bin_size, [4, 4]*time_bin_incr))
    assert_equal(down_4.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:35.000']))
    assert_equal(down_4["a"].data.data, np.array([2, 5]))

    down_units = aggregate_downsample(ts_units, time_bin_size=4*time_bin_incr,
                                      time_bin_start=time_bin_start)
    assert np.all(u.isclose(down_units.time_bin_size, [4, 4]*time_bin_incr))
    assert_equal(down_units.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000', '2016-03-22T12:30:35.000']))
    assert down_units["a"].unit.name == 'ct'
    assert_equal(down_units["a"].data, np.array([2.5, 5.0]))

    # Contiguous bins with uneven bin sizes: `time_bin_size` is an array
    down_uneven_bins = aggregate_downsample(ts, time_bin_size=[2, 1, 1]*time_bin_incr,
                                            time_bin_start=time_bin_start)
    assert np.all(u.isclose(down_uneven_bins.time_bin_size, [2, 1, 1]*time_bin_incr))
    assert_equal(down_uneven_bins.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000',
                       '2016-03-22T12:30:33.000',
                       '2016-03-22T12:30:34.000']))
    assert_equal(down_uneven_bins["a"].data.data, np.array([1, 3, 4]))

    # Uncontiguous bins with even bin sizes: `time_bin_start` and `time_bin_end` are both arrays
    down_time_array = aggregate_downsample(ts,
                                           time_bin_start=Time(['2016-03-22T12:30:31.000',
                                                                '2016-03-22T12:30:34.000']),
                                           time_bin_end=Time(['2016-03-22T12:30:32.000',
                                                              '2016-03-22T12:30:35.000']))
    assert np.all(u.isclose(down_time_array.time_bin_size, [1, 1]*u.second))
    assert_equal(down_time_array.time_bin_start.isot,
                 Time(['2016-03-22T12:30:31.000',
                       '2016-03-22T12:30:34.000']))
    assert_equal(down_time_array["a"].data.data, np.array([1, 4]))

    # Overlapping bins
    with pytest.warns(AstropyUserWarning, match="Overlapping bins should be avoided since they "
                                                "can lead to double-counting of data during binning."):
        down_overlap_bins = aggregate_downsample(ts,
                                                 time_bin_start=Time(['2016-03-22T12:30:31.000',
                                                                      '2016-03-22T12:30:33.000']),
                                                 time_bin_end=Time(['2016-03-22T12:30:34',
                                                                    '2016-03-22T12:30:36.000']))
    assert_equal(down_overlap_bins["a"].data, np.array([2, 5]))