1# coding=utf-8
2"""
3© 2014 LinkedIn Corp. All rights reserved.
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at  http://www.apache.org/licenses/LICENSE-2.0
7
8Unless required by applicable law or agreed to in writing, software
9distributed under the License is distributed on an "AS IS" BASIS,
10WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11"""
12from collections import defaultdict
13from copy import copy
14import math
15
16from luminol import exceptions
17from luminol.algorithms.anomaly_detector_algorithms import AnomalyDetectorAlgorithm
18from luminol.constants import *
19from luminol.modules.time_series import TimeSeries
20
21
22class BitmapDetector(AnomalyDetectorAlgorithm):
23
24  """
25  Bitmap Algorithm.
26  This method breaks time series into chunks and uses the frequency of similar chunks
27  to determine anomaly scores.
28  The ideas are from this paper:
29  Assumption-Free Anomaly Detection in Time Series(http://alumni.cs.ucr.edu/~ratana/SSDBM05.pdf).
30  """
31  def __init__(self, time_series, baseline_time_series=None, precision=None, lag_window_size=None,
32    future_window_size=None, chunk_size=None):
33    """
34    Initializer
35    :param TimeSeries time_series: a TimeSeries object.
36    :param TimeSeries baseline_time_series: baseline TimeSeries.
37    :param int precision: how many sections to categorize values.
38    :param int lag_window_size: lagging window size.
39    :param int future_window_size: future window size.
40    :param int chunk_size: chunk size.
41    """
42    super(BitmapDetector, self).__init__(self.__class__.__name__, time_series, baseline_time_series)
43    self.precision = precision if precision and precision > 0 else DEFAULT_BITMAP_PRECISION
44    self.chunk_size = chunk_size if chunk_size and chunk_size > 0 else DEFAULT_BITMAP_CHUNK_SIZE
45    if lag_window_size:
46      self.lag_window_size = lag_window_size
47    else:
48      self.lag_window_size = int(self.time_series_length * DEFAULT_BITMAP_LAGGING_WINDOW_SIZE_PCT)
49    if future_window_size:
50      self.future_window_size = future_window_size
51    else:
52      self.future_window_size = int(self.time_series_length * DEFAULT_BITMAP_LEADING_WINDOW_SIZE_PCT)
53    self._sanity_check()
54
55  def _sanity_check(self):
56    """
57    Check if there are enough data points.
58    """
59    windows = self.lag_window_size + self.future_window_size
60    if (not self.lag_window_size or not self.future_window_size
61      or self.time_series_length < windows or windows < DEFAULT_BITMAP_MINIMAL_POINTS_IN_WINDOWS):
62        raise exceptions.NotEnoughDataPoints
63
64    # If window size is too big, too many data points will be assigned a score of 0 in the first lag window
65    # and the last future window.
66    if self.lag_window_size > DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS:
67      self.lag_window_size = DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS
68    if self.future_window_size > DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS:
69      self.future_window_size = DEFAULT_BITMAP_MAXIMAL_POINTS_IN_WINDOWS
70
71  def _generate_SAX_single(self, sections, value):
72    """
73    Generate SAX representation(Symbolic Aggregate approXimation) for a single data point.
74    Read more about it here: Assumption-Free Anomaly Detection in Time Series(http://alumni.cs.ucr.edu/~ratana/SSDBM05.pdf).
75    :param dict sections: value sections.
76    :param float value: value to be categorized.
77    :return str: a SAX representation.
78    """
79    sax = 0
80    for section_number in sections.keys():
81      section_lower_bound = sections[section_number]
82      if value >= section_lower_bound:
83        sax = section_number
84      else:
85        break
86    return str(sax)
87
88  def _generate_SAX(self):
89    """
90    Generate SAX representation for all values of the time series.
91    """
92    sections = {}
93    self.value_min = self.time_series.min()
94    self.value_max = self.time_series.max()
95    # Break the whole value range into different sections.
96    section_height = (self.value_max - self.value_min) / self.precision
97    for section_number in range(self.precision):
98      sections[section_number] = self.value_min + section_number * section_height
99    # Generate SAX representation.
100    self.sax = ''.join(self._generate_SAX_single(sections, value) for value in self.time_series.values)
101
102  def _construct_SAX_chunk_dict(self, sax):
103    """
104    Form a chunk frequency dictionary from a SAX representation.
105    :param str sax: a SAX representation.
106    :return dict: frequency dictionary for chunks in the SAX representation.
107    """
108    frequency = defaultdict(int)
109    chunk_size = self.chunk_size
110    length = len(sax)
111    for i in range(length):
112      if i + chunk_size <= length:
113        chunk = sax[i: i + chunk_size]
114        frequency[chunk] += 1
115    return frequency
116
  def _construct_all_SAX_chunk_dict(self):
    """
    Construct the chunk dicts for lagging window and future window at each index.
     e.g: Suppose we have a SAX sequence as '1234567890', both window sizes are 3, and the chunk size is 2.
     The first index that has a lagging window is 3. For index equals 3, the lagging window has sequence '123',
     the chunk to leave lagging window(lw_leave_chunk) is '12', and the chunk to enter lagging window(lw_enter_chunk) is '34'.
     Therefore, given chunk dicts at i, to compute chunk dicts at i+1, simply decrement the count for lw_leave_chunk,
     and increment the count for lw_enter_chunk from chunk dicts at i. Same method applies to future window as well.
    """
    lag_dicts = {}
    fut_dicts = {}
    length = self.time_series_length
    lws = self.lag_window_size
    fws = self.future_window_size
    chunk_size = self.chunk_size

    for i in range(length):
      # If i is too small or too big, there will be no chunk dicts.
      # Only indices in [lws, length - fws] have both a full lagging window
      # before them and a full future window after them.
      if i < lws or i > length - fws:
        lag_dicts[i] = None

      else:
        # Just enter valid range.
        # The valid range is contiguous, so this branch runs exactly once
        # (at i == lws); build both window dicts from scratch with full scans.
        if lag_dicts[i - 1] is None:
          lag_dict = self._construct_SAX_chunk_dict(self.sax[i - lws: i])
          lag_dicts[i] = lag_dict
          # Chunks that leave/enter the lagging window when it slides to i+1.
          lw_leave_chunk = self.sax[0:chunk_size]
          lw_enter_chunk = self.sax[i - chunk_size + 1: i + 1]

          fut_dict = self._construct_SAX_chunk_dict(self.sax[i: i + fws])
          fut_dicts[i] = fut_dict
          # Chunks that leave/enter the future window when it slides to i+1.
          fw_leave_chunk = self.sax[i: i + chunk_size]
          fw_enter_chunk = self.sax[i + fws + 1 - chunk_size: i + fws + 1]

        else:
          # Update dicts according to leave_chunks and enter_chunks.
          # Copy the previous dict first so earlier indices keep their own
          # counts; decremented entries may remain with a count of 0.
          lag_dict = copy(lag_dicts[i - 1])
          lag_dict[lw_leave_chunk] -= 1
          lag_dict[lw_enter_chunk] += 1
          lag_dicts[i] = lag_dict

          fut_dict = copy(fut_dicts[i - 1])
          fut_dict[fw_leave_chunk] -= 1
          fut_dict[fw_enter_chunk] += 1
          fut_dicts[i] = fut_dict

          # Update leave_chunks and enter_chunks for the next slide. At the
          # last valid index the computed enter chunks run past the window
          # but are never applied, since the loop leaves the valid range.
          lw_leave_chunk = self.sax[i - lws: i - lws + chunk_size]
          lw_enter_chunk = self.sax[i - chunk_size + 1: i + 1]
          fw_leave_chunk = self.sax[i: i + chunk_size]
          fw_enter_chunk = self.sax[i + fws + 1 - chunk_size: i + fws + 1]

    # Cache the per-index dicts for _compute_anom_score_between_two_windows.
    self.lag_dicts = lag_dicts
    self.fut_dicts = fut_dicts
171
172  def _compute_anom_score_between_two_windows(self, i):
173    """
174    Compute distance difference between two windows' chunk frequencies,
175    which is then marked as the anomaly score of the data point on the window boundary in the middle.
176    :param int i: index of the data point between two windows.
177    :return float: the anomaly score.
178    """
179    lag_window_chunk_dict = self.lag_dicts[i]
180    future_window_chunk_dict = self.fut_dicts[i]
181    score = 0
182    for chunk in lag_window_chunk_dict:
183      if chunk in future_window_chunk_dict:
184        score += math.pow(future_window_chunk_dict[chunk] - lag_window_chunk_dict[chunk], 2)
185      else:
186        score += math.pow(lag_window_chunk_dict[chunk], 2)
187    for chunk in future_window_chunk_dict:
188      if chunk not in lag_window_chunk_dict:
189        score += math.pow(future_window_chunk_dict[chunk], 2)
190    return score
191
192  def _set_scores(self):
193    """
194    Compute anomaly scores for the time series by sliding both lagging window and future window.
195    """
196    anom_scores = {}
197    self._generate_SAX()
198    self._construct_all_SAX_chunk_dict()
199    length = self.time_series_length
200    lws = self.lag_window_size
201    fws = self.future_window_size
202
203    for i, timestamp in enumerate(self.time_series.timestamps):
204      if i < lws or i > length - fws:
205        anom_scores[timestamp] = 0
206      else:
207        anom_scores[timestamp] = self._compute_anom_score_between_two_windows(i)
208    self.anom_scores = TimeSeries(self._denoise_scores(anom_scores))
209