# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
# This source code is licensed under both the GPLv2 (found in the
# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).

from abc import abstractmethod
from enum import Enum
import math

from advisor.db_log_parser import DataSource


NO_ENTITY = 'ENTITY_PLACEHOLDER'


class TimeSeriesData(DataSource):
    """Abstract base class for time-series statistics data sources.

    Subclasses populate 'keys_ts', a nested mapping of shape:
    Dict[entity, Dict[key, Dict[timestamp, value]]]
    and set 'stats_freq_sec', the sampling interval of the statistics in
    seconds (0 means the data is not a timeseries).
    """

    class Behavior(Enum):
        # kinds of checks supported for time-series conditions
        bursty = 1
        evaluate_expression = 2

    class AggregationOperator(Enum):
        # ways of collapsing one statistic's timeseries to a single value
        avg = 1
        max = 2
        min = 3
        latest = 4
        oldest = 5

    def __init__(self):
        super().__init__(DataSource.Type.TIME_SERIES)
        self.keys_ts = None  # Dict[entity, Dict[key, Dict[timestamp, value]]]
        self.stats_freq_sec = None  # sampling interval of the stats, seconds
        # NOTE(review): fetch_burst_epochs also reads 'self.duration_sec',
        # which is never initialized here; presumably subclasses set it
        # before fetch_burst_epochs is called — confirm.

    @abstractmethod
    def get_keys_from_conditions(self, conditions):
        # This method takes in a list of time-series conditions; for each
        # condition it manipulates the 'keys' in the way that is supported by
        # the subclass implementing this method
        pass

    @abstractmethod
    def fetch_timeseries(self, required_statistics):
        # this method takes in a list of statistics and fetches the timeseries
        # for each of them and populates the 'keys_ts' dictionary
        pass

    def fetch_burst_epochs(
        self, entities, statistic, window_sec, threshold, percent
    ):
        # type: (List[str], str, int, float, bool) -> Dict[str, Dict[int, float]]
        # this method calculates the (percent) rate change in the 'statistic'
        # for each entity (over 'window_sec' seconds) and returns the epochs
        # where this rate change is greater than or equal to the 'threshold'
        # value
        if self.stats_freq_sec == 0:
            # not time series data, cannot check for bursty behavior
            return
        if window_sec < self.stats_freq_sec:
            window_sec = self.stats_freq_sec
        # 'window_samples' is the number of windows to go back to
        # compare the current window with, while calculating rate change.
        window_samples = math.ceil(window_sec / self.stats_freq_sec)
        burst_epochs = {}
        # if percent = False:
        # curr_val = value at window for which rate change is being calculated
        # prev_val = value at window that is window_samples behind curr_window
        # Then rate_without_percent =
        # ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp)
        # if percent = True:
        # rate_with_percent = (rate_without_percent * 100) / prev_val
        # These calculations are in line with the rate() transform supported
        # by ODS
        for entity in entities:
            if statistic not in self.keys_ts[entity]:
                continue
            timestamps = sorted(self.keys_ts[entity][statistic].keys())
            for ix in range(window_samples, len(timestamps), 1):
                first_ts = timestamps[ix - window_samples]
                last_ts = timestamps[ix]
                first_val = self.keys_ts[entity][statistic][first_ts]
                last_val = self.keys_ts[entity][statistic][last_ts]
                diff = last_val - first_val
                if percent:
                    # BUGFIX: a zero baseline sample previously raised
                    # ZeroDivisionError; percent change is undefined for a
                    # zero baseline, so skip this window instead of crashing
                    if first_val == 0:
                        continue
                    diff = diff * 100 / first_val
                rate = (diff * self.duration_sec) / (last_ts - first_ts)
                # if the rate change is greater than the provided threshold,
                # then the condition is triggered for entity at time 'last_ts'
                if rate >= threshold:
                    if entity not in burst_epochs:
                        burst_epochs[entity] = {}
                    burst_epochs[entity][last_ts] = rate
        return burst_epochs

    def fetch_aggregated_values(self, entity, statistics, aggregation_op):
        # type: (str, List[str], AggregationOperator) -> Dict[str, float]
        # this method performs the aggregation specified by 'aggregation_op'
        # on the timeseries of 'statistics' for 'entity' and returns:
        # Dict[statistic, aggregated_value]
        result = {}
        for stat in statistics:
            if stat not in self.keys_ts[entity]:
                continue
            # assumes a statistic present in keys_ts has a non-empty
            # timeseries; min()/max() below would raise on an empty one
            agg_val = None
            if aggregation_op is self.AggregationOperator.latest:
                latest_timestamp = max(self.keys_ts[entity][stat].keys())
                agg_val = self.keys_ts[entity][stat][latest_timestamp]
            elif aggregation_op is self.AggregationOperator.oldest:
                oldest_timestamp = min(self.keys_ts[entity][stat].keys())
                agg_val = self.keys_ts[entity][stat][oldest_timestamp]
            elif aggregation_op is self.AggregationOperator.max:
                agg_val = max(self.keys_ts[entity][stat].values())
            elif aggregation_op is self.AggregationOperator.min:
                agg_val = min(self.keys_ts[entity][stat].values())
            elif aggregation_op is self.AggregationOperator.avg:
                values = list(self.keys_ts[entity][stat].values())
                agg_val = sum(values) / len(values)
            result[stat] = agg_val
        return result

    def check_and_trigger_conditions(self, conditions):
        # type: (List) -> None
        # this method fetches the timeseries for all statistics the given
        # conditions need, then evaluates and (possibly) triggers each
        # condition against the entities for which all its stats are present
        # get the list of statistics that need to be fetched
        reqd_keys = self.get_keys_from_conditions(conditions)
        # fetch the required statistics and populate the map 'keys_ts'
        self.fetch_timeseries(reqd_keys)
        # Trigger the appropriate conditions
        for cond in conditions:
            complete_keys = self.get_keys_from_conditions([cond])
            # Get the entities that have all statistics required by 'cond':
            # an entity is checked for a given condition only if we possess all
            # of the condition's 'keys' for that entity
            entities_with_stats = []
            for entity in self.keys_ts:
                stat_missing = False
                for stat in complete_keys:
                    if stat not in self.keys_ts[entity]:
                        stat_missing = True
                        break
                if not stat_missing:
                    entities_with_stats.append(entity)
            if not entities_with_stats:
                continue
            if cond.behavior is self.Behavior.bursty:
                # for a condition that checks for bursty behavior, only one key
                # should be present in the condition's 'keys' field
                result = self.fetch_burst_epochs(
                    entities_with_stats,
                    complete_keys[0],  # there should be only one key
                    cond.window_sec,
                    cond.rate_threshold,
                    True
                )
                # Trigger in this case is:
                # Dict[entity_name, Dict[timestamp, rate_change]]
                # where the inner dictionary contains rate_change values when
                # the rate_change >= threshold provided, with the
                # corresponding timestamps
                if result:
                    cond.set_trigger(result)
            elif cond.behavior is self.Behavior.evaluate_expression:
                self.handle_evaluate_expression(
                    cond,
                    complete_keys,
                    entities_with_stats
                )

    def handle_evaluate_expression(self, condition, statistics, entities):
        # type: (object, List[str], List[str]) -> None
        # evaluates 'condition.expression' for each entity and sets the
        # condition's trigger with the entities (and epochs, when no
        # aggregation is used) for which the expression holds
        trigger = {}
        # check 'condition' for each of these entities
        for entity in entities:
            if hasattr(condition, 'aggregation_op'):
                # in this case, the aggregation operation is performed on each
                # of the condition's 'keys' and then with aggregated values
                # condition's 'expression' is evaluated; if it evaluates to
                # True, then list of the keys values is added to the
                # condition's trigger: Dict[entity_name, List[stats]]
                result = self.fetch_aggregated_values(
                    entity, statistics, condition.aggregation_op
                )
                # 'keys' must keep this exact name: the condition expressions
                # reference it (e.g. "keys[0] > 10") when eval'd below
                keys = [result[key] for key in statistics]
                try:
                    # SECURITY NOTE(review): eval() executes arbitrary code;
                    # acceptable only because expressions come from the
                    # advisor's own rules files, never untrusted input —
                    # confirm that assumption holds for all callers
                    if eval(condition.expression):
                        trigger[entity] = keys
                except Exception as e:
                    print(
                        'WARNING(TimeSeriesData) check_and_trigger: ' + str(e)
                    )
            else:
                # assumption: all stats have same series of timestamps
                # this is similar to the above but 'expression' is evaluated at
                # each timestamp, since there is no aggregation, and all the
                # epochs are added to the trigger when the condition's
                # 'expression' evaluated to true; so trigger is:
                # Dict[entity, Dict[timestamp, List[stats]]]
                for epoch in self.keys_ts[entity][statistics[0]].keys():
                    # same naming constraint as above: eval'd expressions
                    # refer to the local variable 'keys'
                    keys = [
                        self.keys_ts[entity][key][epoch]
                        for key in statistics
                    ]
                    try:
                        if eval(condition.expression):
                            if entity not in trigger:
                                trigger[entity] = {}
                            trigger[entity][epoch] = keys
                    except Exception as e:
                        print(
                            'WARNING(TimeSeriesData) check_and_trigger: ' +
                            str(e)
                        )
        if trigger:
            condition.set_trigger(trigger)