# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
# This source code is licensed under both the GPLv2 (found in the
# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).

from abc import ABC, abstractmethod
from advisor.db_log_parser import DataSource, NO_COL_FAMILY
from advisor.db_timeseries_parser import TimeSeriesData
from enum import Enum
from advisor.ini_parser import IniParser
import re


class Section(ABC):
    """Base class for a named section (rule/condition/suggestion)."""

    def __init__(self, name):
        self.name = name

    @abstractmethod
    def set_parameter(self, key, value):
        """Store one key/value pair that was read for this section."""
        pass

    @abstractmethod
    def perform_checks(self):
        """Validate the section once all its parameters have been set."""
        pass


class Rule(Section):
    """A rule: a set of conditions that, when all triggered, yields
    a set of suggestions.

    If 'overlap_time_period' is set, the rule must have exactly two
    TIME_SERIES conditions and it triggers only when both conditions fire
    within that many seconds of each other for the same entity.
    """

    # Number of seconds represented by each accepted time-unit suffix.
    _SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 24 * 60 * 60}

    def __init__(self, name):
        super().__init__(name)
        self.conditions = None
        self.suggestions = None
        self.overlap_time_seconds = None
        self.trigger_entities = None
        self.trigger_column_families = None

    def set_parameter(self, key, value):
        """Store 'conditions', 'suggestions' or 'overlap_time_period'.

        Any other key is ignored.
        """
        # If the Rule is associated with a single suggestion/condition, then
        # value will be a string and not a list. Hence, convert it to a single
        # element list before storing it in self.suggestions or
        # self.conditions.
        if key == 'conditions':
            self.conditions = [value] if isinstance(value, str) else value
        elif key == 'suggestions':
            self.suggestions = [value] if isinstance(value, str) else value
        elif key == 'overlap_time_period':
            self.overlap_time_seconds = value

    def get_suggestions(self):
        return self.suggestions

    def perform_checks(self):
        """Validate the rule and normalise the overlap period to seconds.

        Raises:
            ValueError: if the rule has no condition or no suggestion, if an
                overlap period is given without exactly 2 conditions, or if
                the overlap period is not of the form <digits><s|m|h|d>.
        """
        if not self.conditions:
            raise ValueError(
                self.name + ': rule must have at least one condition'
            )
        if not self.suggestions:
            raise ValueError(
                self.name + ': rule must have at least one suggestion'
            )
        if not self.overlap_time_seconds:
            return
        if len(self.conditions) != 2:
            raise ValueError(
                self.name + ': rule must be associated with 2 conditions'
                ' in order to check for a time dependency between them'
            )
        if isinstance(self.overlap_time_seconds, int):
            # Already converted by an earlier call; keeps this method
            # safe to invoke more than once.
            return
        time_match = None
        if isinstance(self.overlap_time_seconds, str):
            # '[smhd]' (not '[s|m|h|d]') so that a literal '|' is rejected.
            time_match = re.fullmatch(
                r'(\d+)([smhd])', self.overlap_time_seconds, re.IGNORECASE
            )
        if not time_match:
            raise ValueError(
                self.name + r": overlap_time_seconds format: \d+[s|m|h|d]"
            )
        # The match is case-insensitive, so lower-case the unit before the
        # lookup; otherwise e.g. '5M' would be treated as 5 seconds.
        unit = time_match.group(2).lower()
        self.overlap_time_seconds = (
            int(time_match.group(1)) * self._SECONDS_PER_UNIT[unit]
        )

    def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
        """Find the first pair of timestamps that lie within
        'overlap_time_seconds' of each other.

        Args:
            key1_trigger_epochs: timestamps at which condition 1 triggered.
            key2_trigger_epochs: timestamps at which condition 2 triggered.

        Returns:
            A tuple (key2_timestamp, key1_timestamp) for the first overlap
            found, or None when there is no overlap. The inputs are not
            modified.
        """
        # this method takes in 2 timeseries i.e. timestamps at which the
        # rule's 2 TIME_SERIES conditions were triggered and it finds
        # (if present) the first pair of timestamps at which the 2 conditions
        # were triggered within 'overlap_time_seconds' of each other
        window = self.overlap_time_seconds
        key1_lower_bounds = sorted(
            epoch - window for epoch in key1_trigger_epochs
        )
        key2_sorted = sorted(key2_trigger_epochs)
        num_key2 = len(key2_sorted)
        trigger_ix = 0
        for key1_lb in key1_lower_bounds:
            # The bounds check must come first: indexing before checking
            # raises IndexError once trigger_ix runs off the end (or when
            # there are no key2 timestamps at all).
            while (
                trigger_ix < num_key2 and
                key2_sorted[trigger_ix] < key1_lb
            ):
                trigger_ix += 1
            if trigger_ix >= num_key2:
                break
            # [key1_lb, key1_lb + 2 * window] is the window centred on the
            # original key1 timestamp (key1_lb + window).
            if key2_sorted[trigger_ix] <= key1_lb + (2 * window):
                return (key2_sorted[trigger_ix], key1_lb + window)
        return None

    def get_trigger_entities(self):
        return self.trigger_entities

    def get_trigger_column_families(self):
        return self.trigger_column_families

    def is_triggered(self, conditions_dict, column_families):
        """Evaluate the rule against the current condition triggers.

        Args:
            conditions_dict: maps condition name to its Condition object.
            column_families: iterable of column family names in scope.

        Returns:
            True if the rule is triggered. In that case the trigger scope is
            available via get_trigger_entities() and
            get_trigger_column_families().

        Raises:
            ValueError: if an overlap period is set but the 2 conditions are
                not both TIME_SERIES conditions.
        """
        # Start from a clean slate so that the scope computed by an earlier
        # evaluation cannot leak into (and wrongly narrow) this one.
        self.trigger_entities = None
        self.trigger_column_families = None

        if self.overlap_time_seconds:
            condition1 = conditions_dict[self.conditions[0]]
            condition2 = conditions_dict[self.conditions[1]]
            if not (
                condition1.get_data_source() is DataSource.Type.TIME_SERIES and
                condition2.get_data_source() is DataSource.Type.TIME_SERIES
            ):
                raise ValueError(self.name + ': need 2 timeseries conditions')

            map1 = condition1.get_trigger()
            map2 = condition2.get_trigger()
            if not (map1 and map2):
                return False

            self.trigger_entities = {}
            # Only entities for which both conditions fired can overlap.
            for entity in set(map1.keys()).intersection(set(map2.keys())):
                overlap_timestamps_pair = self.get_overlap_timestamps(
                    list(map1[entity].keys()), list(map2[entity].keys())
                )
                if overlap_timestamps_pair:
                    self.trigger_entities[entity] = overlap_timestamps_pair
            is_triggered = bool(self.trigger_entities)
            if is_triggered:
                self.trigger_column_families = set(column_families)
            return is_triggered

        all_conditions_triggered = True
        self.trigger_column_families = set(column_families)
        for cond_name in self.conditions:
            cond = conditions_dict[cond_name]
            cond_trigger = cond.get_trigger()
            if not cond_trigger:
                all_conditions_triggered = False
                break
            source = cond.get_data_source()
            if (
                source is DataSource.Type.LOG or
                source is DataSource.Type.DB_OPTIONS
            ):
                cond_col_fam = set(cond_trigger.keys())
                if NO_COL_FAMILY in cond_col_fam:
                    # The trigger is not tied to a specific column family,
                    # so it does not narrow the column family scope.
                    cond_col_fam = set(column_families)
                self.trigger_column_families = (
                    self.trigger_column_families.intersection(cond_col_fam)
                )
            elif source is DataSource.Type.TIME_SERIES:
                cond_entities = set(cond_trigger.keys())
                if self.trigger_entities is None:
                    self.trigger_entities = cond_entities
                else:
                    self.trigger_entities = (
                        self.trigger_entities.intersection(cond_entities)
                    )
            if not (self.trigger_entities or self.trigger_column_families):
                # The conditions have no entity or column family in common.
                all_conditions_triggered = False
                break
        if not all_conditions_triggered:  # clean up if rule not triggered
            self.trigger_column_families = None
            self.trigger_entities = None
        return all_conditions_triggered

    def __repr__(self):
        # Append conditions
        rule_string = (
            "Rule: " + self.name + " has conditions:: " +
            " AND ".join(self.conditions)
        )
        # Append suggestions
        rule_string += "\nsuggestions:: " + ", ".join(self.suggestions)
        if self.trigger_entities:
            rule_string += (', entities:: ' + str(self.trigger_entities))
        if self.trigger_column_families:
            rule_string += (', col_fam:: ' + str(self.trigger_column_families))
        # Return constructed string
        return rule_string


class Suggestion(Section):
    """A suggestion: either a free-text description or an action
    (set/increase/decrease) to apply to an option."""

    class Action(Enum):
        set = 1
        increase = 2
        decrease = 3

    def __init__(self, name):
        super().__init__(name)
        self.option = None
        self.action = None
        self.suggested_values = None
        self.description = None

    def set_parameter(self, key, value):
        """Store 'option', 'action', 'suggested_values' or 'description'.

        Any other key is ignored.

        Raises:
            ValueError: if 'action' is empty although an option is set.
            KeyError: if 'action' is not the name of an Action member.
        """
        if key == 'option':
            # Note:
            # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
            # case the option belongs to one of the sections in the config
            # file and its name is prefixed by "<section_type>."
            # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
            # option is not expected to have the character '.' in its name
            self.option = value
        elif key == 'action':
            if self.option and not value:
                raise ValueError(self.name + ': provide action for option')
            self.action = self.Action[value]
        elif key == 'suggested_values':
            # A single value arrives as a string; always store a list.
            self.suggested_values = (
                [value] if isinstance(value, str) else value
            )
        elif key == 'description':
            self.description = value

    def perform_checks(self):
        """Validate the suggestion.

        A suggestion without a description must name an option and an
        action, and the 'set' action additionally needs suggested values.

        Raises:
            ValueError: if any of the above is missing.
        """
        if self.description:
            return
        if not self.option:
            raise ValueError(self.name + ': provide option or description')
        if not self.action:
            raise ValueError(self.name + ': provide action for option')
        if self.action is self.Action.set and not self.suggested_values:
            raise ValueError(
                self.name + ': provide suggested value for option'
            )

    def __repr__(self):
        sugg_string = "Suggestion: " + self.name
        if self.description:
            sugg_string += (' description : ' + self.description)
        else:
            sugg_string += (
                ' option : ' + self.option + ' action : ' + self.action.name
            )
            if self.suggested_values:
                sugg_string += (
                    ' suggested_values : ' + str(self.suggested_values)
                )
        return sugg_string


class Condition(Section):
    """Base condition; it is specialised (via the subclasses' create())
    once the data source of the condition is known."""

    def __init__(self, name):
        super().__init__(name)
        self.data_source = None
        self.trigger = None

    def perform_checks(self):
        """Raise ValueError if the condition has no data source."""
        if not self.data_source:
            raise ValueError(self.name + ': condition not tied to data source')

    def set_data_source(self, data_source):
        self.data_source = data_source

    def get_data_source(self):
        return self.data_source

    def reset_trigger(self):
        self.trigger = None

    def set_trigger(self, condition_trigger):
        self.trigger = condition_trigger

    def get_trigger(self):
        return self.trigger

    def is_triggered(self):
        return bool(self.trigger)

    def set_parameter(self, key, value):
        # must be defined by the subclass
        raise NotImplementedError(self.name + ': provide source for condition')


class LogCondition(Condition):
    """Condition whose data source is DataSource.Type.LOG; it is described
    by a 'regex' parameter."""

    @classmethod
    def create(cls, base_condition):
        """Convert 'base_condition' in place into a LogCondition."""
        base_condition.set_data_source(DataSource.Type['LOG'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == 'regex':
            self.regex = value

    def perform_checks(self):
        """Raise ValueError if the data source or the regex is missing."""
        super().perform_checks()
        # 'regex' only exists once set_parameter() has seen it, so use
        # getattr to report a missing key as ValueError, not AttributeError.
        if not getattr(self, 'regex', None):
            raise ValueError(self.name + ': provide regex for log condition')

    def __repr__(self):
        log_cond_str = "LogCondition: " + self.name
        log_cond_str += (" regex: " + self.regex)
        # if self.trigger:
        #     log_cond_str += (" trigger: " + str(self.trigger))
        return log_cond_str


class OptionCondition(Condition):
    """Condition whose data source is DataSource.Type.DB_OPTIONS; it is
    described by a list of 'options' and an 'evaluate' expression."""

    @classmethod
    def create(cls, base_condition):
        """Convert 'base_condition' in place into an OptionCondition."""
        base_condition.set_data_source(DataSource.Type['DB_OPTIONS'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == 'options':
            # A single option arrives as a string; always store a list.
            self.options = [value] if isinstance(value, str) else value
        elif key == 'evaluate':
            self.eval_expr = value

    def perform_checks(self):
        """Raise ValueError if the data source, the options or the
        expression is missing."""
        super().perform_checks()
        # The attributes only exist once set_parameter() has seen them.
        if not getattr(self, 'options', None):
            raise ValueError(self.name + ': options missing in condition')
        if not getattr(self, 'eval_expr', None):
            raise ValueError(self.name + ': expression missing in condition')

    def __repr__(self):
        opt_cond_str = "OptionCondition: " + self.name
        opt_cond_str += (" options: " + str(self.options))
        opt_cond_str += (" expression: " + self.eval_expr)
        if self.trigger:
            opt_cond_str += (" trigger: " + str(self.trigger))
        return opt_cond_str


class TimeSeriesCondition(Condition):
    """Condition whose data source is DataSource.Type.TIME_SERIES."""

    @classmethod
    def create(cls, base_condition):
        """Convert 'base_condition' in place into a TimeSeriesCondition."""
        base_condition.set_data_source(DataSource.Type['TIME_SERIES'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store one of: 'keys', 'behavior', 'rate_threshold', 'window_sec',
        'evaluate', 'aggregation_op'. Any other key is ignored."""
        if key == 'keys':
            # A single key arrives as a string; always store a list.
            self.keys = [value] if isinstance(value, str) else value
        elif key == 'behavior':
            self.behavior = TimeSeriesData.Behavior[value]
        elif key == 'rate_threshold':
            self.rate_threshold = float(value)
        elif key == 'window_sec':
            self.window_sec = int(value)
        elif key == 'evaluate':
            self.expression = value
        elif key == 'aggregation_op':
            self.aggregation_op = TimeSeriesData.AggregationOperator[value]

    def perform_checks(self):
        """Validate the condition and apply the default window length.

        Raises:
            ValueError: if keys or behavior are missing, if a parameter
                required by the behavior is missing, or if the behavior is
                neither 'bursty' nor 'evaluate_expression'.
        """
        # The attributes only exist once set_parameter() has seen them, so
        # getattr is used to treat "never set" like "empty".
        keys = getattr(self, 'keys', None)
        behavior = getattr(self, 'behavior', None)
        if not keys:
            raise ValueError(self.name + ': specify timeseries key')
        if not behavior:
            raise ValueError(self.name + ': specify triggering behavior')
        if behavior is TimeSeriesData.Behavior.bursty:
            if not getattr(self, 'rate_threshold', None):
                raise ValueError(self.name + ': specify rate burst threshold')
            if not getattr(self, 'window_sec', None):
                self.window_sec = 300  # default window length is 5 minutes
            if len(keys) > 1:
                raise ValueError(self.name + ': specify only one key')
        elif behavior is TimeSeriesData.Behavior.evaluate_expression:
            if not getattr(self, 'expression', None):
                raise ValueError(self.name + ': specify evaluation expression')
        else:
            raise ValueError(self.name + ': trigger behavior not supported')

    def __repr__(self):
        ts_cond_str = "TimeSeriesCondition: " + self.name
        ts_cond_str += (" statistics: " + str(self.keys))
        ts_cond_str += (" behavior: " + self.behavior.name)
        if self.behavior is TimeSeriesData.Behavior.bursty:
            ts_cond_str += (" rate_threshold: " + str(self.rate_threshold))
            ts_cond_str += (" window_sec: " + str(self.window_sec))
        if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
            ts_cond_str += (" expression: " + self.expression)
            if hasattr(self, 'aggregation_op'):
                ts_cond_str += (" aggregation_op: " + self.aggregation_op.name)
        if self.trigger:
            ts_cond_str += (" trigger: " + str(self.trigger))
        return ts_cond_str


class RulesSpec:
    """Loads rules, conditions and suggestions from a rules spec file and
    evaluates which rules are triggered by a set of data sources."""

    def __init__(self, rules_path):
        self.file_path = rules_path

    def initialise_fields(self):
        self.rules_dict = {}
        self.conditions_dict = {}
        self.suggestions_dict = {}

    def perform_section_checks(self):
        """Run perform_checks() on every loaded rule, condition and
        suggestion; the first failing check raises."""
        for rule in self.rules_dict.values():
            rule.perform_checks()
        for cond in self.conditions_dict.values():
            cond.perform_checks()
        for sugg in self.suggestions_dict.values():
            sugg.perform_checks()

    def load_rules_from_spec(self):
        """Parse the spec file at self.file_path into the three dicts
        (rules, conditions, suggestions), each keyed by section name."""
        self.initialise_fields()
        with open(self.file_path, 'r') as db_rules:
            curr_section = None
            for line in db_rules:
                line = IniParser.remove_trailing_comment(line)
                if not line:
                    continue
                element = IniParser.get_element(line)
                if element is IniParser.Element.comment:
                    continue
                elif element is not IniParser.Element.key_val:
                    curr_section = element  # it's a new IniParser header
                    section_name = IniParser.get_section_name(line)
                    if element is IniParser.Element.rule:
                        new_rule = Rule(section_name)
                        self.rules_dict[section_name] = new_rule
                    elif element is IniParser.Element.cond:
                        new_cond = Condition(section_name)
                        self.conditions_dict[section_name] = new_cond
                    elif element is IniParser.Element.sugg:
                        new_suggestion = Suggestion(section_name)
                        self.suggestions_dict[section_name] = new_suggestion
                elif element is IniParser.Element.key_val:
                    key, value = IniParser.get_key_value_pair(line)
                    if curr_section is IniParser.Element.rule:
                        new_rule.set_parameter(key, value)
                    elif curr_section is IniParser.Element.cond:
                        if key == 'source':
                            # create() converts the stored object in place,
                            # so conditions_dict needs no update here.
                            if value == 'LOG':
                                new_cond = LogCondition.create(new_cond)
                            elif value == 'OPTIONS':
                                new_cond = OptionCondition.create(new_cond)
                            elif value == 'TIME_SERIES':
                                new_cond = TimeSeriesCondition.create(new_cond)
                        else:
                            new_cond.set_parameter(key, value)
                    elif curr_section is IniParser.Element.sugg:
                        new_suggestion.set_parameter(key, value)

    def get_rules_dict(self):
        return self.rules_dict

    def get_conditions_dict(self):
        return self.conditions_dict

    def get_suggestions_dict(self):
        return self.suggestions_dict

    def get_triggered_rules(self, data_sources, column_families):
        """Trigger the conditions against 'data_sources' and return the
        list of rules that are triggered as a result."""
        self.trigger_conditions(data_sources)
        return [
            rule
            for rule in self.rules_dict.values()
            if rule.is_triggered(self.conditions_dict, column_families)
        ]

    def trigger_conditions(self, data_sources):
        """Let every data source check the conditions of its own type.

        Args:
            data_sources: maps a data source type to an iterable of sources
                of that type.
        """
        for source_type in data_sources:
            cond_subset = [
                cond
                for cond in self.conditions_dict.values()
                if cond.get_data_source() is source_type
            ]
            if not cond_subset:
                continue
            for source in data_sources[source_type]:
                source.check_and_trigger_conditions(cond_subset)

    def print_rules(self, rules):
        """Print each rule with its conditions, suggestions and scope."""
        for rule in rules:
            print('\nRule: ' + rule.name)
            for cond_name in rule.conditions:
                print(repr(self.conditions_dict[cond_name]))
            for sugg_name in rule.suggestions:
                print(repr(self.suggestions_dict[sugg_name]))
            if rule.trigger_entities:
                print('scope: entities:')
                print(rule.trigger_entities)
            if rule.trigger_column_families:
                print('scope: col_fam:')
                print(rule.trigger_column_families)