1# coding=utf-8
2"""
3© 2014 LinkedIn Corp. All rights reserved.
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at  http://www.apache.org/licenses/LICENSE-2.0
7
8Unless required by applicable law or agreed to in writing, software
9distributed under the License is distributed on an "AS IS" BASIS,
10WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11"""
12import numpy
13
14
class TimeSeries(object):
  """
  Ordered collection of (timestamp, value) samples.

  Timestamps live in ``self.timestamps`` and the matching values in
  ``self.values``; the two lists are index-aligned and kept ordered by
  timestamp.  The class works on both Python 2 and Python 3.
  """

  def __init__(self, series):
    """
    Build the time series from a mapping of timestamp to value.

    :param series: mapping whose keys are convertible with ``int`` and whose
      values are convertible with ``float``.  Entries whose value is `None`
      are dropped.
    """
    # Null values are filtered before any conversion so their keys are never
    # touched, and the ordering is numeric (string keys such as "10" and "9"
    # would otherwise be ordered lexicographically).
    samples = [(int(ts), float(series[ts])) for ts in series if series[ts] is not None]
    samples.sort(key=lambda sample: sample[0])
    self.timestamps = [ts for ts, _ in samples]
    self.values = [value for _, value in samples]

  @property
  def start(self):
    """
    Return the earliest timestamp in the time series, or `None` when empty.
    """
    return min(self.timestamps) if self.timestamps else None

  @property
  def end(self):
    """
    Return the latest timestamp in the time series, or `None` when empty.
    """
    return max(self.timestamps) if self.timestamps else None

  @property
  def timestamps_ms(self):
    """
    Return the list of timestamps, in order, multiplied by 1000
    (milliseconds since epoch when the timestamps are in seconds).
    """
    # A list comprehension rather than map(): map() is lazy on Python 3.
    return [ts * 1000 for ts in self.timestamps]

  def __repr__(self):
    return 'TimeSeries<start={0}, end={1}>'.format(repr(self.start), repr(self.end))

  def __str__(self):
    """
    :return string: Return string representation of time series
    """
    return ''.join(str(item) for item in self.items())

  def __nonzero__(self):
    """
    Truth value: `True` when the series holds at least one sample.
    """
    return len(self.timestamps) > 0

  # Python 3 spells the truth-value hook __bool__.
  __bool__ = __nonzero__

  def __getitem__(self, key):
    """
    Return the value stored at timestamp `key`.

    :raises ValueError: when the timestamp is not present.
    """
    try:
      pos = self.timestamps.index(key)
    except ValueError:
      raise ValueError('Timestamp does not exist in TimeSeries object')
    return self.values[pos]

  def __setitem__(self, key, val):
    """
    Store `val` at timestamp `key`, keeping the series ordered.

    Assigning `None` removes an existing timestamp and is a no-op for a
    timestamp that is not present, so the series never holds null values.
    """
    try:
      pos = self.timestamps.index(key)
    except ValueError:
      pos = None

    if pos is not None:
      if val is None:
        del self.timestamps[pos]
        del self.values[pos]
      else:
        self.values[pos] = val
    elif val is not None:
      # Insert after every smaller timestamp; both lists are updated at the
      # same index so they stay aligned.
      pos = sum(1 for ts in self.timestamps if ts < key)
      self.timestamps.insert(pos, key)
      self.values.insert(pos, val)

  def __delitem__(self, key):
    """
    Remove timestamp `key` and its value; a missing timestamp is ignored.
    """
    try:
      pos = self.timestamps.index(key)
    except ValueError:
      return
    del self.timestamps[pos]
    del self.values[pos]

  def __contains__(self, item):
    return item in self.timestamps

  def __iter__(self):
    return iter(self.timestamps)

  def __len__(self):
    return len(self.timestamps)

  def __eq__(self, other):
    """
    Two time series are equal when they hold the same timestamps with the
    same values, in the same order.
    """
    if not isinstance(other, TimeSeries):
      return NotImplemented
    if len(self.timestamps) != len(other.timestamps):
      return False
    return all(
      ts == other_ts and value == other_value
      for ts, value, other_ts, other_value
      in zip(self.timestamps, self.values, other.timestamps, other.values))

  def __ne__(self, other):
    # Python 2 does not derive != from ==, so it is spelled out here.
    result = self.__eq__(other)
    return result if result is NotImplemented else not result

  def __add__(self, other):
    return self._generic_binary_op(other, self._get_value_type(other).__add__)

  def __sub__(self, other):
    return self._generic_binary_op(other, self._get_value_type(other).__sub__)

  def __mul__(self, other):
    return self._generic_binary_op(other, self._get_value_type(other).__mul__)

  def __div__(self, other):
    return self._generic_binary_op(other, self._division_op(other, '__div__', '__truediv__'))

  def __truediv__(self, other):
    return self._generic_binary_op(other, self._get_value_type(other).__truediv__)

  __radd__ = __add__
  __rmul__ = __mul__

  def __rsub__(self, other):
    return self._generic_binary_op(other, self._get_value_type(other).__rsub__)

  def __rdiv__(self, other):
    return self._generic_binary_op(other, self._division_op(other, '__rdiv__', '__rtruediv__'))

  def __rtruediv__(self, other):
    return self._generic_binary_op(other, self._get_value_type(other).__rtruediv__)

  def items(self):
    """
    Return a list of (timestamp, value) tuples in series order.
    """
    return list(zip(self.timestamps, self.values))

  def iterkeys(self):
    return iter(self.timestamps)

  def itervalues(self):
    return iter(self.values)

  def iteritems(self):
    for item in self.items():
      yield item

  def iteritems_silent(self):
    """
    Yield every (timestamp, value) tuple, then a single `None` as an end
    marker so a consumer can detect exhaustion without StopIteration.
    """
    for item in self.items():
      yield item
    yield None

  def _division_op(self, other, legacy_name, true_name):
    """
    Pick the value type's division method.

    :param other: the other operand, used only to determine the value type.
    :param str legacy_name: Python 2 method name (`__div__` / `__rdiv__`).
    :param str true_name: true-division name used when the legacy one is absent.
    :return: unbound method of the value type.
    """
    value_type = self._get_value_type(other)
    op = getattr(value_type, legacy_name, None)
    if op is None:
      # Python 3 number types only provide the true-division methods.
      op = getattr(value_type, true_name)
    return op

  @staticmethod
  def _apply_op(op, left, right):
    """
    Evaluate ``op(left, right)``; when the value type does not support the
    operand, retry with the same-named method of the operand's type.

    :return: the result, or `NotImplemented` when neither type can do it.
    """
    result = op(left, right)
    if result is not NotImplemented:
      return result
    right_type = type(right)
    fallback = vars(right_type).get(op.__name__)
    if not fallback:
      return NotImplemented
    return fallback(right_type(left), right)

  def _generic_binary_op(self, other, op):
    """
    Perform the method operation specified in the op parameter on the values
    within the instance's time series values and either another time series
    or a constant number value.

    With another time series only timestamps present in both are used.
    Samples whose calculation raises ZeroDivisionError, or that neither
    operand type supports, are left out of the result.

    :param other: Time series of values or a constant number to use in calculations with instance's time series.
    :param func op: The method to perform the calculation between the values.
    :return: :class:`TimeSeries` object.
    :raises ValueError: when no sample could be calculated.
    """
    if isinstance(other, TimeSeries):
      # One dict lookup per sample instead of two list scans.  Built from the
      # reversed items so that a repeated timestamp resolves to its first
      # occurrence, exactly as other[key] does.
      lookup = dict(reversed(other.items()))
      operands = [(key, value, lookup[key]) for key, value in self.items() if key in lookup]
    else:
      operands = [(key, value, other) for key, value in self.items()]

    output = {}
    for key, value, operand in operands:
      try:
        result = self._apply_op(op, value, operand)
      except ZeroDivisionError:
        continue
      if result is not NotImplemented:
        output[key] = result

    if not output:
      raise ValueError('TimeSeries data was empty or invalid.')
    return TimeSeries(output)

  def _get_value_type(self, other):
    """
    Get the object type of the value within the values portion of the time series.

    Falls back to the first value of `other` when this series is empty and
    `other` is a non-empty time series.

    :return: `type` of object
    :raises ValueError: when no value is available to take the type from.
    """
    if self.values:
      return type(self.values[0])
    if isinstance(other, TimeSeries) and other.values:
      return type(other.values[0])
    raise ValueError('Cannot perform arithmetic on empty time series.')

  def align(self, other):
    """
    Align two time series so that len(self) == len(other) and self.timestamps == other.timestamps.

    Both results cover the union of the timestamps.  Where a series has no
    sample at a timestamp it takes the value of its next sample, or of its
    last sample once it is exhausted.

    :return: :tuple:(`TimeSeries` object(the aligned self), `TimeSeries` object(the aligned other)),
      or `None` when `other` is not a time series.
    :raises IndexError: when exactly one of the two series is empty.
    """
    if not isinstance(other, TimeSeries):
      return None

    aligned, other_aligned = {}, {}
    i, other_i = self.iteritems_silent(), other.iteritems_silent()
    # The builtin next() works on Python 2.6+ and Python 3 alike.
    item, other_item = next(i), next(other_i)

    while item and other_item:
      # Unpack timestamps and values.
      timestamp, value = item
      other_timestamp, other_value = other_item
      if timestamp == other_timestamp:
        aligned[timestamp] = value
        other_aligned[other_timestamp] = other_value
        item = next(i)
        other_item = next(other_i)
      elif timestamp < other_timestamp:
        aligned[timestamp] = value
        other_aligned[timestamp] = other_value
        item = next(i)
      else:
        aligned[other_timestamp] = value
        other_aligned[other_timestamp] = other_value
        other_item = next(other_i)
    # Align remaining items.
    while item:
      timestamp, value = item
      aligned[timestamp] = value
      other_aligned[timestamp] = other.values[-1]
      item = next(i)
    while other_item:
      other_timestamp, other_value = other_item
      aligned[other_timestamp] = self.values[-1]
      other_aligned[other_timestamp] = other_value
      other_item = next(other_i)
    return TimeSeries(aligned), TimeSeries(other_aligned)

  def smooth(self, smoothing_factor):
    """
    Return a new time series which is an exponentially smoothed version of the original data series.
    Smooth forward once, backward once, and then take the average.

    :param float smoothing_factor: smoothing factor
    :return: :class:`TimeSeries` object.
    """
    output = {}

    if self:
      samples = self.items()
      forward_smooth = {}
      backward_smooth = {}

      previous = self.values[0]
      for key, value in samples:
        previous = smoothing_factor * previous + (1 - smoothing_factor) * value
        forward_smooth[key] = previous

      following = self.values[-1]
      for key, value in reversed(samples):
        following = smoothing_factor * following + (1 - smoothing_factor) * value
        backward_smooth[key] = following

      for key in forward_smooth:
        output[key] = (forward_smooth[key] + backward_smooth[key]) / 2

    return TimeSeries(output)

  def add_offset(self, offset):
    """
    Shift every timestamp of this time series by some offset, in place.

    :param int offset: The number of seconds to offset the time series.
    :return: `None`
    """
    # Must stay a list: map() would leave a one-shot iterator on Python 3.
    self.timestamps = [ts + offset for ts in self.timestamps]

  def normalize(self):
    """
    Divide every value of this time series by the maximum value, in place.

    Nothing happens when the series is empty or its maximum is 0.

    :return: `None`
    """
    maximum = self.max()
    if maximum:
      self.values = [value / maximum for value in self.values]

  def crop(self, start_timestamp, end_timestamp):
    """
    Return a new TimeSeries object that contains all the timestamps and values within
    the specified range (both ends inclusive).

    :param int start_timestamp: the start timestamp value
    :param int end_timestamp: the end timestamp value
    :return: :class:`TimeSeries` object.
    :raises ValueError: when no timestamp falls within the range.
    """
    output = dict(
      (key, value) for key, value in self.items()
      if start_timestamp <= key <= end_timestamp)

    if not output:
      raise ValueError('TimeSeries data was empty or invalid.')
    return TimeSeries(output)

  def _scalar_stat(self, reducer, default, *args):
    """
    Apply a numpy reduction to the values and return a plain Python scalar.

    :param func reducer: numpy function called as ``reducer(values, *args)``.
    :param default: Value to return when the series is empty.
    :return: Python scalar, or `default`.
    """
    if not self.values:
      return default
    # ndarray.item() replaces numpy.asscalar, which newer NumPy releases removed.
    return numpy.asarray(reducer(self.values, *args)).item()

  def average(self, default=None):
    """
    Calculate the average value over the time series.

    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the average value or `None`.
    """
    return self._scalar_stat(numpy.average, default)

  def median(self, default=None):
    """
    Calculate the median value over the time series.

    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the median value or `None`.
    """
    return self._scalar_stat(numpy.median, default)

  def max(self, default=None):
    """
    Calculate the maximum value over the time series.

    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the maximum value or `None`.
    """
    return self._scalar_stat(numpy.max, default)

  def min(self, default=None):
    """
    Calculate the minimum value over the time series.

    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the minimum value or `None`.
    """
    return self._scalar_stat(numpy.min, default)

  def percentile(self, n, default=None):
    """
    Calculate the Nth Percentile value over the time series.

    :param int n: Integer value of the percentile to calculate.
    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the Nth percentile value or `None`.
    """
    return self._scalar_stat(numpy.percentile, default, n)

  def stdev(self, default=None):
    """
    Calculate the standard deviation of the time series.

    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the standard deviation value or `None`.
    """
    return self._scalar_stat(numpy.std, default)

  def sum(self, default=None):
    """
    Calculate the sum of all the values in the time series.

    :param default: Value to return as a default should the calculation not be possible.
    :return: Float representing the sum or `None`.
    """
    return self._scalar_stat(numpy.sum, default)