'''
AuTest tester extension that checks the headers found in a curl output file
against a dictionary of expected header values.
'''
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from autest.testers import Tester, tester
import autest.common.is_a as is_a
import hosts.output as host

import re


def _candidate_matches(op, candidate, actual):
    '''
    Return True if `actual` satisfies a single expected `candidate` under `op`.

    A candidate of None accepts any value. 'equal' requires an exact string
    match; 'equal_re' requires the regular expression to match the entire
    string. Non-string candidates and unknown operations never match.
    '''
    if candidate is None:
        return True
    if not isinstance(candidate, str):
        return False
    if op == 'equal':
        return actual == candidate
    if op == 'equal_re':
        return re.fullmatch(candidate, actual) is not None
    return False


class CurlHeader(Tester):
    '''
    Tester that compares headers in a curl output file with expected values.

    The expected values are given as a dictionary mapping a header name to:
      * None: the header only has to exist;
      * a string: the header value has to match it exactly;
      * a dictionary with the keys 'equal' and/or 'equal_re', each holding
        None, a string, or a list of strings/None.
    Header names are compared case-insensitively.
    '''

    def __init__(self,
                 value,
                 test_value=None,
                 kill_on_failure=False,
                 description_group=None,
                 description=None):
        '''
        Validate the expected-header dictionary and initialize the tester.

        value: dictionary of expected headers (see the class description).
        description: optional description; when None, one listing up to the
            first ten header names is generated.
        The remaining arguments are passed unchanged to Tester.__init__.
        Malformed input is reported through host.WriteError.
        '''
        self._stack = host.getCurrentStack(1)

        if not is_a.Dictionary(value):
            host.WriteError(
                "CurlHeader Input: Need to provide a dictionary.",
                stack=host.getCurrentStack(1)
            )

        ops = ("equal", "equal_re")
        gold_dict = value

        headers = tuple(gold_dict.keys())
        if description is None:
            shown = ', '.join(headers) if len(headers) <= 10 else ', '.join(headers[0:10]) + ', etc.'
            description = 'Checking that all {} headers exist and have matching values: {}'.format(
                len(headers), shown)

        # sanity check for input dictionary format
        for header, target in gold_dict.items():
            if not isinstance(header, str):
                host.WriteError(
                    'CurlHeader Input: Unsupported type for header {}. Header needs to be a string.'.format(header),
                    stack=self._stack
                )

            if target is None or isinstance(target, str):
                continue

            if not isinstance(target, dict):
                host.WriteError(
                    'CurlHeader Input: Value {} has unsupported type \'{}\' for header \'{}\'. Need to provide either a string, a dictionary or None.'.format(
                        target, target.__class__.__name__, header), stack=self._stack)
                continue

            for op, pos_val in target.items():
                if op not in ops:
                    host.WriteError(
                        'CurlHeader Input: Unsupported operation \'{}\' for value at header \'{}\'. The available operations are: {}.'.format(
                            op, header, ', '.join(ops)), stack=self._stack)
                elif pos_val is None or isinstance(pos_val, str):
                    continue
                elif isinstance(pos_val, list):
                    for str_ in pos_val:
                        if not isinstance(str_, str) and str_ is not None:
                            host.WriteError(
                                'CurlHeader Input: Value {} has unsupported type \'{}\' for header \'{}\'. Need to provide a string or None.'.format(
                                    str_, str_.__class__.__name__, header), stack=self._stack)
                else:
                    host.WriteError(
                        'CurlHeader Input: Value {} has unsupported type \'{}\' for header \'{}\'. Need to provide a string, a list or None for possible curl values.'.format(
                            pos_val, pos_val.__class__.__name__, header), stack=self._stack)

        super(CurlHeader, self).__init__(
            value=value,
            test_value=test_value,
            kill_on_failure=kill_on_failure,
            description_group=description_group,
            description=description)

    def test(self, eventinfo, **kw):
        '''
        Read the curl output file and check every expected header against it.

        Sets self.Result and self.Reason. The result is Failed when the file
        cannot be resolved or read, when an expected header is missing, or
        when a header value does not satisfy its expectation.
        '''
        # Get curl output file to test against
        fname = self._GetContent(eventinfo)

        if fname is None:
            # Without this guard open(None) raises a TypeError that the
            # except clause below does not handle.
            # NOTE(review): _GetContent may already have recorded its own
            # reason - confirm that replacing it here is acceptable.
            self.Result = tester.ResultType.Failed
            self.Reason = "CurlHeader: no curl output file to test against"
            return

        try:
            with open(fname) as val_file:
                val_content = [line.replace('\r\n', '\n').rstrip('\n') for line in val_file]
        except OSError as e:
            self.Result = tester.ResultType.Failed
            self.Reason = str(e)
            return

        val_dict = {}
        for line in val_content:
            # Current version of curl uses ': ' to separate header and value.
            # Everything after the first separator is the value; a line
            # without a separator is stored with an empty value.
            name, _, content = line.partition(': ')
            # headers need to be lowercased to avoid inconsistence in letter case
            val_dict[name.lower()] = content

        # generate a gold dictionary with lowercase header
        gold_dict = {k.lower(): v for k, v in self.Value.items()}

        separator = '\n--------------------------------------------------------------------------------\n'
        failures = []

        for header in gold_dict:
            actual = val_dict.get(header)
            if actual is None:
                failures.append('Nonexistent field: {}'.format(header))
                continue

            res = self.match(gold_dict, header, actual)
            if res is not None:
                failures.append('In field: {} \n'.format(header) + res)

        if not failures:
            self.Result = tester.ResultType.Passed
            self.Reason = "Curl headers and values match"
        else:
            self.Result = tester.ResultType.Failed
            self.Reason = ''.join(failure + separator for failure in failures)
            if self.KillOnFailure:
                # NOTE(review): KillOnFailureError is not imported in this
                # file; presumably it is provided by the AuTest extension
                # loader - confirm, otherwise this raises NameError.
                raise KillOnFailureError
        host.WriteVerbose(
            ["testers.CurlHeader", "testers"],
            "{0} - ".format(tester.ResultType.to_color_string(self.Result)),
            self.Reason)

    def match(self, gold_dictionary, header, val_value):
        '''
        Check one header value against its expectation.

        gold_dictionary: expected headers, keyed by header name.
        header: key of the expectation to check.
        val_value: the actual header value (a string).

        The expectation gold_dictionary[header] may be:
          * None: any value is accepted;
          * a string: complete string match ('equal');
          * a dict {'equal': e, 'equal_re': r}: passes if either operation is
            satisfied. 'equal' is a complete string match and 'equal_re' a
            regular expression match for the entire string. Each of e and r
            is None, a string, or a list; a list is satisfied if any of its
            entries matches, and None matches anything.

        Returns None on a match, otherwise a string describing the mismatch.
        '''
        target = gold_dictionary[header]

        if target is None:
            return None

        if isinstance(target, str):
            # if given a string, check for an exact match
            if target == val_value:
                return None
            return 'Not an exact match.\nExpected : {}\nActual : {}'.format(target, val_value)

        if not isinstance(target, dict):
            # other types are rejected by the constructor's validation
            return None

        # if given a dict, check for valid operations indicated by the keys
        for op, pos_val in target.items():
            candidates = pos_val if isinstance(pos_val, list) else [pos_val]
            if any(_candidate_matches(op, candidate, val_value) for candidate in candidates):
                return None

        ops = {'equal': 'Any of the following strings: ',
               'equal_re': 'Any of the following regular expression: '}

        expected = []
        for op, pos_val in target.items():
            shown = '\', \''.join(pos_val) if isinstance(pos_val, list) else pos_val
            expected.append(' {}: \'{}\'\n'.format(ops[op], shown))

        return 'Value \'{}\' matches none of: \n{}'.format(val_value, ''.join(expected).strip('\n'))


AddTester(CurlHeader)