1# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2#  This source code is licensed under both the GPLv2 (found in the
3#  COPYING file in the root directory) and Apache 2.0 License
4#  (found in the LICENSE.Apache file in the root directory).
5
6from abc import ABC, abstractmethod
7from advisor.db_log_parser import DataSource, NO_COL_FAMILY
8from advisor.db_timeseries_parser import TimeSeriesData
9from enum import Enum
10from advisor.ini_parser import IniParser
11import re
12
13
class Section(ABC):
    """Abstract base class for a named section of the rules specification.

    Concrete sections have to implement both 'set_parameter' and
    'perform_checks' before they can be instantiated.
    """

    def __init__(self, name):
        # name given to this section
        self.name = name

    @abstractmethod
    def set_parameter(self, key, value):
        """Apply a single key/value pair to this section."""

    @abstractmethod
    def perform_checks(self):
        """Validate this section; implemented by each concrete section."""
25
26
class Rule(Section):
    """A rule that links named conditions to named suggestions.

    The rule only stores the names of its conditions and suggestions; the
    condition objects are handed to 'is_triggered' through a dictionary.
    When 'overlap_time_period' is configured, the rule must have exactly 2
    conditions, both tied to the TIME_SERIES data source, and it is triggered
    only if both were triggered within that period of each other.
    """

    # number of seconds represented by each 'overlap_time_period' unit suffix
    _SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 24 * 60 * 60}

    def __init__(self, name):
        super().__init__(name)
        self.conditions = None
        self.suggestions = None
        # raw '<number><unit>' string until perform_checks() converts it to
        # an integer number of seconds
        self.overlap_time_seconds = None
        self.trigger_entities = None
        self.trigger_column_families = None

    def set_parameter(self, key, value):
        """Store a 'conditions', 'suggestions' or 'overlap_time_period' value.

        Any other key is ignored.
        """
        # If the Rule is associated with a single suggestion/condition, then
        # value will be a string and not a list. Hence, convert it to a single
        # element list before storing it in self.suggestions or
        # self.conditions.
        if key == 'conditions':
            self.conditions = [value] if isinstance(value, str) else value
        elif key == 'suggestions':
            self.suggestions = [value] if isinstance(value, str) else value
        elif key == 'overlap_time_period':
            self.overlap_time_seconds = value

    def get_suggestions(self):
        """Return the names of the suggestions attached to this rule."""
        return self.suggestions

    def perform_checks(self):
        """Validate the rule and convert the overlap period to seconds.

        Raises:
            ValueError: if the rule has no condition or no suggestion, if an
                overlap period is given without exactly 2 conditions, or if
                the overlap period is not of the form '<digits><s|m|h|d>'.
        """
        if not self.conditions:
            raise ValueError(
                self.name + ': rule must have at least one condition'
            )
        if not self.suggestions:
            raise ValueError(
                self.name + ': rule must have at least one suggestion'
            )
        if not self.overlap_time_seconds:
            return
        if len(self.conditions) != 2:
            raise ValueError(
                self.name + ": rule must be associated with 2 conditions "
                "in order to check for a time dependency between them"
            )
        if isinstance(self.overlap_time_seconds, int):
            # already converted to seconds by an earlier call
            return
        # The character class must not contain '|', otherwise a value such
        # as '10|' would be accepted as a valid period.
        if not re.fullmatch(
            r'\d+[smhd]', self.overlap_time_seconds, re.IGNORECASE
        ):
            raise ValueError(
                self.name + ": overlap_time_seconds format: \\d+[s|m|h|d]"
            )
        # The match is case-insensitive, so normalise the unit before the
        # lookup; otherwise '10M' would be treated as 10 seconds.
        unit = self.overlap_time_seconds[-1].lower()
        self.overlap_time_seconds = (
            int(self.overlap_time_seconds[:-1]) * self._SECONDS_PER_UNIT[unit]
        )

    def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
        """Find the first pair of triggers that lie within the overlap period.

        This method takes in 2 timeseries i.e. timestamps at which the
        rule's 2 TIME_SERIES conditions were triggered and it finds
        (if present) the first pair of timestamps at which the 2 conditions
        were triggered within 'overlap_time_seconds' of each other.

        Returns:
            A tuple (key2 timestamp, key1 timestamp), or None if there is no
            such pair (including when either input is empty). The inputs
            are not modified.
        """
        window = self.overlap_time_seconds
        key1_epochs = sorted(key1_trigger_epochs)
        key2_epochs = sorted(key2_trigger_epochs)
        num_key2 = len(key2_epochs)
        trigger_ix = 0
        for key1_epoch in key1_epochs:
            lower_bound = key1_epoch - window
            # The bound has to be checked before indexing; otherwise running
            # off the end of key2_epochs raises an IndexError.
            while (
                trigger_ix < num_key2 and
                key2_epochs[trigger_ix] < lower_bound
            ):
                trigger_ix += 1
            if trigger_ix >= num_key2:
                break
            if key2_epochs[trigger_ix] <= key1_epoch + window:
                return (key2_epochs[trigger_ix], key1_epoch)
        return None

    def get_trigger_entities(self):
        """Return the entities recorded by the last 'is_triggered' call."""
        return self.trigger_entities

    def get_trigger_column_families(self):
        """Return the column families recorded by the last 'is_triggered'."""
        return self.trigger_column_families

    def is_triggered(self, conditions_dict, column_families):
        """Evaluate the rule against the current state of its conditions.

        Args:
            conditions_dict: maps a condition name to its condition object.
            column_families: iterable of column family names that the rule
                is scoped to when a condition does not narrow them down.

        Returns:
            True if the rule is triggered, False otherwise. On success,
            'trigger_entities' and 'trigger_column_families' describe the
            scope of the trigger.

        Raises:
            ValueError: if an overlap period is set and the 2 conditions are
                not both TIME_SERIES conditions.
        """
        # Start from a clean slate so that the outcome of a previous
        # evaluation cannot leak into this one.
        self.trigger_entities = None
        self.trigger_column_families = None
        if self.overlap_time_seconds:
            return self._check_overlapping_time_series(
                conditions_dict, column_families
            )
        return self._check_all_conditions(conditions_dict, column_families)

    def _check_overlapping_time_series(self, conditions_dict, column_families):
        # Triggered if, for some entity, both conditions were triggered
        # within 'overlap_time_seconds' of each other.
        condition1 = conditions_dict[self.conditions[0]]
        condition2 = conditions_dict[self.conditions[1]]
        if not (
            condition1.get_data_source() is DataSource.Type.TIME_SERIES and
            condition2.get_data_source() is DataSource.Type.TIME_SERIES
        ):
            raise ValueError(self.name + ': need 2 timeseries conditions')

        map1 = condition1.get_trigger()
        map2 = condition2.get_trigger()
        if not (map1 and map2):
            return False

        overlaps = {}
        # only entities for which both conditions were triggered can overlap
        for entity in set(map1.keys()).intersection(set(map2.keys())):
            overlap_timestamps_pair = self.get_overlap_timestamps(
                list(map1[entity].keys()), list(map2[entity].keys())
            )
            if overlap_timestamps_pair:
                overlaps[entity] = overlap_timestamps_pair
        if not overlaps:
            return False
        self.trigger_entities = overlaps
        self.trigger_column_families = set(column_families)
        return True

    def _check_all_conditions(self, conditions_dict, column_families):
        # Triggered if every condition is triggered and the scopes of the
        # conditions (column families / entities) still intersect.
        all_conditions_triggered = True
        self.trigger_column_families = set(column_families)
        for cond_name in self.conditions:
            cond = conditions_dict[cond_name]
            trigger = cond.get_trigger()
            if not trigger:
                all_conditions_triggered = False
                break
            source = cond.get_data_source()
            if (
                source is DataSource.Type.LOG or
                source is DataSource.Type.DB_OPTIONS
            ):
                cond_col_fam = set(trigger.keys())
                # a trigger keyed by NO_COL_FAMILY does not narrow the scope
                if NO_COL_FAMILY in cond_col_fam:
                    cond_col_fam = set(column_families)
                self.trigger_column_families = (
                    self.trigger_column_families.intersection(cond_col_fam)
                )
            elif source is DataSource.Type.TIME_SERIES:
                cond_entities = set(trigger.keys())
                if self.trigger_entities is None:
                    self.trigger_entities = cond_entities
                else:
                    self.trigger_entities = (
                        self.trigger_entities.intersection(cond_entities)
                    )
            if not (self.trigger_entities or self.trigger_column_families):
                all_conditions_triggered = False
                break
        if not all_conditions_triggered:  # clean up if rule not triggered
            self.trigger_column_families = None
            self.trigger_entities = None
        return all_conditions_triggered

    def __repr__(self):
        rule_string = (
            "Rule: " + self.name + " has conditions:: " +
            " AND ".join(self.conditions)
        )
        rule_string += "\nsuggestions:: " + ", ".join(self.suggestions)
        if self.trigger_entities:
            rule_string += (', entities:: ' + str(self.trigger_entities))
        if self.trigger_column_families:
            rule_string += (', col_fam:: ' + str(self.trigger_column_families))
        return rule_string
219
220
class Suggestion(Section):
    """A suggestion: either a free-form description or an option change."""

    class Action(Enum):
        # the member names are looked up by name from the 'action' value
        set = 1
        increase = 2
        decrease = 3

    def __init__(self, name):
        super().__init__(name)
        self.option = None
        self.action = None
        self.suggested_values = None
        self.description = None

    def set_parameter(self, key, value):
        """Store an 'option', 'action', 'suggested_values' or 'description'.

        Any other key is ignored. An empty action for an already configured
        option raises a ValueError.
        """
        if key == 'option':
            # Note:
            # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
            # case the option belongs to one of the sections in the config
            # file and its name is prefixed by "<section_type>."
            # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
            # option is not expected to have the character '.' in its name
            self.option = value
            return
        if key == 'action':
            if self.option and not value:
                raise ValueError(self.name + ': provide action for option')
            self.action = self.Action[value]
            return
        if key == 'suggested_values':
            # a single value arrives as a string; always store a list
            self.suggested_values = (
                [value] if isinstance(value, str) else value
            )
            return
        if key == 'description':
            self.description = value

    def perform_checks(self):
        """Raise a ValueError if the suggestion is incompletely specified."""
        if self.description:
            # a description is sufficient on its own
            return
        if not self.option:
            raise ValueError(self.name + ': provide option or description')
        if not self.action:
            raise ValueError(self.name + ': provide action for option')
        needs_value = self.action is self.Action.set
        if needs_value and not self.suggested_values:
            raise ValueError(
                self.name + ': provide suggested value for option'
            )

    def __repr__(self):
        prefix = "Suggestion: " + self.name
        if self.description:
            return prefix + (' description : ' + self.description)
        text = prefix + (
            ' option : ' + self.option + ' action : ' + self.action.name
        )
        if self.suggested_values:
            text += ' suggested_values : ' + str(self.suggested_values)
        return text
279
280
class Condition(Section):
    """Base condition: holds a data source and the current trigger."""

    def __init__(self, name):
        super().__init__(name)
        self.trigger = None
        self.data_source = None

    def set_parameter(self, key, value):
        # must be defined by the subclass
        raise NotImplementedError(self.name + ': provide source for condition')

    def perform_checks(self):
        """Raise a ValueError if no data source has been set."""
        if self.data_source:
            return
        raise ValueError(self.name + ': condition not tied to data source')

    def get_data_source(self):
        return self.data_source

    def set_data_source(self, data_source):
        self.data_source = data_source

    def get_trigger(self):
        return self.trigger

    def set_trigger(self, condition_trigger):
        self.trigger = condition_trigger

    def reset_trigger(self):
        self.trigger = None

    def is_triggered(self):
        """Return True if a truthy trigger is currently stored."""
        return bool(self.trigger)
314
315
class LogCondition(Condition):
    """Condition tied to the LOG data source and described by a regex."""

    @classmethod
    def create(cls, base_condition):
        """Turn 'base_condition' into a LogCondition in place and return it.

        The object's data source is set to LOG and its class is reassigned,
        so no __init__ runs for the new class.
        """
        base_condition.set_data_source(DataSource.Type['LOG'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store the 'regex' value; any other key is ignored."""
        if key == 'regex':
            self.regex = value

    def perform_checks(self):
        """Raise a ValueError if the data source or the regex is missing."""
        super().perform_checks()
        # 'regex' only exists once set_parameter() has stored it, so it is
        # read defensively: a missing regex must raise the ValueError below
        # rather than an AttributeError.
        if not getattr(self, 'regex', None):
            raise ValueError(self.name + ': provide regex for log condition')

    def __repr__(self):
        # Note: the trigger is not included in this representation.
        log_cond_str = "LogCondition: " + self.name
        log_cond_str += (" regex: " + self.regex)
        return log_cond_str
338
339
class OptionCondition(Condition):
    """Condition tied to the DB_OPTIONS data source.

    It is described by a list of options and an expression to evaluate.
    """

    @classmethod
    def create(cls, base_condition):
        """Turn 'base_condition' into an OptionCondition in place.

        The object's data source is set to DB_OPTIONS and its class is
        reassigned, so no __init__ runs for the new class.
        """
        base_condition.set_data_source(DataSource.Type['DB_OPTIONS'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store the 'options' or 'evaluate' value; other keys are ignored."""
        if key == 'options':
            # a single option arrives as a string; always store a list
            self.options = [value] if isinstance(value, str) else value
        elif key == 'evaluate':
            self.eval_expr = value

    def perform_checks(self):
        """Raise a ValueError if data source, options or expression is missing.
        """
        super().perform_checks()
        # The attributes only exist once set_parameter() has stored them, so
        # they are read defensively: a missing value must raise a ValueError
        # rather than an AttributeError.
        if not getattr(self, 'options', None):
            raise ValueError(self.name + ': options missing in condition')
        if not getattr(self, 'eval_expr', None):
            raise ValueError(self.name + ': expression missing in condition')

    def __repr__(self):
        opt_cond_str = "OptionCondition: " + self.name
        opt_cond_str += (" options: " + str(self.options))
        opt_cond_str += (" expression: " + self.eval_expr)
        if self.trigger:
            opt_cond_str += (" trigger: " + str(self.trigger))
        return opt_cond_str
370
371
class TimeSeriesCondition(Condition):
    """Condition tied to the TIME_SERIES data source.

    Depending on its behavior it is described either by a rate threshold
    over a time window (bursty) or by an expression to evaluate.
    """

    @classmethod
    def create(cls, base_condition):
        """Turn 'base_condition' into a TimeSeriesCondition in place.

        The object's data source is set to TIME_SERIES and its class is
        reassigned, so no __init__ runs for the new class.
        """
        base_condition.set_data_source(DataSource.Type['TIME_SERIES'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store one of the supported parameters; other keys are ignored.

        'behavior' and 'aggregation_op' are looked up by name in the
        corresponding TimeSeriesData enums, 'rate_threshold' is converted to
        float and 'window_sec' to int.
        """
        if key == 'keys':
            # a single key arrives as a string; always store a list
            self.keys = [value] if isinstance(value, str) else value
        elif key == 'behavior':
            self.behavior = TimeSeriesData.Behavior[value]
        elif key == 'rate_threshold':
            self.rate_threshold = float(value)
        elif key == 'window_sec':
            self.window_sec = int(value)
        elif key == 'evaluate':
            self.expression = value
        elif key == 'aggregation_op':
            self.aggregation_op = TimeSeriesData.AggregationOperator[value]

    def perform_checks(self):
        """Validate the parameters required by the configured behavior.

        For the bursty behavior a missing 'window_sec' defaults to 300.

        Raises:
            ValueError: if a required parameter is missing, if a bursty
                condition has more than one key, or if the behavior is not
                supported.
        """
        # The attributes only exist once set_parameter() has stored them, so
        # they are read defensively: a missing value must raise a ValueError
        # (or fall back to the default) rather than an AttributeError.
        if not getattr(self, 'keys', None):
            raise ValueError(self.name + ': specify timeseries key')
        behavior = getattr(self, 'behavior', None)
        if not behavior:
            raise ValueError(self.name + ': specify triggering behavior')
        if behavior is TimeSeriesData.Behavior.bursty:
            if not getattr(self, 'rate_threshold', None):
                raise ValueError(self.name + ': specify rate burst threshold')
            if not getattr(self, 'window_sec', None):
                self.window_sec = 300  # default window length is 5 minutes
            if len(self.keys) > 1:
                raise ValueError(self.name + ': specify only one key')
        elif behavior is TimeSeriesData.Behavior.evaluate_expression:
            if not getattr(self, 'expression', None):
                raise ValueError(self.name + ': specify evaluation expression')
        else:
            raise ValueError(self.name + ': trigger behavior not supported')

    def __repr__(self):
        ts_cond_str = "TimeSeriesCondition: " + self.name
        ts_cond_str += (" statistics: " + str(self.keys))
        ts_cond_str += (" behavior: " + self.behavior.name)
        if self.behavior is TimeSeriesData.Behavior.bursty:
            ts_cond_str += (" rate_threshold: " + str(self.rate_threshold))
            ts_cond_str += (" window_sec: " + str(self.window_sec))
        if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
            ts_cond_str += (" expression: " + self.expression)
            # 'aggregation_op' is optional and only set when specified
            if hasattr(self, 'aggregation_op'):
                ts_cond_str += (" aggregation_op: " + self.aggregation_op.name)
        if self.trigger:
            ts_cond_str += (" trigger: " + str(self.trigger))
        return ts_cond_str
428
429
class RulesSpec:
    """Rules, conditions and suggestions parsed from a rules spec file."""

    def __init__(self, rules_path):
        self.file_path = rules_path
        # Create the (empty) dictionaries right away so that the getters and
        # the checks also work before load_rules_from_spec() is called.
        self.initialise_fields()

    def initialise_fields(self):
        """Reset the rule, condition and suggestion dictionaries."""
        self.rules_dict = {}
        self.conditions_dict = {}
        self.suggestions_dict = {}

    def perform_section_checks(self):
        """Run perform_checks() on every rule, condition and suggestion."""
        for rule in self.rules_dict.values():
            rule.perform_checks()
        for cond in self.conditions_dict.values():
            cond.perform_checks()
        for sugg in self.suggestions_dict.values():
            sugg.perform_checks()

    def load_rules_from_spec(self):
        """Parse the file at 'file_path' into the three dictionaries.

        Every section header creates a new Rule, Condition or Suggestion
        keyed by its section name; the key-value lines that follow are
        applied to that object. The 'source' key of a condition converts it
        into the matching Condition subclass. Key-value lines that do not
        belong to a rule, condition or suggestion section are ignored.
        """
        self.initialise_fields()
        condition_types = {
            'LOG': LogCondition,
            'OPTIONS': OptionCondition,
            'TIME_SERIES': TimeSeriesCondition,
        }
        with open(self.file_path, 'r') as db_rules:
            curr_section = None
            # object created for the most recent rule/cond/sugg header
            curr_obj = None
            for line in db_rules:
                line = IniParser.remove_trailing_comment(line)
                if not line:
                    continue
                element = IniParser.get_element(line)
                if element is IniParser.Element.comment:
                    continue
                if element is not IniParser.Element.key_val:
                    curr_section = element  # it's a new IniParser header
                    section_name = IniParser.get_section_name(line)
                    if element is IniParser.Element.rule:
                        curr_obj = Rule(section_name)
                        self.rules_dict[section_name] = curr_obj
                    elif element is IniParser.Element.cond:
                        curr_obj = Condition(section_name)
                        self.conditions_dict[section_name] = curr_obj
                    elif element is IniParser.Element.sugg:
                        curr_obj = Suggestion(section_name)
                        self.suggestions_dict[section_name] = curr_obj
                    else:
                        curr_obj = None
                    continue
                key, value = IniParser.get_key_value_pair(line)
                if curr_section is IniParser.Element.cond:
                    if key != 'source':
                        curr_obj.set_parameter(key, value)
                    elif value in condition_types:
                        # create() converts the stored object in place, so
                        # the dictionary entry needs no update
                        curr_obj = condition_types[value].create(curr_obj)
                elif (
                    curr_section is IniParser.Element.rule or
                    curr_section is IniParser.Element.sugg
                ):
                    curr_obj.set_parameter(key, value)

    def get_rules_dict(self):
        return self.rules_dict

    def get_conditions_dict(self):
        return self.conditions_dict

    def get_suggestions_dict(self):
        return self.suggestions_dict

    def get_triggered_rules(self, data_sources, column_families):
        """Trigger the conditions and return the list of triggered rules.

        Args:
            data_sources: maps a data source type to a list of sources of
                that type.
            column_families: passed through to each rule's is_triggered().
        """
        self.trigger_conditions(data_sources)
        return [
            rule
            for rule in self.rules_dict.values()
            if rule.is_triggered(self.conditions_dict, column_families)
        ]

    def trigger_conditions(self, data_sources):
        """Let every data source check the conditions of its own type."""
        for source_type in data_sources:
            cond_subset = [
                cond
                for cond in self.conditions_dict.values()
                if cond.get_data_source() is source_type
            ]
            if not cond_subset:
                continue
            for source in data_sources[source_type]:
                source.check_and_trigger_conditions(cond_subset)

    def print_rules(self, rules):
        """Print each rule with its conditions, suggestions and scope."""
        for rule in rules:
            print('\nRule: ' + rule.name)
            for cond_name in rule.conditions:
                print(repr(self.conditions_dict[cond_name]))
            for sugg_name in rule.suggestions:
                print(repr(self.suggestions_dict[sugg_name]))
            if rule.trigger_entities:
                print('scope: entities:')
                print(rule.trigger_entities)
            if rule.trigger_column_families:
                print('scope: col_fam:')
                print(rule.trigger_column_families)
529