1# Licensed under a 3-clause BSD style license - see LICENSE.rst 2 3import warnings 4 5import numpy as np 6from astropy import units as u 7from astropy.time import Time, TimeDelta 8from astropy.utils.exceptions import AstropyUserWarning 9 10from astropy.timeseries.sampled import TimeSeries 11from astropy.timeseries.binned import BinnedTimeSeries 12 13__all__ = ['aggregate_downsample'] 14 15 16def reduceat(array, indices, function): 17 """ 18 Manual reduceat functionality for cases where Numpy functions don't have a reduceat. 19 It will check if the input function has a reduceat and call that if it does. 20 """ 21 if hasattr(function, 'reduceat'): 22 return np.array(function.reduceat(array, indices)) 23 else: 24 result = [] 25 for i in range(len(indices) - 1): 26 if indices[i+1] <= indices[i]+1: 27 result.append(function(array[indices[i]])) 28 else: 29 result.append(function(array[indices[i]:indices[i+1]])) 30 result.append(function(array[indices[-1]:])) 31 return np.array(result) 32 33 34def aggregate_downsample(time_series, *, time_bin_size=None, time_bin_start=None, 35 time_bin_end=None, n_bins=None, aggregate_func=None): 36 """ 37 Downsample a time series by binning values into bins with a fixed size or 38 custom sizes, using a single function to combine the values in the bin. 39 40 Parameters 41 ---------- 42 time_series : :class:`~astropy.timeseries.TimeSeries` 43 The time series to downsample. 44 time_bin_size : `~astropy.units.Quantity` or `~astropy.time.TimeDelta` ['time'], optional 45 The time interval for the binned time series - this is either a scalar 46 value (in which case all time bins will be assumed to have the same 47 duration) or as an array of values (in which case each time bin can 48 have a different duration). If this argument is provided, 49 ``time_bin_end`` should not be provided. 50 time_bin_start : `~astropy.time.Time` or iterable, optional 51 The start time for the binned time series - this can be either given 52 directly as a `~astropy.time.Time` array or as any iterable that 53 initializes the `~astropy.time.Time` class. This can also be a scalar 54 value if ``time_bin_size`` or ``time_bin_end`` is provided. 55 Defaults to the first time in the sampled time series. 56 time_bin_end : `~astropy.time.Time` or iterable, optional 57 The times of the end of each bin - this can be either given directly as 58 a `~astropy.time.Time` array or as any iterable that initializes the 59 `~astropy.time.Time` class. This can only be given if ``time_bin_start`` 60 is provided or its default is used. If ``time_bin_end`` is scalar and 61 ``time_bin_start`` is an array, time bins are assumed to be contiguous; 62 the end of each bin is the start of the next one, and ``time_bin_end`` gives 63 the end time for the last bin. If ``time_bin_end`` is an array and 64 ``time_bin_start`` is scalar, bins will be contiguous. If both ``time_bin_end`` 65 and ``time_bin_start`` are arrays, bins do not need to be contiguous. 66 If this argument is provided, ``time_bin_size`` should not be provided. 67 n_bins : int, optional 68 The number of bins to use. Defaults to the number needed to fit all 69 the original points. If both ``time_bin_start`` and ``time_bin_size`` 70 are provided and are scalar values, this determines the total bins 71 within that interval. 72 aggregate_func : callable, optional 73 The function to use for combining points in the same bin. Defaults 74 to np.nanmean. 75 76 Returns 77 ------- 78 binned_time_series : :class:`~astropy.timeseries.BinnedTimeSeries` 79 The downsampled time series. 80 """ 81 82 if not isinstance(time_series, TimeSeries): 83 raise TypeError("time_series should be a TimeSeries") 84 85 if time_bin_size is not None and not isinstance(time_bin_size, (u.Quantity, TimeDelta)): 86 raise TypeError("'time_bin_size' should be a Quantity or a TimeDelta") 87 88 if time_bin_start is not None and not isinstance(time_bin_start, (Time, TimeDelta)): 89 time_bin_start = Time(time_bin_start) 90 91 if time_bin_end is not None and not isinstance(time_bin_end, (Time, TimeDelta)): 92 time_bin_end = Time(time_bin_end) 93 94 # Use the table sorted by time 95 ts_sorted = time_series.iloc[:] 96 97 # If start time is not provided, it is assumed to be the start of the timeseries 98 if time_bin_start is None: 99 time_bin_start = ts_sorted.time[0] 100 101 # Total duration of the timeseries is needed for determining either 102 # `time_bin_size` or `nbins` in the case of scalar `time_bin_start` 103 if time_bin_start.isscalar: 104 time_duration = (ts_sorted.time[-1] - time_bin_start).sec 105 106 if time_bin_size is None and time_bin_end is None: 107 if time_bin_start.isscalar: 108 if n_bins is None: 109 raise TypeError("With single 'time_bin_start' either 'n_bins', " 110 "'time_bin_size' or time_bin_end' must be provided") 111 else: 112 # `nbins` defaults to the number needed to fit all points 113 time_bin_size = time_duration / n_bins * u.s 114 else: 115 time_bin_end = ts_sorted.time[-1] 116 117 if time_bin_start.isscalar: 118 if time_bin_size is not None: 119 if time_bin_size.isscalar: 120 # Determine the number of bins 121 if n_bins is None: 122 bin_size_sec = time_bin_size.to_value(u.s) 123 n_bins = int(np.ceil(time_duration/bin_size_sec)) 124 elif time_bin_end is not None: 125 if not time_bin_end.isscalar: 126 # Convert start time to an array and populate using `time_bin_end` 127 scalar_start_time = time_bin_start 128 time_bin_start = time_bin_end.replicate(copy=True) 129 time_bin_start[0] = scalar_start_time 130 time_bin_start[1:] = time_bin_end[:-1] 131 132 # Check for overlapping bins, and warn if they are present 133 if time_bin_end is not None: 134 if (not time_bin_end.isscalar and not time_bin_start.isscalar and 135 np.any(time_bin_start[1:] < time_bin_end[:-1])): 136 warnings.warn("Overlapping bins should be avoided since they " 137 "can lead to double-counting of data during binning.", 138 AstropyUserWarning) 139 140 binned = BinnedTimeSeries(time_bin_size=time_bin_size, 141 time_bin_start=time_bin_start, 142 time_bin_end=time_bin_end, 143 n_bins=n_bins) 144 145 if aggregate_func is None: 146 aggregate_func = np.nanmean 147 148 # Start and end times of the binned timeseries 149 bin_start = binned.time_bin_start 150 bin_end = binned.time_bin_end 151 152 # Assign `n_bins` since it is needed later 153 if n_bins is None: 154 n_bins = len(bin_start) 155 156 # Find the subset of the table that is inside the union of all bins 157 keep = ((ts_sorted.time >= bin_start[0]) & (ts_sorted.time <= bin_end[-1])) 158 159 # Find out indices to be removed because of uncontiguous bins 160 for ind in range(n_bins-1): 161 delete_indices = np.where(np.logical_and(ts_sorted.time > bin_end[ind], 162 ts_sorted.time < bin_start[ind+1])) 163 keep[delete_indices] = False 164 165 subset = ts_sorted[keep] 166 167 # Figure out which bin each row falls in by sorting with respect 168 # to the bin end times 169 indices = np.searchsorted(bin_end, ts_sorted.time[keep]) 170 # For time == bin_start[i+1] == bin_end[i], let bin_start takes precedence 171 if np.all(bin_start[1:] >= bin_end[:-1]): 172 indices_start = np.searchsorted(ts_sorted.time[keep], 173 np.minimum(bin_start, ts_sorted.time[-1])) 174 indices[indices_start] = np.arange(len(indices_start)) 175 176 # Determine rows where values are defined 177 groups = np.hstack([0, np.nonzero(np.diff(indices))[0] + 1]) 178 179 # Find unique indices to determine which rows in the final time series 180 # will not be empty. 181 unique_indices = np.unique(indices) 182 183 # Add back columns 184 185 for colname in subset.colnames: 186 187 if colname == 'time': 188 continue 189 190 values = subset[colname] 191 192 # FIXME: figure out how to avoid the following, if possible 193 if not isinstance(values, (np.ndarray, u.Quantity)): 194 warnings.warn("Skipping column {0} since it has a mix-in type", AstropyUserWarning) 195 continue 196 197 if isinstance(values, u.Quantity): 198 data = u.Quantity(np.repeat(np.nan, n_bins), unit=values.unit) 199 data[unique_indices] = u.Quantity(reduceat(values.value, groups, aggregate_func), 200 values.unit, copy=False) 201 else: 202 data = np.ma.zeros(n_bins, dtype=values.dtype) 203 data.mask = 1 204 data[unique_indices] = reduceat(values, groups, aggregate_func) 205 data.mask[unique_indices] = 0 206 207 binned[colname] = data 208 209 return binned 210