# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
#  This source code is licensed under both the GPLv2 (found in the
#  COPYING file in the root directory) and Apache 2.0 License
#  (found in the LICENSE.Apache file in the root directory).

from abc import abstractmethod
from advisor.db_log_parser import DataSource
from enum import Enum
import math


NO_ENTITY = 'ENTITY_PLACEHOLDER'


class TimeSeriesData(DataSource):
    class Behavior(Enum):
        bursty = 1
        evaluate_expression = 2

    class AggregationOperator(Enum):
        avg = 1
        max = 2
        min = 3
        latest = 4
        oldest = 5

    def __init__(self):
        super().__init__(DataSource.Type.TIME_SERIES)
        self.keys_ts = None  # Dict[entity, Dict[key, Dict[timestamp, value]]]
        self.stats_freq_sec = None

    @abstractmethod
    def get_keys_from_conditions(self, conditions):
        # This method takes in a list of time-series conditions; for each
        # condition it manipulates the 'keys' in the way that is supported by
        # the subclass implementing this method
        pass

    @abstractmethod
    def fetch_timeseries(self, required_statistics):
        # this method takes in a list of statistics, fetches the timeseries
        # for each of them, and populates the 'keys_ts' dictionary
        pass
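
    # For illustration, a subclass's fetch_timeseries() might leave 'keys_ts'
    # shaped like the following (hypothetical entity and statistic names):
    # {'db1': {'rocksdb.stall.micros': {1530896335: 0.0, 1530896395: 205.5}}}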

    def fetch_burst_epochs(
        self, entities, statistic, window_sec, threshold, percent
    ):
        # type: (List[str], str, int, float, bool) -> Dict[str, Dict[int, float]]
        # this method calculates the (percent) rate change in the 'statistic'
        # for each entity (over 'window_sec' seconds) and returns the epochs
        # where this rate change is greater than or equal to the 'threshold'
        # value
        if self.stats_freq_sec == 0:
            # not time series data, cannot check for bursty behavior
            return
        if window_sec < self.stats_freq_sec:
            window_sec = self.stats_freq_sec
        # 'window_samples' is the number of samples to look back at when
        # comparing the current sample with an earlier one to calculate the
        # rate change
        window_samples = math.ceil(window_sec / self.stats_freq_sec)
        burst_epochs = {}
        # if percent = False:
        # curr_val = value at window for which rate change is being calculated
        # prev_val = value at window that is window_samples behind curr_window
        # Then rate_without_percent =
        # ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp)
        # if percent = True:
        # rate_with_percent = (rate_without_percent * 100) / prev_val
        # These calculations are in line with the rate() transform supported
        # by ODS
        for entity in entities:
            if statistic not in self.keys_ts[entity]:
                continue
            timestamps = sorted(list(self.keys_ts[entity][statistic].keys()))
            for ix in range(window_samples, len(timestamps), 1):
                first_ts = timestamps[ix - window_samples]
                last_ts = timestamps[ix]
                first_val = self.keys_ts[entity][statistic][first_ts]
                last_val = self.keys_ts[entity][statistic][last_ts]
                diff = last_val - first_val
                if percent:
                    if first_val == 0:
                        # avoid ZeroDivisionError when the baseline value is 0
                        continue
                    diff = diff * 100 / first_val
                rate = (diff * self.duration_sec) / (last_ts - first_ts)
                # if the rate change is greater than the provided threshold,
                # then the condition is triggered for entity at time 'last_ts'
                if rate >= threshold:
                    if entity not in burst_epochs:
                        burst_epochs[entity] = {}
                    burst_epochs[entity][last_ts] = rate
        return burst_epochs
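
    # A worked example of the burst calculation above, assuming hypothetical
    # numbers: with stats_freq_sec = 10 and window_sec = 30,
    # window_samples = ceil(30 / 10) = 3. For timestamps [0, 10, 20, 30] with
    # values [100, 120, 150, 200] and duration_sec = 60, the only comparison
    # is ix = 3: diff = 200 - 100 = 100, percent diff = 100 * 100 / 100 = 100,
    # rate = (100 * 60) / (30 - 0) = 200. With threshold = 150, this yields
    # burst_epochs = {entity: {30: 200.0}}.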

    def fetch_aggregated_values(self, entity, statistics, aggregation_op):
        # type: (str, List[str], AggregationOperator) -> Dict[str, float]
        # this method performs the aggregation specified by 'aggregation_op'
        # on the timeseries of 'statistics' for 'entity' and returns:
        # Dict[statistic, aggregated_value]
        result = {}
        for stat in statistics:
            if stat not in self.keys_ts[entity]:
                continue
            agg_val = None
            if aggregation_op is self.AggregationOperator.latest:
                latest_timestamp = max(list(self.keys_ts[entity][stat].keys()))
                agg_val = self.keys_ts[entity][stat][latest_timestamp]
            elif aggregation_op is self.AggregationOperator.oldest:
                oldest_timestamp = min(list(self.keys_ts[entity][stat].keys()))
                agg_val = self.keys_ts[entity][stat][oldest_timestamp]
            elif aggregation_op is self.AggregationOperator.max:
                agg_val = max(list(self.keys_ts[entity][stat].values()))
            elif aggregation_op is self.AggregationOperator.min:
                agg_val = min(list(self.keys_ts[entity][stat].values()))
            elif aggregation_op is self.AggregationOperator.avg:
                values = list(self.keys_ts[entity][stat].values())
                agg_val = sum(values) / len(values)
            result[stat] = agg_val
        return result
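
    # For illustration (hypothetical data): if
    # keys_ts['db1']['stat1'] == {1: 10.0, 2: 20.0, 3: 30.0}, then
    # fetch_aggregated_values('db1', ['stat1'], AggregationOperator.avg)
    # returns {'stat1': 20.0}; 'latest' would return {'stat1': 30.0} and
    # 'oldest' {'stat1': 10.0}.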

    def check_and_trigger_conditions(self, conditions):
        # get the list of statistics that need to be fetched
        reqd_keys = self.get_keys_from_conditions(conditions)
        # fetch the required statistics and populate the map 'keys_ts'
        self.fetch_timeseries(reqd_keys)
        # Trigger the appropriate conditions
        for cond in conditions:
            complete_keys = self.get_keys_from_conditions([cond])
            # Get the entities that have all statistics required by 'cond':
            # an entity is checked for a given condition only if we possess all
            # of the condition's 'keys' for that entity
            entities_with_stats = []
            for entity in self.keys_ts:
                stat_missing = False
                for stat in complete_keys:
                    if stat not in self.keys_ts[entity]:
                        stat_missing = True
                        break
                if not stat_missing:
                    entities_with_stats.append(entity)
            if not entities_with_stats:
                continue
            if cond.behavior is self.Behavior.bursty:
                # for a condition that checks for bursty behavior, only one key
                # should be present in the condition's 'keys' field
                result = self.fetch_burst_epochs(
                    entities_with_stats,
                    complete_keys[0],  # there should be only one key
                    cond.window_sec,
                    cond.rate_threshold,
                    True
                )
                # Trigger in this case is:
                # Dict[entity_name, Dict[timestamp, rate_change]]
                # where the inner dictionary contains rate_change values when
                # the rate_change >= threshold provided, with the
                # corresponding timestamps
                if result:
                    cond.set_trigger(result)
            elif cond.behavior is self.Behavior.evaluate_expression:
                self.handle_evaluate_expression(
                    cond,
                    complete_keys,
                    entities_with_stats
                )
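
    # (an end-to-end example of this flow appears in the '__main__' sketch at
    # the bottom of this module)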

    def handle_evaluate_expression(self, condition, statistics, entities):
        trigger = {}
        # check 'condition' for each of these entities
        for entity in entities:
            if hasattr(condition, 'aggregation_op'):
                # in this case, the aggregation operation is performed on each
                # of the condition's 'keys'; the condition's 'expression' is
                # then evaluated with the aggregated values, and if it
                # evaluates to True, the list of aggregated values is added to
                # the condition's trigger: Dict[entity_name, List[stats]]
                result = self.fetch_aggregated_values(
                    entity, statistics, condition.aggregation_op
                )
                keys = [result[key] for key in statistics]
                try:
                    if eval(condition.expression):
                        trigger[entity] = keys
                except Exception as e:
                    print(
                        'WARNING(TimeSeriesData) check_and_trigger: ' + str(e)
                    )
            else:
                # assumption: all stats have the same series of timestamps
                # this is similar to the case above, but 'expression' is
                # evaluated at each timestamp since there is no aggregation;
                # every epoch at which the condition's 'expression' evaluates
                # to True is added to the trigger, so the trigger is:
                # Dict[entity, Dict[timestamp, List[stats]]]
                for epoch in self.keys_ts[entity][statistics[0]].keys():
                    keys = [
                        self.keys_ts[entity][key][epoch]
                        for key in statistics
                    ]
                    try:
                        if eval(condition.expression):
                            if entity not in trigger:
                                trigger[entity] = {}
                            trigger[entity][epoch] = keys
                    except Exception as e:
                        print(
                            'WARNING(TimeSeriesData) check_and_trigger: ' +
                            str(e)
                        )
        if trigger:
            condition.set_trigger(trigger)

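
# A minimal, runnable sketch of how this interface can be exercised. The
# 'SampleTimeSeriesData' and 'SampleCondition' classes below are hypothetical
# stand-ins used only for illustration; the advisor's real clients are the
# concrete DataSource subclasses and the rule parser's Condition objects.
if __name__ == '__main__':
    class SampleCondition:
        # mimics only the attributes that this module reads from a condition
        def __init__(self, keys, behavior, window_sec=None,
                     rate_threshold=None):
            self.keys = keys
            self.behavior = behavior
            self.window_sec = window_sec
            self.rate_threshold = rate_threshold
            self.trigger = None

        def set_trigger(self, trigger):
            self.trigger = trigger

    class SampleTimeSeriesData(TimeSeriesData):
        # serves a fixed in-memory timeseries instead of fetching one
        def __init__(self, keys_ts, stats_freq_sec, duration_sec):
            super().__init__()
            self.keys_ts = keys_ts
            self.stats_freq_sec = stats_freq_sec
            self.duration_sec = duration_sec

        def get_keys_from_conditions(self, conditions):
            return [key for cond in conditions for key in cond.keys]

        def fetch_timeseries(self, required_statistics):
            pass  # the data is already in memory

    source = SampleTimeSeriesData(
        keys_ts={'db1': {'stall_micros': {10: 100.0, 20: 120.0, 30: 480.0}}},
        stats_freq_sec=10,
        duration_sec=30
    )
    cond = SampleCondition(
        keys=['stall_micros'],
        behavior=TimeSeriesData.Behavior.bursty,
        window_sec=10,
        rate_threshold=100
    )
    source.check_and_trigger_conditions([cond])
    # the percent rate change from t=20 to t=30 is
    # ((480 - 120) * 100 / 120) * 30 / 10 = 900.0 >= 100, so this prints:
    # {'db1': {30: 900.0}}
    print(cond.trigger)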