# coding=utf-8
"""
© 2014 LinkedIn Corp. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
from collections import defaultdict
from copy import copy
import math

from luminol import exceptions
from luminol.algorithms.anomaly_detector_algorithms import AnomalyDetectorAlgorithm
from luminol.constants import *
from luminol.modules.time_series import TimeSeries


class BitmapDetector(AnomalyDetectorAlgorithm):

    """
    Bitmap Algorithm.
    This method breaks time series into chunks and uses the frequency of similar chunks
    to determine anomaly scores.
    The ideas are from this paper:
    Assumption-Free Anomaly Detection in Time Series(http://alumni.cs.ucr.edu/~ratana/SSDBM05.pdf).
    """
    def __init__(self, time_series, baseline_time_series=None, precision=None, lag_window_size=None,
                 future_window_size=None, chunk_size=None):
        """
        Initializer
        :param TimeSeries time_series: a TimeSeries object.
        :param TimeSeries baseline_time_series: baseline TimeSeries.
        :param int precision: how many sections to categorize values.
        :param int lag_window_size: lagging window size.
        :param int future_window_size: future window size.
        :param int chunk_size: chunk size.
        :raises exceptions.NotEnoughDataPoints: via _sanity_check, if the series
            cannot fit both windows.
        """
        super(BitmapDetector, self).__init__(self.__class__.__name__, time_series, baseline_time_series)
        # Fall back to library defaults for missing or non-positive settings.
        self.precision = precision if precision and precision > 0 else DEFAULT_BITMAP_PRECISION
        self.chunk_size = chunk_size if chunk_size and chunk_size > 0 else DEFAULT_BITMAP_CHUNK_SIZE
        if lag_window_size:
            self.lag_window_size = lag_window_size
        else:
            # Default window sizes are a fixed percentage of the series length.
            self.lag_window_size = int(self.time_series_length * DEFAULT_BITMAP_LAGGING_WINDOW_SIZE_PCT)
        if future_window_size:
            self.future_window_size = future_window_size
        else:
            self.future_window_size = int(self.time_series_length * DEFAULT_BITMAP_LEADING_WINDOW_SIZE_PCT)
        self._sanity_check()

    def _sanity_check(self):
        """
        Check if there are enough data points.
        Also clamps oversized windows down to the allowed maximum.
        :raises exceptions.NotEnoughDataPoints: if either window is empty, the series
            is shorter than both windows combined, or the windows are below the minimum size.
        """
        windows = self.lag_window_size + self.future_window_size
        if (not self.lag_window_size or not self.future_window_size
                or self.time_series_length < windows or windows < DEFAULT_BITMAP_MINIMAL_POINTS_IN_WINDOWS):
            raise exceptions.NotEnoughDataPoints

        # If window size is too big, too many data points will be assigned a score of 0 in the first lag window
        # and the last future window.
        if self.lag_window_size > DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS:
            self.lag_window_size = DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS
        if self.future_window_size > DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS:
            self.future_window_size = DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS

    def _generate_SAX_single(self, sections, value):
        """
        Generate SAX representation(Symbolic Aggregate approXimation) for a single data point.
        Read more about it here: Assumption-Free Anomaly Detection in Time Series(http://alumni.cs.ucr.edu/~ratana/SSDBM05.pdf).
        :param dict sections: value sections, mapping section number -> lower bound
            (bounds ascend with section number).
        :param float value: value to be categorized.
        :return str: a SAX representation.
        """
        sax = 0
        # Scan sections in ascending section-number order so the early break is correct:
        # we keep the highest section whose lower bound is <= value. sorted() makes this
        # independent of dict iteration order instead of relying on insertion order.
        for section_number in sorted(sections.keys()):
            section_lower_bound = sections[section_number]
            if value >= section_lower_bound:
                sax = section_number
            else:
                break
        return str(sax)

    def _generate_SAX(self):
        """
        Generate SAX representation for all values of the time series.
        Stores the concatenated per-point symbols in self.sax.
        """
        sections = {}
        self.value_min = self.time_series.min()
        self.value_max = self.time_series.max()
        # Break the whole value range into different sections.
        # If all values are equal, section_height is 0 and every point maps to the
        # highest section, which still yields a valid (constant) SAX string.
        section_height = (self.value_max - self.value_min) / self.precision
        for section_number in range(self.precision):
            sections[section_number] = self.value_min + section_number * section_height
        # Generate SAX representation.
        self.sax = ''.join(self._generate_SAX_single(sections, value) for value in self.time_series.values)

    def _construct_SAX_chunk_dict(self, sax):
        """
        Form a chunk frequency dictionary from a SAX representation.
        :param str sax: a SAX representation.
        :return dict: frequency dictionary for chunks in the SAX representation.
        """
        frequency = defaultdict(int)
        chunk_size = self.chunk_size
        length = len(sax)
        # Slide a window of chunk_size over sax; only full-sized chunks are counted.
        for i in range(length - chunk_size + 1):
            frequency[sax[i: i + chunk_size]] += 1
        return frequency

    def _construct_all_SAX_chunk_dict(self):
        """
        Construct the chunk dicts for lagging window and future window at each index.
        e.g: Suppose we have a SAX sequence as '1234567890', both window sizes are 3, and the chunk size is 2.
        The first index that has a lagging window is 3. For index equals 3, the lagging window has sequence '123',
        the chunk to leave lagging window(lw_leave_chunk) is '12', and the chunk to enter lagging window(lw_enter_chunk) is '34'.
        Therefore, given chunk dicts at i, to compute chunk dicts at i+1, simply decrement the count for lw_leave_chunk,
        and increment the count for lw_enter_chunk from chunk dicts at i.
        Same method applies to future window as well.
        """
        lag_dicts = {}
        fut_dicts = {}
        length = self.time_series_length
        lws = self.lag_window_size
        fws = self.future_window_size
        chunk_size = self.chunk_size

        for i in range(length):
            # If i is too small or too big, there will be no chunk dicts.
            if i < lws or i > length - fws:
                lag_dicts[i] = None
                # Keep fut_dicts symmetric with lag_dicts so out-of-range indices can
                # be looked up uniformly in either dict.
                fut_dicts[i] = None

            else:
                # Just entered the valid range (only happens at i == lws):
                # build both window dicts from scratch.
                if lag_dicts[i - 1] is None:
                    lag_dict = self._construct_SAX_chunk_dict(self.sax[i - lws: i])
                    lag_dicts[i] = lag_dict
                    lw_leave_chunk = self.sax[0:chunk_size]
                    lw_enter_chunk = self.sax[i - chunk_size + 1: i + 1]

                    fut_dict = self._construct_SAX_chunk_dict(self.sax[i: i + fws])
                    fut_dicts[i] = fut_dict
                    fw_leave_chunk = self.sax[i: i + chunk_size]
                    fw_enter_chunk = self.sax[i + fws + 1 - chunk_size: i + fws + 1]

                else:
                    # Update dicts according to leave_chunks and enter_chunks
                    # computed at the previous index.
                    lag_dict = copy(lag_dicts[i - 1])
                    lag_dict[lw_leave_chunk] -= 1
                    lag_dict[lw_enter_chunk] += 1
                    lag_dicts[i] = lag_dict

                    fut_dict = copy(fut_dicts[i - 1])
                    fut_dict[fw_leave_chunk] -= 1
                    fut_dict[fw_enter_chunk] += 1
                    fut_dicts[i] = fut_dict

                    # Update leave_chunks and enter_chunks for the next index.
                    lw_leave_chunk = self.sax[i - lws: i - lws + chunk_size]
                    lw_enter_chunk = self.sax[i - chunk_size + 1: i + 1]
                    fw_leave_chunk = self.sax[i: i + chunk_size]
                    fw_enter_chunk = self.sax[i + fws + 1 - chunk_size: i + fws + 1]

        self.lag_dicts = lag_dicts
        self.fut_dicts = fut_dicts

    def _compute_anom_score_between_two_windows(self, i):
        """
        Compute distance difference between two windows' chunk frequencies,
        which is then marked as the anomaly score of the data point on the window boundary in the middle.
        The distance is the sum of squared frequency differences over the union of
        chunks in the two windows (a missing chunk counts as frequency 0).
        :param int i: index of the data point between two windows.
        :return float: the anomaly score.
        """
        lag_window_chunk_dict = self.lag_dicts[i]
        future_window_chunk_dict = self.fut_dicts[i]
        score = 0
        # Chunks present in the lag window.
        for chunk in lag_window_chunk_dict:
            if chunk in future_window_chunk_dict:
                score += math.pow(future_window_chunk_dict[chunk] - lag_window_chunk_dict[chunk], 2)
            else:
                score += math.pow(lag_window_chunk_dict[chunk], 2)
        # Chunks that appear only in the future window.
        for chunk in future_window_chunk_dict:
            if chunk not in lag_window_chunk_dict:
                score += math.pow(future_window_chunk_dict[chunk], 2)
        return score

    def _set_scores(self):
        """
        Compute anomaly scores for the time series by sliding both lagging window and future window.
        Points without a complete lagging or future window are scored 0.
        Stores the denoised scores in self.anom_scores as a TimeSeries.
        """
        anom_scores = {}
        self._generate_SAX()
        self._construct_all_SAX_chunk_dict()
        length = self.time_series_length
        lws = self.lag_window_size
        fws = self.future_window_size

        for i, timestamp in enumerate(self.time_series.timestamps):
            if i < lws or i > length - fws:
                anom_scores[timestamp] = 0
            else:
                anom_scores[timestamp] = self._compute_anom_score_between_two_windows(i)
        self.anom_scores = TimeSeries(self._denoise_scores(anom_scores))