1# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
3import warnings
4
5import numpy as np
6from astropy import units as u
7from astropy.time import Time, TimeDelta
8from astropy.utils.exceptions import AstropyUserWarning
9
10from astropy.timeseries.sampled import TimeSeries
11from astropy.timeseries.binned import BinnedTimeSeries
12
13__all__ = ['aggregate_downsample']
14
15
16def reduceat(array, indices, function):
17    """
18    Manual reduceat functionality for cases where Numpy functions don't have a reduceat.
19    It will check if the input function has a reduceat and call that if it does.
20    """
21    if hasattr(function, 'reduceat'):
22        return np.array(function.reduceat(array, indices))
23    else:
24        result = []
25        for i in range(len(indices) - 1):
26            if indices[i+1] <= indices[i]+1:
27                result.append(function(array[indices[i]]))
28            else:
29                result.append(function(array[indices[i]:indices[i+1]]))
30        result.append(function(array[indices[-1]:]))
31        return np.array(result)
32
33
34def aggregate_downsample(time_series, *, time_bin_size=None, time_bin_start=None,
35                         time_bin_end=None, n_bins=None, aggregate_func=None):
36    """
37    Downsample a time series by binning values into bins with a fixed size or
38    custom sizes, using a single function to combine the values in the bin.
39
40    Parameters
41    ----------
42    time_series : :class:`~astropy.timeseries.TimeSeries`
43        The time series to downsample.
44    time_bin_size : `~astropy.units.Quantity` or `~astropy.time.TimeDelta` ['time'], optional
45        The time interval for the binned time series - this is either a scalar
46        value (in which case all time bins will be assumed to have the same
47        duration) or as an array of values (in which case each time bin can
48        have a different duration). If this argument is provided,
49        ``time_bin_end`` should not be provided.
50    time_bin_start : `~astropy.time.Time` or iterable, optional
51        The start time for the binned time series - this can be either given
52        directly as a `~astropy.time.Time` array or as any iterable that
53        initializes the `~astropy.time.Time` class. This can also be a scalar
54        value if ``time_bin_size`` or ``time_bin_end`` is provided.
55        Defaults to the first time in the sampled time series.
56    time_bin_end : `~astropy.time.Time` or iterable, optional
57        The times of the end of each bin - this can be either given directly as
58        a `~astropy.time.Time` array or as any iterable that initializes the
59        `~astropy.time.Time` class. This can only be given if ``time_bin_start``
60        is provided or its default is used. If ``time_bin_end`` is scalar and
61        ``time_bin_start`` is an array, time bins are assumed to be contiguous;
62        the end of each bin is the start of the next one, and ``time_bin_end`` gives
63        the end time for the last bin.  If ``time_bin_end`` is an array and
64        ``time_bin_start`` is scalar, bins will be contiguous. If both ``time_bin_end``
65        and ``time_bin_start`` are arrays, bins do not need to be contiguous.
66        If this argument is provided, ``time_bin_size`` should not be provided.
67    n_bins : int, optional
68        The number of bins to use. Defaults to the number needed to fit all
69        the original points. If both ``time_bin_start`` and ``time_bin_size``
70        are provided and are scalar values, this determines the total bins
71        within that interval.
72    aggregate_func : callable, optional
73        The function to use for combining points in the same bin. Defaults
74        to np.nanmean.
75
76    Returns
77    -------
78    binned_time_series : :class:`~astropy.timeseries.BinnedTimeSeries`
79        The downsampled time series.
80    """
81
82    if not isinstance(time_series, TimeSeries):
83        raise TypeError("time_series should be a TimeSeries")
84
85    if time_bin_size is not None and not isinstance(time_bin_size, (u.Quantity, TimeDelta)):
86        raise TypeError("'time_bin_size' should be a Quantity or a TimeDelta")
87
88    if time_bin_start is not None and not isinstance(time_bin_start, (Time, TimeDelta)):
89        time_bin_start = Time(time_bin_start)
90
91    if time_bin_end is not None and not isinstance(time_bin_end, (Time, TimeDelta)):
92        time_bin_end = Time(time_bin_end)
93
94    # Use the table sorted by time
95    ts_sorted = time_series.iloc[:]
96
97    # If start time is not provided, it is assumed to be the start of the timeseries
98    if time_bin_start is None:
99        time_bin_start = ts_sorted.time[0]
100
101    # Total duration of the timeseries is needed for determining either
102    # `time_bin_size` or `nbins` in the case of scalar `time_bin_start`
103    if time_bin_start.isscalar:
104        time_duration = (ts_sorted.time[-1] - time_bin_start).sec
105
106    if time_bin_size is None and time_bin_end is None:
107        if time_bin_start.isscalar:
108            if n_bins is None:
109                raise TypeError("With single 'time_bin_start' either 'n_bins', "
110                                "'time_bin_size' or time_bin_end' must be provided")
111            else:
112                # `nbins` defaults to the number needed to fit all points
113                time_bin_size = time_duration / n_bins * u.s
114        else:
115            time_bin_end = ts_sorted.time[-1]
116
117    if time_bin_start.isscalar:
118        if time_bin_size is not None:
119            if time_bin_size.isscalar:
120                # Determine the number of bins
121                if n_bins is None:
122                    bin_size_sec = time_bin_size.to_value(u.s)
123                    n_bins = int(np.ceil(time_duration/bin_size_sec))
124        elif time_bin_end is not None:
125            if not time_bin_end.isscalar:
126                # Convert start time to an array and populate using `time_bin_end`
127                scalar_start_time = time_bin_start
128                time_bin_start = time_bin_end.replicate(copy=True)
129                time_bin_start[0] = scalar_start_time
130                time_bin_start[1:] = time_bin_end[:-1]
131
132    # Check for overlapping bins, and warn if they are present
133    if time_bin_end is not None:
134        if (not time_bin_end.isscalar and not time_bin_start.isscalar and
135                np.any(time_bin_start[1:] < time_bin_end[:-1])):
136            warnings.warn("Overlapping bins should be avoided since they "
137                          "can lead to double-counting of data during binning.",
138                          AstropyUserWarning)
139
140    binned = BinnedTimeSeries(time_bin_size=time_bin_size,
141                              time_bin_start=time_bin_start,
142                              time_bin_end=time_bin_end,
143                              n_bins=n_bins)
144
145    if aggregate_func is None:
146        aggregate_func = np.nanmean
147
148    # Start and end times of the binned timeseries
149    bin_start = binned.time_bin_start
150    bin_end = binned.time_bin_end
151
152    # Assign `n_bins` since it is needed later
153    if n_bins is None:
154        n_bins = len(bin_start)
155
156    # Find the subset of the table that is inside the union of all bins
157    keep = ((ts_sorted.time >= bin_start[0]) & (ts_sorted.time <= bin_end[-1]))
158
159    # Find out indices to be removed because of uncontiguous bins
160    for ind in range(n_bins-1):
161        delete_indices = np.where(np.logical_and(ts_sorted.time > bin_end[ind],
162                                                 ts_sorted.time < bin_start[ind+1]))
163        keep[delete_indices] = False
164
165    subset = ts_sorted[keep]
166
167    # Figure out which bin each row falls in by sorting with respect
168    # to the bin end times
169    indices = np.searchsorted(bin_end, ts_sorted.time[keep])
170    # For time == bin_start[i+1] == bin_end[i], let bin_start takes precedence
171    if np.all(bin_start[1:] >= bin_end[:-1]):
172        indices_start = np.searchsorted(ts_sorted.time[keep],
173                                        np.minimum(bin_start, ts_sorted.time[-1]))
174        indices[indices_start] = np.arange(len(indices_start))
175
176    # Determine rows where values are defined
177    groups = np.hstack([0, np.nonzero(np.diff(indices))[0] + 1])
178
179    # Find unique indices to determine which rows in the final time series
180    # will not be empty.
181    unique_indices = np.unique(indices)
182
183    # Add back columns
184
185    for colname in subset.colnames:
186
187        if colname == 'time':
188            continue
189
190        values = subset[colname]
191
192        # FIXME: figure out how to avoid the following, if possible
193        if not isinstance(values, (np.ndarray, u.Quantity)):
194            warnings.warn("Skipping column {0} since it has a mix-in type", AstropyUserWarning)
195            continue
196
197        if isinstance(values, u.Quantity):
198            data = u.Quantity(np.repeat(np.nan,  n_bins), unit=values.unit)
199            data[unique_indices] = u.Quantity(reduceat(values.value, groups, aggregate_func),
200                                              values.unit, copy=False)
201        else:
202            data = np.ma.zeros(n_bins, dtype=values.dtype)
203            data.mask = 1
204            data[unique_indices] = reduceat(values, groups, aggregate_func)
205            data.mask[unique_indices] = 0
206
207        binned[colname] = data
208
209    return binned
210