# coding=utf-8
"""
© 2014 LinkedIn Corp. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
import numpy


class TimeSeries(object):
    """
    Ordered collection of (timestamp, value) pairs.

    Timestamps are kept in ``self.timestamps`` and the matching values, at the
    same positions, in ``self.values``. The constructor drops ``None`` values,
    casts timestamps to ``int`` and values to ``float``.
    """

    # Sentinel returned by _apply_op when an operation yields no usable result.
    _NO_RESULT = object()

    def __init__(self, series):
        """
        Build a time series from a mapping.

        :param dict series: Mapping of timestamp to value. Entries whose value
            is `None` are discarded.
        """
        self.timestamps = []
        self.values = []

        # Clean the time series by removing null values.
        for ts in sorted(series):
            value = series[ts]
            if value is not None:
                self.timestamps.append(int(ts))
                self.values.append(float(value))

    @property
    def start(self):
        """
        Return the earliest timestamp in the time series, or `None` if empty.
        """
        return min(self.timestamps) if self.timestamps else None

    @property
    def end(self):
        """
        Return the latest timestamp in the time series, or `None` if empty.
        """
        return max(self.timestamps) if self.timestamps else None

    @property
    def timestamps_ms(self):
        """
        Return list of timestamp values, in order, multiplied by 1000
        (milliseconds since epoch when timestamps are in seconds).
        """
        # A real list (not ``map``) so the result is reusable on Python 3.
        return [ts * 1000 for ts in self.timestamps]

    def __repr__(self):
        return 'TimeSeries<start={0}, end={1}>'.format(repr(self.start), repr(self.end))

    def __str__(self):
        """
        :return string: Return string representation of time series
        """
        return ''.join(str(item) for item in self.iteritems())

    def __nonzero__(self):
        return len(self.timestamps) > 0

    # Python 3 spelling of the truthiness hook.
    __bool__ = __nonzero__

    def __getitem__(self, key):
        """
        Return the value stored at timestamp `key`.

        :raises ValueError: if the timestamp is not present.
        """
        try:
            pos = self.timestamps.index(key)
        except ValueError:
            raise ValueError('Timestamp does not exist in TimeSeries object')
        return self.values[pos]

    def __setitem__(self, key, val):
        """
        Set the value at timestamp `key`.

        Assigning `None` removes an existing timestamp; assigning `None` to a
        timestamp that is not present is a no-op, so the series never stores
        null values (matching the constructor).
        """
        if key in self.timestamps:
            pos = self.timestamps.index(key)
            if val is None:
                del self.timestamps[pos]
                del self.values[pos]
            else:
                self.values[pos] = val
        elif val is not None:
            self.timestamps = sorted(self.timestamps + [key])
            pos = self.timestamps.index(key)
            self.values.insert(pos, val)

    def __delitem__(self, key):
        """
        Remove timestamp `key` and its value; silently ignore a missing key.
        """
        if key in self.timestamps:
            pos = self.timestamps.index(key)
            del self.timestamps[pos]
            del self.values[pos]

    def __contains__(self, item):
        return item in self.timestamps

    def __iter__(self):
        for key in self.timestamps:
            yield key

    def __len__(self):
        return len(self.timestamps)

    def __eq__(self, other):
        """
        Two series are equal when they hold the same timestamps and values
        in the same order. Comparison with a non-TimeSeries is not supported.
        """
        if not isinstance(other, TimeSeries):
            return NotImplemented
        if len(self.timestamps) != len(other.timestamps):
            return False

        for pos, ts in enumerate(self.timestamps):
            if ts != other.timestamps[pos] or self.values[pos] != other.values[pos]:
                return False
        return True

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; define it explicitly.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __add__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__add__)

    def __sub__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__sub__)

    def __mul__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__mul__)

    def __div__(self, other):
        return self._generic_binary_op(other, self._division_op(other, '__div__', '__truediv__'))

    # ``/`` dispatches to __truediv__ on Python 3 (and under
    # ``from __future__ import division`` on Python 2).
    __truediv__ = __div__

    __radd__ = __add__
    __rmul__ = __mul__

    def __rsub__(self, other):
        return self._generic_binary_op(other, self._get_value_type(other).__rsub__)

    def __rdiv__(self, other):
        return self._generic_binary_op(other, self._division_op(other, '__rdiv__', '__rtruediv__'))

    __rtruediv__ = __rdiv__

    def items(self):
        """
        Return a list of (timestamp, value) tuples in timestamp order.
        """
        return [(ts, self.values[pos]) for pos, ts in enumerate(self.timestamps)]

    def iterkeys(self):
        for key in self.timestamps:
            yield key

    def itervalues(self):
        for value in self.values:
            yield value

    def iteritems(self):
        for item in self.items():
            yield item

    def iteritems_silent(self):
        """
        Yield every (timestamp, value) item, then a single trailing `None`
        so consumers can detect the end without catching StopIteration.
        """
        for item in self.items():
            yield item
        yield None

    def _division_op(self, other, legacy_name, true_name):
        """
        Look up the division method of the series' value type.

        Prefers the Python 2 method (`legacy_name`) when the type defines it,
        so Python 2 results are unchanged, and falls back to the true-division
        method (`true_name`), which is the only one available on Python 3.
        """
        value_type = self._get_value_type(other)
        op = getattr(value_type, legacy_name, None)
        if op is None:
            op = getattr(value_type, true_name)
        return op

    @classmethod
    def _apply_op(cls, op, value, operand):
        """
        Apply `op` to one (value, operand) pair.

        When `op` returns NotImplemented, retry with the same-named method of
        the operand's type after converting `value` to that type. Returns
        `_NO_RESULT` if no such method exists or a ZeroDivisionError occurs.
        """
        try:
            result = op(value, operand)
            if result is NotImplemented:
                operand_type = type(operand)
                fallback = vars(operand_type).get(op.__name__)
                if not fallback:
                    return cls._NO_RESULT
                return fallback(operand_type(value), operand)
            return result
        except ZeroDivisionError:
            return cls._NO_RESULT

    def _generic_binary_op(self, other, op):
        """
        Perform the method operation specified in the op parameter on the values
        within the instance's time series values and either another time series
        or a constant number value.

        When `other` is a time series, only timestamps present in both series
        are used. Points whose operation divides by zero are skipped.

        :param other: Time series of values or a constant number to use in calculations with instance's time series.
        :param func op: The method to perform the calculation between the values.
        :return: :class:`TimeSeries` object.
        :raises ValueError: if no point produced a result.
        """
        output = {}
        other_is_series = isinstance(other, TimeSeries)
        for key, value in self.items():
            if other_is_series:
                if key not in other:
                    continue
                operand = other[key]
            else:
                operand = other
            result = self._apply_op(op, value, operand)
            if result is not self._NO_RESULT:
                output[key] = result

        if output:
            return TimeSeries(output)
        else:
            raise ValueError('TimeSeries data was empty or invalid.')

    def _get_value_type(self, other):
        """
        Get the object type of the value within the values portion of the time series.

        Uses the first value of this series, or of `other` when this series is
        empty and `other` is a non-empty time series.

        :return: `type` of object
        :raises ValueError: if no value is available to take the type from.
        """
        if self.values:
            return type(self.values[0])
        elif isinstance(other, TimeSeries) and other.values:
            return type(other.values[0])
        else:
            raise ValueError('Cannot perform arithmetic on empty time series.')

    def align(self, other):
        """
        Align two time series so that len(self) == len(other) and self.timestamps == other.timestamps.

        Both results cover the union of the two timestamp sets. A timestamp
        missing from one series is filled with that series' current pending
        value during the merge, or with its last value once it is exhausted.
        Returns `None` when `other` is not a TimeSeries.

        :return: :tuple:(`TimeSeries` object(the aligned self), `TimeSeries` object(the aligned other))
        """
        if isinstance(other, TimeSeries):
            aligned, other_aligned = {}, {}
            i, other_i = self.iteritems_silent(), other.iteritems_silent()
            # ``next(it)`` works on both Python 2.6+ and Python 3.
            item, other_item = next(i), next(other_i)

            while item and other_item:
                # Unpack timestamps and values.
                timestamp, value = item
                other_timestamp, other_value = other_item
                if timestamp == other_timestamp:
                    aligned[timestamp] = value
                    other_aligned[other_timestamp] = other_value
                    item = next(i)
                    other_item = next(other_i)
                elif timestamp < other_timestamp:
                    aligned[timestamp] = value
                    other_aligned[timestamp] = other_value
                    item = next(i)
                else:
                    aligned[other_timestamp] = value
                    other_aligned[other_timestamp] = other_value
                    other_item = next(other_i)
            # Align remaining items.
            while item:
                timestamp, value = item
                aligned[timestamp] = value
                other_aligned[timestamp] = other.values[-1]
                item = next(i)
            while other_item:
                other_timestamp, other_value = other_item
                aligned[other_timestamp] = self.values[-1]
                other_aligned[other_timestamp] = other_value
                other_item = next(other_i)
            return TimeSeries(aligned), TimeSeries(other_aligned)

    def smooth(self, smoothing_factor):
        """
        Return a new time series which is an exponentially smoothed version of the original data series.
        Smooth forward once, backward once, and then take the average.

        :param float smoothing_factor: smoothing factor
        :return: :class:`TimeSeries` object.
        """
        forward_smooth = {}
        backward_smooth = {}
        output = {}

        if self:
            previous = self.values[0]
            following = self.values[-1]
            for key, value in self.items():
                forward_smooth[key] = smoothing_factor * previous + (1 - smoothing_factor) * value
                previous = forward_smooth[key]
            for key, value in reversed(self.items()):
                backward_smooth[key] = smoothing_factor * following + (1 - smoothing_factor) * value
                following = backward_smooth[key]
            for key in forward_smooth:
                output[key] = (forward_smooth[key] + backward_smooth[key]) / 2

        return TimeSeries(output)

    def add_offset(self, offset):
        """
        Increment all timestamps of this time series, in place, by some offset.

        :param int offset: The number of seconds to offset the time series.
        :return: `None`
        """
        # Must stay a list: other methods index and measure self.timestamps.
        self.timestamps = [ts + offset for ts in self.timestamps]

    def normalize(self):
        """
        Divide all values of this time series, in place, by its maximum value.
        Nothing is changed when the series is empty or the maximum is 0.

        :return: `None`
        """
        maximum = self.max()
        if maximum:
            self.values = [value / maximum for value in self.values]

    def crop(self, start_timestamp, end_timestamp):
        """
        Return a new TimeSeries object containing all the timestamps and values within
        the specified range (both bounds inclusive).

        :param int start_timestamp: the start timestamp value
        :param int end_timestamp: the end timestamp value
        :return: :class:`TimeSeries` object.
        :raises ValueError: if no timestamp falls inside the range.
        """
        output = {}
        for key, value in self.items():
            if start_timestamp <= key <= end_timestamp:
                output[key] = value

        if output:
            return TimeSeries(output)
        else:
            raise ValueError('TimeSeries data was empty or invalid.')

    @staticmethod
    def _to_scalar(result):
        """
        Convert a NumPy scalar/0-d result to the equivalent Python scalar.

        Replaces ``numpy.asscalar``, which was removed from NumPy.
        """
        return numpy.asarray(result).item()

    def average(self, default=None):
        """
        Calculate the average value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the average value or `None`.
        """
        return self._to_scalar(numpy.average(self.values)) if self.values else default

    def median(self, default=None):
        """
        Calculate the median value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the median value or `None`.
        """
        return self._to_scalar(numpy.median(self.values)) if self.values else default

    def max(self, default=None):
        """
        Calculate the maximum value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the maximum value or `None`.
        """
        return self._to_scalar(numpy.max(self.values)) if self.values else default

    def min(self, default=None):
        """
        Calculate the minimum value over the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the minimum value or `None`.
        """
        return self._to_scalar(numpy.min(self.values)) if self.values else default

    def percentile(self, n, default=None):
        """
        Calculate the Nth Percentile value over the time series.

        :param int n: Integer value of the percentile to calculate.
        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the Nth percentile value or `None`.
        """
        return self._to_scalar(numpy.percentile(self.values, n)) if self.values else default

    def stdev(self, default=None):
        """
        Calculate the standard deviation of the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the standard deviation value or `None`.
        """
        return self._to_scalar(numpy.std(self.values)) if self.values else default

    def sum(self, default=None):
        """
        Calculate the sum of all the values in the time series.

        :param default: Value to return as a default should the calculation not be possible.
        :return: Float representing the sum or `None`.
        """
        return self._to_scalar(numpy.sum(self.values)) if self.values else default