1# -------------------------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for license information. 4# -------------------------------------------------------------------------------------------- 5 6from .MetricAlertConditionListener import MetricAlertConditionListener 7 8 9op_conversion = { 10 '=': 'Equals', 11 '!=': 'NotEquals', 12 '>': 'GreaterThan', 13 '>=': 'GreaterThanOrEqual', 14 '<': 'LessThan', 15 '<=': 'LessThanOrEqual', 16 '><': 'GreaterOrLessThan' 17} 18 19agg_conversion = { 20 'avg': 'Average', 21 'min': 'Minimum', 22 'max': 'Maximum', 23 'total': 'Total', 24 'count': 'Count' 25} 26 27sens_conversion = { 28 'low': 'Low', 29 'medium': 'Medium', 30 'high': 'High', 31} 32 33dim_op_conversion = { 34 'includes': 'Include', 35 'excludes': 'Exclude' 36} 37 38 39# This class defines a complete listener for a parse tree produced by MetricAlertConditionParser. 40class MetricAlertConditionValidator(MetricAlertConditionListener): 41 42 def __init__(self): 43 super(MetricAlertConditionValidator, self).__init__() 44 self.parameters = {} 45 self._dimension_index = 0 46 47 # Exit a parse tree produced by MetricAlertConditionParser#aggregation. 48 def exitAggregation(self, ctx): 49 aggregation = agg_conversion[ctx.getText().strip()] 50 self.parameters['time_aggregation'] = aggregation 51 52 # Exit a parse tree produced by MetricAlertConditionParser#namespace. 53 def exitNamespace(self, ctx): 54 self.parameters['metric_namespace'] = ctx.getText().strip() 55 56 # Exit a parse tree produced by MetricAlertConditionParser#metric. 57 def exitMetric(self, ctx): 58 self.parameters['metric_name'] = ctx.getText().strip() 59 60 # Exit a parse tree produced by MetricAlertConditionParser#operator. 61 def exitOperator(self, ctx): 62 self.parameters['operator'] = op_conversion[ctx.getText().strip()] 63 64 # Exit a parse tree produced by MetricAlertConditionParser#threshold. 65 def exitThreshold(self, ctx): 66 self.parameters['threshold'] = ctx.getText().strip() 67 68 def exitDynamic(self, ctx): 69 self.parameters['failing_periods'] = {} 70 71 def exitDyn_sensitivity(self, ctx): 72 sensitivity = sens_conversion[ctx.getText().strip().lower()] 73 self.parameters['alert_sensitivity'] = sensitivity 74 75 def exitDyn_violations(self, ctx): 76 min_failing_periods_to_alert = float(ctx.getText().strip()) 77 if min_failing_periods_to_alert < 1 or min_failing_periods_to_alert > 6: 78 message = "Violations {} should be 1-6." 79 raise ValueError(message.format(min_failing_periods_to_alert)) 80 self.parameters['failing_periods']['min_failing_periods_to_alert'] = min_failing_periods_to_alert 81 82 def exitDyn_windows(self, ctx): 83 number_of_evaluation_periods = float(ctx.getText().strip()) 84 min_failing_periods_to_alert = self.parameters['failing_periods']['min_failing_periods_to_alert'] 85 if number_of_evaluation_periods < 1 or number_of_evaluation_periods > 6: 86 message = "Windows {} should be 1-6." 87 raise ValueError(message.format(number_of_evaluation_periods)) 88 if min_failing_periods_to_alert > number_of_evaluation_periods: 89 message = "Violations {} should be smaller or equal to windows {}." 90 raise ValueError(message.format(min_failing_periods_to_alert, number_of_evaluation_periods)) 91 self.parameters['failing_periods']['number_of_evaluation_periods'] = number_of_evaluation_periods 92 93 def exitDyn_datetime(self, ctx): 94 from msrest.serialization import Deserializer 95 from msrest.exceptions import DeserializationError 96 datetime_str = ctx.getText().strip() 97 try: 98 self.parameters['ignore_data_before'] = Deserializer.deserialize_iso(datetime_str) 99 except DeserializationError: 100 message = "Datetime {} is not a valid ISO-8601 format" 101 raise ValueError(message.format(datetime_str)) 102 103 # Enter a parse tree produced by MetricAlertConditionParser#dimensions. 104 def enterDimensions(self, ctx): 105 self.parameters['dimensions'] = [] 106 107 # Enter a parse tree produced by MetricAlertConditionParser#dimension. 108 def enterDimension(self, ctx): 109 self.parameters['dimensions'].append({}) 110 111 # Exit a parse tree produced by MetricAlertConditionParser#dimension. 112 def exitDimension(self, ctx): 113 self._dimension_index = self._dimension_index + 1 114 115 # Exit a parse tree produced by MetricAlertConditionParser#dname. 116 def exitDim_name(self, ctx): 117 self.parameters['dimensions'][self._dimension_index]['name'] = ctx.getText().strip() 118 119 # Exit a parse tree produced by MetricAlertConditionParser#dop. 120 def exitDim_operator(self, ctx): 121 op_text = ctx.getText().strip() 122 self.parameters['dimensions'][self._dimension_index]['operator'] = dim_op_conversion[op_text.lower()] 123 124 # Exit a parse tree produced by MetricAlertConditionParser#dvalues. 125 def exitDim_values(self, ctx): 126 dvalues = ctx.getText().strip().split(' ') 127 self.parameters['dimensions'][self._dimension_index]['values'] = [x for x in dvalues if x not in ['', 'or']] 128 129 def result(self): 130 from azure.mgmt.monitor.models import MetricCriteria, MetricDimension, DynamicMetricCriteria, \ 131 DynamicThresholdFailingPeriods 132 dim_params = self.parameters.get('dimensions', []) 133 dimensions = [] 134 for dim in dim_params: 135 dimensions.append(MetricDimension(**dim)) 136 self.parameters['dimensions'] = dimensions 137 self.parameters['name'] = '' # will be auto-populated later 138 139 if 'failing_periods' in self.parameters: 140 # dynamic metric criteria 141 failing_periods = DynamicThresholdFailingPeriods(**self.parameters['failing_periods']) 142 self.parameters['failing_periods'] = failing_periods 143 return DynamicMetricCriteria(**self.parameters) 144 145 # static metric criteria 146 return MetricCriteria(**self.parameters) 147