# coding=utf-8
"""
© 2014 LinkedIn Corp. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
import numpy

from luminol import utils
from luminol.algorithms.anomaly_detector_algorithms import AnomalyDetectorAlgorithm
from luminol.constants import *
from luminol.modules.time_series import TimeSeries


class DerivativeDetector(AnomalyDetectorAlgorithm):

    """
    Derivative Algorithm.
    This method is the derivative version of Method 1.
    Instead of data point value, it uses the derivative of the data point.

    Each point is scored by how far the absolute derivative at that point
    deviates from the exponential moving average of the absolute derivatives,
    normalized by the standard deviation of those deviations.
    """

    def __init__(self, time_series, baseline_time_series=None, smoothing_factor=None):
        """
        Initializer
        :param TimeSeries time_series: a TimeSeries object.
        :param TimeSeries baseline_time_series: baseline TimeSeries.
        :param float smoothing_factor: smoothing factor; any falsy value
            (None, 0) falls back to DEFAULT_DERI_SMOOTHING_FACTOR.
        """
        super(DerivativeDetector, self).__init__(self.__class__.__name__, time_series, baseline_time_series)
        self.smoothing_factor = (smoothing_factor or DEFAULT_DERI_SMOOTHING_FACTOR)
        # Cached once; the methods below index into it by position.
        self.time_series_items = self.time_series.items()

    def _compute_derivatives(self):
        """
        Compute the absolute derivatives of the time series.

        Stores the result in ``self.derivatives``, one entry per data point.
        The derivative at a point is the change in value from the previous
        point divided by the change in timestamp; when both points share the
        same timestamp the raw value difference is used instead.
        """
        items = self.time_series_items
        derivatives = []
        for i, (timestamp, value) in enumerate(items):
            if i == 0:
                # No predecessor to difference against; filled in below.
                continue
            pre_timestamp, pre_value = items[i - 1]
            td = timestamp - pre_timestamp
            delta = value - pre_value
            # Guard against division by zero for duplicated timestamps.
            derivative = delta / td if td != 0 else delta
            derivatives.append(abs(derivative))
        if derivatives:
            # First timestamp is assigned the same derivative as the second timestamp.
            derivatives.insert(0, derivatives[0])
        elif len(items) == 1:
            # A lone point has no neighbour to differentiate against; use a
            # zero derivative so the list still lines up with the series and
            # _set_scores does not fail with an IndexError.
            derivatives.append(0.0)
        self.derivatives = derivatives

    def _set_scores(self):
        """
        Compute anomaly scores for the time series.

        The score of a point is the absolute difference between its derivative
        and the exponential moving average of the derivatives, divided by the
        standard deviation of all such differences (when that is non-zero).
        The result is passed through ``self._denoise_scores`` (not defined in
        this file) and stored in ``self.anom_scores`` as a TimeSeries.
        """
        self._compute_derivatives()
        derivatives_ema = utils.compute_ema(self.smoothing_factor, self.derivatives)
        anom_scores = {}
        for i, (timestamp, _) in enumerate(self.time_series_items):
            anom_scores[timestamp] = abs(self.derivatives[i] - derivatives_ema[i])
        # dict.values() is a view on Python 3, which numpy cannot reduce
        # directly; materialize it first. Skip the call entirely for an empty
        # series to avoid a NaN result and a RuntimeWarning.
        stdev = numpy.std(list(anom_scores.values())) if anom_scores else 0
        if stdev:
            for timestamp in anom_scores:
                anom_scores[timestamp] /= stdev
        self.anom_scores = TimeSeries(self._denoise_scores(anom_scores))