1'''
2'''
3#  Licensed to the Apache Software Foundation (ASF) under one
4#  or more contributor license agreements.  See the NOTICE file
5#  distributed with this work for additional information
6#  regarding copyright ownership.  The ASF licenses this file
7#  to you under the Apache License, Version 2.0 (the
8#  "License"); you may not use this file except in compliance
9#  with the License.  You may obtain a copy of the License at
10#
11#      http://www.apache.org/licenses/LICENSE-2.0
12#
13#  Unless required by applicable law or agreed to in writing, software
14#  distributed under the License is distributed on an "AS IS" BASIS,
15#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16#  See the License for the specific language governing permissions and
17#  limitations under the License.
18
19from autest.testers import Tester, tester
20import autest.common.is_a as is_a
21import hosts.output as host
22
23import re
24
25
class CurlHeader(Tester):
    '''
    Tester that compares the headers in a curl output file with a gold dictionary.

    The gold dictionary maps a header name (compared case-insensitively) to:
        None:     the header only has to exist; any value is accepted.
        a string: the header value has to equal that string exactly.
        a dict:   keys are operations ('equal', 'equal_re'); each value is
                  None, a string, or a list of strings/None. The header
                  passes as soon as any candidate of any operation matches.
                  A None candidate matches every value.
    '''

    # Operations accepted as keys of a per-header dictionary.
    _SUPPORTED_OPS = ("equal", "equal_re")

    # Visual separator appended after every problem listed in the failure reason.
    _REASON_SEPARATOR = '\n--------------------------------------------------------------------------------\n'

    def __init__(self,
                 value,
                 test_value=None,
                 kill_on_failure=False,
                 description_group=None,
                 description=None):
        '''
        Validate the gold dictionary and initialise the base Tester.

        value:             gold dictionary (see the class docstring for its format).
        test_value:        forwarded unchanged to Tester.__init__.
        kill_on_failure:   forwarded unchanged to Tester.__init__.
        description_group: forwarded unchanged to Tester.__init__.
        description:       optional description; when None a default is
                           generated that lists up to the first 10 headers.

        Every format problem is reported through host.WriteError.
        '''
        self._stack = host.getCurrentStack(1)

        if not is_a.Dictionary(value):
            host.WriteError(
                "CurlHeader Input: Need to provide a dictionary.",
                stack=self._stack
            )

        # Sanity check the input format *before* building the description, so a
        # malformed key is reported with the dedicated error message instead of
        # surfacing as a TypeError from str.join below.
        for header, target in value.items():
            self._validate_header_entry(header, target)

        if description is None:
            # str() keeps the join safe even for a key that failed validation.
            names = [str(header) for header in value]
            listed = ', '.join(names[:10])
            if len(names) > 10:
                listed += ', etc.'
            description = 'Checking that all {} headers exist and have matching values: {}'.format(
                len(names), listed)

        super(CurlHeader, self).__init__(
            value=value,
            test_value=test_value,
            kill_on_failure=kill_on_failure,
            description_group=description_group,
            description=description)

    def _validate_header_entry(self, header, target):
        '''Report, via host.WriteError, every format problem of one gold dictionary entry.'''
        if not isinstance(header, str):
            host.WriteError(
                'CurlHeader Input: Unsupported type for header {}. Header needs to be a string.'.format(header),
                stack=self._stack
            )

        if target is None or isinstance(target, str):
            return

        if not isinstance(target, dict):
            host.WriteError(
                "CurlHeader Input: Value {} has unsupported type '{}' for header '{}'. "
                "Need to provide either a string, a dictionary or None.".format(
                    target, target.__class__.__name__, header),
                stack=self._stack)
            return

        for op, pos_val in target.items():
            if op not in self._SUPPORTED_OPS:
                host.WriteError(
                    "CurlHeader Input: Unsupported operation '{}' for value at header '{}'. "
                    "The available operations are: {}.".format(
                        op, header, ', '.join(self._SUPPORTED_OPS)),
                    stack=self._stack)
            elif pos_val is None or isinstance(pos_val, str):
                continue
            elif isinstance(pos_val, list):
                for candidate in pos_val:
                    if candidate is not None and not isinstance(candidate, str):
                        host.WriteError(
                            "CurlHeader Input: Value {} has unsupported type '{}' for header '{}'. "
                            "Need to provide a string or None.".format(
                                candidate, candidate.__class__.__name__, header),
                            stack=self._stack)
            else:
                host.WriteError(
                    "CurlHeader Input: Value {} has unsupported type '{}' for header '{}'. "
                    "Need to provide a string, a list or None for possible curl values.".format(
                        pos_val, pos_val.__class__.__name__, header),
                    stack=self._stack)

    def test(self, eventinfo, **kw):
        '''
        Compare the headers of the curl output file with the gold dictionary.

        The value returned by self._GetContent(eventinfo) is used as the path
        of the file to read. Sets self.Result and self.Reason. When the
        comparison fails and self.KillOnFailure is set, KillOnFailureError is
        raised.
        '''
        # Get curl output file to test against
        fname = self._GetContent(eventinfo)
        if fname is None:
            # open(None) would raise a TypeError that the handler below does
            # not catch, so fail explicitly instead.
            self.Result = tester.ResultType.Failed
            # Keep a reason that may already have been recorded; only fill in
            # a generic one when there is none.
            if not getattr(self, 'Reason', None):
                self.Reason = 'CurlHeader: no curl output file available to test against.'
            return

        try:
            with open(fname) as val_file:
                val_content = [line.replace('\r\n', '\n').rstrip('\n') for line in val_file]
        except (OSError, IOError) as e:
            self.Result = tester.ResultType.Failed
            self.Reason = str(e)
            return

        # Current version of curl uses ': ' to separate header and value.
        # Only the first ': ' splits; a line without it (e.g. the status line)
        # becomes a header with an empty value. Headers are lowercased to
        # avoid inconsistencies in letter case.
        val_dict = {}
        for line in val_content:
            name, _, content = line.partition(': ')
            val_dict[name.lower()] = content

        # generate a gold dictionary with lowercase headers
        gold_dict = {k.lower(): v for k, v in self.Value.items()}

        problems = []
        for header in gold_dict:
            actual = val_dict.get(header)
            if actual is None:
                problems.append('Nonexistent field: {}'.format(header))
                continue
            mismatch = self.match(gold_dict, header, actual)
            if mismatch is not None:
                problems.append('In field: {} \n'.format(header) + mismatch)

        if not problems:
            self.Result = tester.ResultType.Passed
            self.Reason = "Curl headers and values match"
        else:
            self.Result = tester.ResultType.Failed
            self.Reason = ''.join(problem + self._REASON_SEPARATOR for problem in problems)
            if self.KillOnFailure:
                # NOTE(review): KillOnFailureError is neither imported nor
                # defined in the visible part of this file -- confirm it is in
                # scope when this module is loaded, otherwise this line raises
                # NameError instead.
                raise KillOnFailureError
        host.WriteVerbose(
            ["testers.CurlHeader", "testers"],
            "{0} - ".format(tester.ResultType.to_color_string(self.Result)),
            self.Reason)

    @staticmethod
    def _candidate_matches(op, candidate, val_value):
        '''Return True when val_value satisfies one candidate of a supported operation.'''
        if candidate is None:
            # None is a wildcard: any value is accepted.
            return True
        if op == 'equal':
            return val_value == candidate
        # 'equal_re': the regular expression has to match the entire string.
        return re.fullmatch(candidate, val_value) is not None

    def match(self, gold_dictionary, header, val_value):
        '''
        Check one header value against its gold entry.

        Operations:
            equal:    complete string match
            equal_re: regular expression match for the entire string
        Given in gold_dictionary[header]:
            None:     anything matches
            a string: same as equal
            a dict:   {equal: [e], equal_re: [r]}. The check passes if either
                      equal or equal_re is satisfied; an operation is
                      satisfied if any of its candidates matches.

        Returns None when the value matches, otherwise a string describing
        the mismatch.
        '''
        target = gold_dictionary[header]

        if target is None:
            return None

        if isinstance(target, str):
            # if given a string, check for an exact match
            if target == val_value:
                return None
            return 'Not an exact match.\nExpected : {}\nActual   : {}'.format(target, val_value)

        if not isinstance(target, dict):
            # Other types are rejected by __init__; nothing to compare here.
            return None

        # if given a dict, check the operations indicated by the keys
        for op, pos_val in target.items():
            if pos_val is None:
                return None
            if op not in self._SUPPORTED_OPS:
                continue
            if isinstance(pos_val, list):
                candidates = pos_val
            elif isinstance(pos_val, str):
                candidates = [pos_val]
            else:
                continue
            for candidate in candidates:
                if self._candidate_matches(op, candidate, val_value):
                    return None

        # Nothing matched: list every expectation in the failure message.
        labels = {'equal': 'Any of the following strings: ',
                  'equal_re': 'Any of the following regular expression: '}
        expected = []
        for op, pos_val in target.items():
            shown = "', '".join(pos_val) if isinstance(pos_val, list) else pos_val
            expected.append("    {}: '{}'\n".format(labels[op], shown))

        return "Value '{}' matches none of: \n{}".format(val_value, ''.join(expected).strip('\n'))
211
212
# Pass the CurlHeader class to AddTester. AddTester is neither defined nor
# imported in this file; presumably the AuTest extension loader injects it
# when this file is loaded -- TODO confirm.
AddTester(CurlHeader)
214