import copy
import json
import os
import ssl
import subprocess
import sys
import urllib
import urllib.request

import html5lib
import py
import pytest

from wptserver import WPTServer
14
# Directory containing this file; test files are located relative to it.
HERE = os.path.dirname(os.path.abspath(__file__))
# Two directory levels above HERE; root-relative ("/...") script URIs are
# resolved against it and it is handed to WPTServer.
WPT_ROOT = os.path.normpath(os.path.join(HERE, '..', '..'))
# Page loaded in the browser before each test document is run.
HARNESS = os.path.join(HERE, 'harness.html')
# The only test types HTMLItem accepts.
TEST_TYPES = ('functional', 'unit')
# Variants used for a functional test that declares no <meta name="variant">.
DEFAULT_VARIANTS = ["?default"]

# The imports below resolve only once the corresponding tools directories
# have been placed on sys.path.
sys.path.insert(0, os.path.normpath(os.path.join(WPT_ROOT, "tools")))
import localpaths

sys.path.insert(0, os.path.normpath(os.path.join(WPT_ROOT, "tools", "webdriver")))
import webdriver
26
27
def pytest_addoption(parser):
    """Register the command-line options understood by this test suite.

    ``--binary`` stores a path to the browser binary (default ``None``);
    ``--headless`` is a boolean flag (default ``False``) requesting that the
    browser run in headless mode.
    """
    option_specs = (
        ("--binary", {"action": "store",
                      "default": None,
                      "help": "path to browser binary"}),
        ("--headless", {"action": "store_true",
                        "default": False,
                        "help": "run browser in headless mode"}),
    )
    for flag, settings in option_specs:
        parser.addoption(flag, **settings)
31
32
def pytest_collect_file(file_path, path, parent):
    """Collect an HTML file below this directory as an ``HTMLItem``.

    Returns ``None`` for files that do not have an ``.html`` suffix, for HTML
    files located directly in this directory, and for paths whose location
    relative to this directory contains ``..``. Otherwise the second
    component of the relative path is used as the test type.
    """
    if file_path.suffix.lower() != '.html':
        return None

    filename = str(file_path)

    # Tests are organized in directories by type
    relative = os.path.relpath(filename, HERE)
    is_nested = os.path.sep in relative
    if not is_nested or ".." in relative:
        # HTML files in this directory are not tests
        return None
    test_type = relative.split(os.path.sep)[1]

    # Handle the deprecation of Node construction in pytest6
    # https://docs.pytest.org/en/stable/deprecations.html#node-construction-changed-to-node-from-parent
    factory = getattr(HTMLItem, "from_parent", None)
    if factory is None:
        return HTMLItem(parent, filename, test_type)
    return factory(parent, filename=filename, test_type=test_type)
49
50
def pytest_configure(config):
    """Start the processes and sessions shared by every test in the run.

    Launches ``geckodriver``, creates a WebDriver session against
    ``localhost:4444`` using capabilities derived from the ``--binary`` and
    ``--headless`` options, and starts a ``WPTServer`` rooted at
    ``WPT_ROOT``. Each resource is stored on ``config`` (``proc``,
    ``driver``, ``ssl_context``, ``server``) and has its teardown registered
    with ``config.add_cleanup``.
    """
    config.proc = subprocess.Popen(["geckodriver"])
    config.add_cleanup(config.proc.kill)

    firefox_options = {}
    binary = config.getoption("--binary")
    if binary:
        firefox_options["binary"] = binary
    if config.getoption("--headless"):
        firefox_options["args"] = ["--headless"]
    capabilities = {
        "alwaysMatch": {
            "acceptInsecureCerts": True,
            "moz:firefoxOptions": firefox_options,
        },
    }

    config.driver = webdriver.Session("localhost", 4444,
                                      capabilities=capabilities)
    config.add_cleanup(config.driver.end)

    # Although the name of the `_create_unverified_context` method suggests
    # that it is not intended for external consumption, the standard library's
    # documentation explicitly endorses its use:
    #
    # > To revert to the previous, unverified, behavior
    # > ssl._create_unverified_context() can be passed to the context
    # > parameter.
    #
    # https://docs.python.org/2/library/httplib.html#httplib.HTTPSConnection
    config.ssl_context = ssl._create_unverified_context()

    wpt_server = WPTServer(WPT_ROOT)
    config.server = wpt_server
    wpt_server.start(config.ssl_context)
    config.add_cleanup(wpt_server.stop)
79
80
def resolve_uri(context, uri):
    """Report whether *uri* points at an existing file.

    A *uri* starting with ``/`` is looked up relative to ``WPT_ROOT``; any
    other *uri* is looked up relative to the directory containing *context*.
    Returns ``True`` when the resulting path exists, ``False`` otherwise.
    """
    root_relative = uri.startswith('/')
    base = WPT_ROOT if root_relative else os.path.dirname(context)
    path = uri[1:] if root_relative else uri
    candidate = os.path.join(base, path)
    return os.path.exists(candidate)
90
91
92def _summarize(actual):
93    def _scrub_stack(test_obj):
94        copy = dict(test_obj)
95        del copy['stack']
96        return copy
97
98    def _expand_status(status_obj):
99        for key, value in [item for item in status_obj.items()]:
100            # In "status" and "test" objects, the "status" value enum
101            # definitions are interspersed with properties for unrelated
102            # metadata. The following condition is a best-effort attempt to
103            # ignore non-enum properties.
104            if key != key.upper() or not isinstance(value, int):
105                continue
106
107            del status_obj[key]
108
109            if status_obj['status'] == value:
110                status_obj[u'status_string'] = key
111
112        del status_obj['status']
113
114        return status_obj
115
116    def _summarize_test(test_obj):
117        del test_obj['index']
118
119        assert 'phase' in test_obj
120        assert 'phases' in test_obj
121        assert 'COMPLETE' in test_obj['phases']
122        assert test_obj['phase'] == test_obj['phases']['COMPLETE']
123        del test_obj['phases']
124        del test_obj['phase']
125
126        return _expand_status(_scrub_stack(test_obj))
127
128    def _summarize_status(status_obj):
129        return _expand_status(_scrub_stack(status_obj))
130
131
132    summarized = {}
133
134    summarized[u'summarized_status'] = _summarize_status(actual['status'])
135    summarized[u'summarized_tests'] = [
136        _summarize_test(test) for test in actual['tests']]
137    summarized[u'summarized_tests'].sort(key=lambda test_obj: test_obj.get('name'))
138    summarized[u'summarized_asserts'] = [
139        {"assert_name": assert_item["assert_name"],
140        "test": assert_item["test"]["name"] if assert_item["test"] else None,
141        "args": assert_item["args"],
142        "status": assert_item["status"]} for assert_item in actual["asserts"]]
143    summarized[u'type'] = actual['type']
144
145    return summarized
146
147
class HTMLItem(pytest.Item, pytest.Collector):
    """A pytest item wrapping a single HTML test document.

    The document is fetched from the WPT server at construction time and
    inspected for a ``<title>`` (used as the item name), ``<meta
    name="variant">`` elements, a ``<script id="expected">`` JSON block and a
    ``variants.js`` script include. ``unit`` tests must pass outright;
    ``functional`` tests are run once per variant and their summarized report
    is compared with the embedded expected data.
    """

    def __init__(self, parent, filename, test_type):
        """Fetch and validate the test document at *filename*.

        Raises ``ValueError`` when *test_type* is not in ``TEST_TYPES``, when
        the document has no title, when a functional test lacks expected data
        or the variants script, when a referenced ``variants.js`` cannot be
        resolved, when a unit test declares expected data, or when the
        expected data is not valid JSON.
        """
        # Reject unknown types up front, before any network round trip.
        if test_type not in TEST_TYPES:
            raise ValueError('Unrecognized test type: "%s"' % test_type)

        config = parent.session.config
        self.url = config.server.url(filename)
        self.type = test_type
        self.variants = []
        self.expected = None

        # Some tests are reliant on the WPT servers substitution functionality,
        # so tests must be retrieved from the server rather than read from the
        # file system directly.
        with urllib.request.urlopen(self.url,
                                    context=config.ssl_context) as handle:
            markup = handle.read()

        parsed = html5lib.parse(markup, namespaceHTMLElements=False)
        name = None
        includes_variants_script = False

        for element in parsed.iter():
            if not name and element.tag == 'title':
                name = element.text
                continue
            if element.tag == 'meta' and element.attrib.get('name') == 'variant':
                self.variants.append(element.attrib.get('content'))
                continue
            if element.tag == 'script':
                if element.attrib.get('id') == 'expected':
                    try:
                        self.expected = json.loads(element.text)
                    except ValueError:
                        print("Failed parsing JSON in %s" % filename)
                        raise

                src = element.attrib.get('src', '')

                if 'variants.js' in src:
                    includes_variants_script = True
                    if not resolve_uri(filename, src):
                        raise ValueError('Could not resolve path "%s" from %s' % (src, filename))

        if not name:
            raise ValueError('No name found in %s add a <title> element' % filename)
        elif self.type == 'functional':
            if not self.expected:
                raise ValueError('Functional tests must specify expected report data')
            if not includes_variants_script:
                raise ValueError('No variants script found in file %s add '
                                 '\'<script src="../../variants.js"></script>\'' % filename)
            if not self.variants:
                # Copy so that no item ever shares (and could mutate) the
                # module-level default list.
                self.variants = list(DEFAULT_VARIANTS)
        elif self.type == 'unit' and self.expected:
            raise ValueError('Unit tests must not specify expected report data')

        # Ensure that distinct items have distinct fspath attributes.
        # This is necessary because pytest has an internal cache keyed on it,
        # and only the first test with any given fspath will be run.
        #
        # This cannot use super(HTMLItem, self).__init__(..) because only the
        # Collector constructor takes the fspath argument.
        pytest.Item.__init__(self, name, parent)
        pytest.Collector.__init__(self, name, parent, fspath=py.path.local(filename))

    def reportinfo(self):
        """Return ``(fspath, None, url)`` for pytest's report header."""
        return self.fspath, None, self.url

    def repr_failure(self, excinfo):
        """Format a failure using the ``Collector`` representation."""
        return pytest.Collector.repr_failure(self, excinfo)

    def runtest(self):
        """Run the item according to its test type."""
        if self.type == 'unit':
            self._run_unit_test()
        elif self.type == 'functional':
            self._run_functional_test()
        else:
            raise NotImplementedError

    def _run_in_harness(self, test_url):
        """Load the harness page and run *test_url* in it, returning the report."""
        config = self.session.config
        config.driver.url = config.server.url(HARNESS)

        # json.dumps yields a correctly escaped JavaScript string literal, so
        # quotes or backslashes in the URL cannot break the script.
        script = 'runTest(%s, "foo", arguments[0])' % json.dumps(test_url)
        return config.driver.execute_async_script(script)

    def _run_unit_test(self):
        """Run the document once and require an OK status and all-PASS tests."""
        actual = self._run_in_harness(self.url)

        summarized = _summarize(copy.deepcopy(actual))

        print(json.dumps(summarized, indent=2))

        assert summarized[u'summarized_status'][u'status_string'] == u'OK', summarized[u'summarized_status'][u'message']
        for test in summarized[u'summarized_tests']:
            msg = "%s\n%s" % (test[u'name'], test[u'message'])
            assert test[u'status_string'] == u'PASS', msg

    def _run_functional_test(self):
        """Run the document once for every declared variant."""
        for variant in self.variants:
            self._run_functional_test_variant(variant)

    def _run_functional_test_variant(self, variant):
        """Run one variant and compare its summarized report with the expectation."""
        actual = self._run_in_harness(self.url + variant)

        print(json.dumps(actual, indent=2))

        summarized = _summarize(copy.deepcopy(actual))

        print(json.dumps(summarized, indent=2))

        # Test object ordering is not guaranteed. This weak assertion verifies
        # that the indices are unique and sequential
        indices = [test_obj.get('index') for test_obj in actual['tests']]
        self._assert_sequence(indices)

        self.expected[u'summarized_tests'].sort(key=lambda test_obj: test_obj.get('name'))

        # Make asserts opt-in for now
        if "summarized_asserts" not in self.expected:
            del summarized["summarized_asserts"]
        else:
            # We can't be sure of the order of asserts even within the same test
            # although we could also check for the failing assert being the final
            # one
            for obj in [summarized, self.expected]:
                obj["summarized_asserts"].sort(
                    key=lambda x: (x["test"] or "", x["status"], x["assert_name"], tuple(x["args"])))

        assert summarized == self.expected

    @staticmethod
    def _assert_sequence(nums):
        """Assert that a non-empty *nums* equals ``[1, 2, ..., nums[-1]]``."""
        if nums:
            assert nums == list(range(1, nums[-1] + 1))
292