1# Licensed under a 3-clause BSD style license - see LICENSE.rst
2"""
3Contains a class to handle a validation result for a single VOTable
4file.
5"""
6
7
8# STDLIB
9from xml.parsers.expat import ExpatError
10import hashlib
11import os
12import shutil
13import socket
14import subprocess
15import warnings
16import pickle
17import urllib.request
18import urllib.error
19import http.client
20
21# VO
22from astropy.io.votable import table
23from astropy.io.votable import exceptions
24from astropy.io.votable import xmlutil
25
26
27class Result:
28    def __init__(self, url, root='results', timeout=10):
29        self.url = url
30        m = hashlib.md5()
31        m.update(url)
32        self._hash = m.hexdigest()
33        self._root = root
34        self._path = os.path.join(
35            self._hash[0:2], self._hash[2:4], self._hash[4:])
36        if not os.path.exists(self.get_dirpath()):
37            os.makedirs(self.get_dirpath())
38        self.timeout = timeout
39        self.load_attributes()
40
41    def __enter__(self):
42        return self
43
44    def __exit__(self, *args):
45        self.save_attributes()
46
47    def get_dirpath(self):
48        return os.path.join(self._root, self._path)
49
50    def get_htmlpath(self):
51        return self._path
52
53    def get_attribute_path(self):
54        return os.path.join(self.get_dirpath(), "values.dat")
55
56    def get_vo_xml_path(self):
57        return os.path.join(self.get_dirpath(), "vo.xml")
58
59    # ATTRIBUTES
60
61    def load_attributes(self):
62        path = self.get_attribute_path()
63        if os.path.exists(path):
64            try:
65                with open(path, 'rb') as fd:
66                    self._attributes = pickle.load(fd)
67            except Exception:
68                shutil.rmtree(self.get_dirpath())
69                os.makedirs(self.get_dirpath())
70                self._attributes = {}
71        else:
72            self._attributes = {}
73
74    def save_attributes(self):
75        path = self.get_attribute_path()
76        with open(path, 'wb') as fd:
77            pickle.dump(self._attributes, fd)
78
79    def __getitem__(self, key):
80        return self._attributes[key]
81
82    def __setitem__(self, key, val):
83        self._attributes[key] = val
84
85    def __contains__(self, key):
86        return key in self._attributes
87
88    # VO XML
89
90    def download_xml_content(self):
91        path = self.get_vo_xml_path()
92
93        if 'network_error' not in self._attributes:
94            self['network_error'] = None
95
96        if os.path.exists(path):
97            return
98
99        def fail(reason):
100            reason = str(reason)
101            with open(path, 'wb') as fd:
102                fd.write(f'FAILED: {reason}\n'.encode('utf-8'))
103            self['network_error'] = reason
104
105        r = None
106        try:
107            r = urllib.request.urlopen(
108                self.url.decode('ascii'), timeout=self.timeout)
109        except urllib.error.URLError as e:
110            if hasattr(e, 'reason'):
111                reason = e.reason
112            else:
113                reason = e.code
114            fail(reason)
115            return
116        except http.client.HTTPException as e:
117            fail(f"HTTPException: {str(e)}")
118            return
119        except (socket.timeout, socket.error) as e:
120            fail("Timeout")
121            return
122
123        if r is None:
124            fail("Invalid URL")
125            return
126
127        try:
128            content = r.read()
129        except socket.timeout as e:
130            fail("Timeout")
131            return
132        else:
133            r.close()
134
135        with open(path, 'wb') as fd:
136            fd.write(content)
137
138    def get_xml_content(self):
139        path = self.get_vo_xml_path()
140        if not os.path.exists(path):
141            self.download_xml_content()
142        with open(path, 'rb') as fd:
143            content = fd.read()
144        return content
145
146    def validate_vo(self):
147        path = self.get_vo_xml_path()
148        if not os.path.exists(path):
149            self.download_xml_content()
150        self['version'] = ''
151        if 'network_error' in self and self['network_error'] is not None:
152            self['nwarnings'] = 0
153            self['nexceptions'] = 0
154            self['warnings'] = []
155            self['xmllint'] = None
156            self['warning_types'] = set()
157            return
158
159        nexceptions = 0
160        nwarnings = 0
161        t = None
162        lines = []
163        with open(path, 'rb') as input:
164            with warnings.catch_warnings(record=True) as warning_lines:
165                try:
166                    t = table.parse(input, verify='warn', filename=path)
167                except (ValueError, TypeError, ExpatError) as e:
168                    lines.append(str(e))
169                    nexceptions += 1
170        lines = [str(x.message) for x in warning_lines] + lines
171
172        if t is not None:
173            self['version'] = version = t.version
174        else:
175            self['version'] = version = "1.0"
176
177        if 'xmllint' not in self:
178            # Now check the VO schema based on the version in
179            # the file.
180            try:
181                success, stdout, stderr = xmlutil.validate_schema(path, version)
182            # OSError is raised when XML file eats all memory and
183            # system sends kill signal.
184            except OSError as e:
185                self['xmllint'] = None
186                self['xmllint_content'] = str(e)
187            else:
188                self['xmllint'] = (success == 0)
189                self['xmllint_content'] = stderr
190
191        warning_types = set()
192        for line in lines:
193            w = exceptions.parse_vowarning(line)
194            if w['is_warning']:
195                nwarnings += 1
196            if w['is_exception']:
197                nexceptions += 1
198            warning_types.add(w['warning'])
199
200        self['nwarnings'] = nwarnings
201        self['nexceptions'] = nexceptions
202        self['warnings'] = lines
203        self['warning_types'] = warning_types
204
205    def has_warning(self, warning_code):
206        return warning_code in self['warning_types']
207
208    def match_expectations(self):
209        if 'network_error' not in self:
210            self['network_error'] = None
211
212        if self['expected'] == 'good':
213            return (not self['network_error'] and
214                    self['nwarnings'] == 0 and
215                    self['nexceptions'] == 0)
216        elif self['expected'] == 'incorrect':
217            return (not self['network_error'] and
218                    (self['nwarnings'] > 0 or
219                     self['nexceptions'] > 0))
220        elif self['expected'] == 'broken':
221            return self['network_error'] is not None
222
223    def validate_with_votlint(self, path_to_stilts_jar):
224        filename = self.get_vo_xml_path()
225        p = subprocess.Popen(
226            f"java -jar {path_to_stilts_jar} votlint validate=false {filename}",
227            shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
228        stdout, stderr = p.communicate()
229        if len(stdout) or p.returncode:
230            self['votlint'] = False
231        else:
232            self['votlint'] = True
233        self['votlint_content'] = stdout
234
235
236def get_result_subsets(results, root, s=None):
237    all_results = []
238    correct = []
239    not_expected = []
240    fail_schema = []
241    schema_mismatch = []
242    fail_votlint = []
243    votlint_mismatch = []
244    network_failures = []
245    version_10 = []
246    version_11 = []
247    version_12 = []
248    version_unknown = []
249    has_warnings = []
250    warning_set = {}
251    has_exceptions = []
252    exception_set = {}
253
254    for url in results:
255        if s:
256            next(s)
257
258        if isinstance(url, Result):
259            x = url
260        else:
261            x = Result(url, root=root)
262
263        all_results.append(x)
264        if (x['nwarnings'] == 0 and
265                x['nexceptions'] == 0 and
266                x['xmllint'] is True):
267            correct.append(x)
268        if not x.match_expectations():
269            not_expected.append(x)
270        if x['xmllint'] is False:
271            fail_schema.append(x)
272        if (x['xmllint'] is False and
273                x['nwarnings'] == 0 and
274                x['nexceptions'] == 0):
275            schema_mismatch.append(x)
276        if 'votlint' in x and x['votlint'] is False:
277            fail_votlint.append(x)
278            if 'network_error' not in x:
279                x['network_error'] = None
280            if (x['nwarnings'] == 0 and
281                    x['nexceptions'] == 0 and
282                    x['network_error'] is None):
283                votlint_mismatch.append(x)
284        if 'network_error' in x and x['network_error'] is not None:
285            network_failures.append(x)
286        version = x['version']
287        if version == '1.0':
288            version_10.append(x)
289        elif version == '1.1':
290            version_11.append(x)
291        elif version == '1.2':
292            version_12.append(x)
293        else:
294            version_unknown.append(x)
295        if x['nwarnings'] > 0:
296            has_warnings.append(x)
297            for warning in x['warning_types']:
298                if (warning is not None and
299                        len(warning) == 3 and
300                        warning.startswith('W')):
301                    warning_set.setdefault(warning, [])
302                    warning_set[warning].append(x)
303        if x['nexceptions'] > 0:
304            has_exceptions.append(x)
305            for exc in x['warning_types']:
306                if exc is not None and len(exc) == 3 and exc.startswith('E'):
307                    exception_set.setdefault(exc, [])
308                    exception_set[exc].append(x)
309
310    warning_set = list(warning_set.items())
311    warning_set.sort()
312    exception_set = list(exception_set.items())
313    exception_set.sort()
314
315    tables = [
316        ('all', 'All tests', all_results),
317        ('correct', 'Correct', correct),
318        ('unexpected', 'Unexpected', not_expected),
319        ('schema', 'Invalid against schema', fail_schema),
320        ('schema_mismatch', 'Invalid against schema/Passed vo.table',
321         schema_mismatch, ['ul']),
322        ('fail_votlint', 'Failed votlint', fail_votlint),
323        ('votlint_mismatch', 'Failed votlint/Passed vo.table',
324         votlint_mismatch, ['ul']),
325        ('network_failures', 'Network failures', network_failures),
326        ('version1.0', 'Version 1.0', version_10),
327        ('version1.1', 'Version 1.1', version_11),
328        ('version1.2', 'Version 1.2', version_12),
329        ('version_unknown', 'Version unknown', version_unknown),
330        ('warnings', 'Warnings', has_warnings)]
331    for warning_code, warning in warning_set:
332        if s:
333            next(s)
334
335        warning_class = getattr(exceptions, warning_code, None)
336        if warning_class:
337            warning_descr = warning_class.get_short_name()
338            tables.append(
339                (warning_code,
340                 f'{warning_code}: {warning_descr}',
341                 warning, ['ul', 'li']))
342    tables.append(
343        ('exceptions', 'Exceptions', has_exceptions))
344    for exception_code, exc in exception_set:
345        if s:
346            next(s)
347
348        exception_class = getattr(exceptions, exception_code, None)
349        if exception_class:
350            exception_descr = exception_class.get_short_name()
351            tables.append(
352                (exception_code,
353                 f'{exception_code}: {exception_descr}',
354                 exc, ['ul', 'li']))
355
356    return tables
357