1import copy 2import json 3import os 4import ssl 5import sys 6import subprocess 7import urllib 8 9import html5lib 10import py 11import pytest 12 13from wptserver import WPTServer 14 15HERE = os.path.dirname(os.path.abspath(__file__)) 16WPT_ROOT = os.path.normpath(os.path.join(HERE, '..', '..')) 17HARNESS = os.path.join(HERE, 'harness.html') 18TEST_TYPES = ('functional', 'unit') 19DEFAULT_VARIANTS = ["?default"] 20 21sys.path.insert(0, os.path.normpath(os.path.join(WPT_ROOT, "tools"))) 22import localpaths 23 24sys.path.insert(0, os.path.normpath(os.path.join(WPT_ROOT, "tools", "webdriver"))) 25import webdriver 26 27 28def pytest_addoption(parser): 29 parser.addoption("--binary", action="store", default=None, help="path to browser binary") 30 parser.addoption("--headless", action="store_true", default=False, help="run browser in headless mode") 31 32 33def pytest_collect_file(file_path, path, parent): 34 if file_path.suffix.lower() != '.html': 35 return 36 37 # Tests are organized in directories by type 38 test_type = os.path.relpath(str(file_path), HERE) 39 if os.path.sep not in test_type or ".." in test_type: 40 # HTML files in this directory are not tests 41 return 42 test_type = test_type.split(os.path.sep)[1] 43 44 # Handle the deprecation of Node construction in pytest6 45 # https://docs.pytest.org/en/stable/deprecations.html#node-construction-changed-to-node-from-parent 46 if hasattr(HTMLItem, "from_parent"): 47 return HTMLItem.from_parent(parent, filename=str(file_path), test_type=test_type) 48 return HTMLItem(parent, str(file_path), test_type) 49 50 51def pytest_configure(config): 52 config.proc = subprocess.Popen(["geckodriver"]) 53 config.add_cleanup(config.proc.kill) 54 55 capabilities = {"alwaysMatch": {"acceptInsecureCerts": True, "moz:firefoxOptions": {}}} 56 if config.getoption("--binary"): 57 capabilities["alwaysMatch"]["moz:firefoxOptions"]["binary"] = config.getoption("--binary") 58 if config.getoption("--headless"): 59 capabilities["alwaysMatch"]["moz:firefoxOptions"]["args"] = ["--headless"] 60 61 config.driver = webdriver.Session("localhost", 4444, 62 capabilities=capabilities) 63 config.add_cleanup(config.driver.end) 64 65 # Although the name of the `_create_unverified_context` method suggests 66 # that it is not intended for external consumption, the standard library's 67 # documentation explicitly endorses its use: 68 # 69 # > To revert to the previous, unverified, behavior 70 # > ssl._create_unverified_context() can be passed to the context 71 # > parameter. 72 # 73 # https://docs.python.org/2/library/httplib.html#httplib.HTTPSConnection 74 config.ssl_context = ssl._create_unverified_context() 75 76 config.server = WPTServer(WPT_ROOT) 77 config.server.start(config.ssl_context) 78 config.add_cleanup(config.server.stop) 79 80 81def resolve_uri(context, uri): 82 if uri.startswith('/'): 83 base = WPT_ROOT 84 path = uri[1:] 85 else: 86 base = os.path.dirname(context) 87 path = uri 88 89 return os.path.exists(os.path.join(base, path)) 90 91 92def _summarize(actual): 93 def _scrub_stack(test_obj): 94 copy = dict(test_obj) 95 del copy['stack'] 96 return copy 97 98 def _expand_status(status_obj): 99 for key, value in [item for item in status_obj.items()]: 100 # In "status" and "test" objects, the "status" value enum 101 # definitions are interspersed with properties for unrelated 102 # metadata. The following condition is a best-effort attempt to 103 # ignore non-enum properties. 104 if key != key.upper() or not isinstance(value, int): 105 continue 106 107 del status_obj[key] 108 109 if status_obj['status'] == value: 110 status_obj[u'status_string'] = key 111 112 del status_obj['status'] 113 114 return status_obj 115 116 def _summarize_test(test_obj): 117 del test_obj['index'] 118 119 assert 'phase' in test_obj 120 assert 'phases' in test_obj 121 assert 'COMPLETE' in test_obj['phases'] 122 assert test_obj['phase'] == test_obj['phases']['COMPLETE'] 123 del test_obj['phases'] 124 del test_obj['phase'] 125 126 return _expand_status(_scrub_stack(test_obj)) 127 128 def _summarize_status(status_obj): 129 return _expand_status(_scrub_stack(status_obj)) 130 131 132 summarized = {} 133 134 summarized[u'summarized_status'] = _summarize_status(actual['status']) 135 summarized[u'summarized_tests'] = [ 136 _summarize_test(test) for test in actual['tests']] 137 summarized[u'summarized_tests'].sort(key=lambda test_obj: test_obj.get('name')) 138 summarized[u'summarized_asserts'] = [ 139 {"assert_name": assert_item["assert_name"], 140 "test": assert_item["test"]["name"] if assert_item["test"] else None, 141 "args": assert_item["args"], 142 "status": assert_item["status"]} for assert_item in actual["asserts"]] 143 summarized[u'type'] = actual['type'] 144 145 return summarized 146 147 148class HTMLItem(pytest.Item, pytest.Collector): 149 def __init__(self, parent, filename, test_type): 150 self.url = parent.session.config.server.url(filename) 151 self.type = test_type 152 self.variants = [] 153 # Some tests are reliant on the WPT servers substitution functionality, 154 # so tests must be retrieved from the server rather than read from the 155 # file system directly. 156 handle = urllib.request.urlopen(self.url, 157 context=parent.session.config.ssl_context) 158 try: 159 markup = handle.read() 160 finally: 161 handle.close() 162 163 if test_type not in TEST_TYPES: 164 raise ValueError('Unrecognized test type: "%s"' % test_type) 165 166 parsed = html5lib.parse(markup, namespaceHTMLElements=False) 167 name = None 168 includes_variants_script = False 169 self.expected = None 170 171 for element in parsed.iter(): 172 if not name and element.tag == 'title': 173 name = element.text 174 continue 175 if element.tag == 'meta' and element.attrib.get('name') == 'variant': 176 self.variants.append(element.attrib.get('content')) 177 continue 178 if element.tag == 'script': 179 if element.attrib.get('id') == 'expected': 180 try: 181 self.expected = json.loads(element.text) 182 except ValueError: 183 print("Failed parsing JSON in %s" % filename) 184 raise 185 186 src = element.attrib.get('src', '') 187 188 if 'variants.js' in src: 189 includes_variants_script = True 190 if not resolve_uri(filename, src): 191 raise ValueError('Could not resolve path "%s" from %s' % (src, filename)) 192 193 if not name: 194 raise ValueError('No name found in %s add a <title> element' % filename) 195 elif self.type == 'functional': 196 if not self.expected: 197 raise ValueError('Functional tests must specify expected report data') 198 if not includes_variants_script: 199 raise ValueError('No variants script found in file %s add ' 200 '\'<script src="../../variants.js"></script>\'' % filename) 201 if len(self.variants) == 0: 202 self.variants = DEFAULT_VARIANTS 203 elif self.type == 'unit' and self.expected: 204 raise ValueError('Unit tests must not specify expected report data') 205 206 # Ensure that distinct items have distinct fspath attributes. 207 # This is necessary because pytest has an internal cache keyed on it, 208 # and only the first test with any given fspath will be run. 209 # 210 # This cannot use super(HTMLItem, self).__init__(..) because only the 211 # Collector constructor takes the fspath argument. 212 pytest.Item.__init__(self, name, parent) 213 pytest.Collector.__init__(self, name, parent, fspath=py.path.local(filename)) 214 215 216 def reportinfo(self): 217 return self.fspath, None, self.url 218 219 def repr_failure(self, excinfo): 220 return pytest.Collector.repr_failure(self, excinfo) 221 222 def runtest(self): 223 if self.type == 'unit': 224 self._run_unit_test() 225 elif self.type == 'functional': 226 self._run_functional_test() 227 else: 228 raise NotImplementedError 229 230 def _run_unit_test(self): 231 driver = self.session.config.driver 232 server = self.session.config.server 233 234 driver.url = server.url(HARNESS) 235 236 actual = driver.execute_async_script( 237 'runTest("%s", "foo", arguments[0])' % self.url 238 ) 239 240 summarized = _summarize(copy.deepcopy(actual)) 241 242 print(json.dumps(summarized, indent=2)) 243 244 assert summarized[u'summarized_status'][u'status_string'] == u'OK', summarized[u'summarized_status'][u'message'] 245 for test in summarized[u'summarized_tests']: 246 msg = "%s\n%s" % (test[u'name'], test[u'message']) 247 assert test[u'status_string'] == u'PASS', msg 248 249 def _run_functional_test(self): 250 for variant in self.variants: 251 self._run_functional_test_variant(variant) 252 253 def _run_functional_test_variant(self, variant): 254 driver = self.session.config.driver 255 server = self.session.config.server 256 257 driver.url = server.url(HARNESS) 258 259 test_url = self.url + variant 260 actual = driver.execute_async_script('runTest("%s", "foo", arguments[0])' % test_url) 261 262 print(json.dumps(actual, indent=2)) 263 264 summarized = _summarize(copy.deepcopy(actual)) 265 266 print(json.dumps(summarized, indent=2)) 267 268 # Test object ordering is not guaranteed. This weak assertion verifies 269 # that the indices are unique and sequential 270 indices = [test_obj.get('index') for test_obj in actual['tests']] 271 self._assert_sequence(indices) 272 273 self.expected[u'summarized_tests'].sort(key=lambda test_obj: test_obj.get('name')) 274 275 # Make asserts opt-in for now 276 if "summarized_asserts" not in self.expected: 277 del summarized["summarized_asserts"] 278 else: 279 # We can't be sure of the order of asserts even within the same test 280 # although we could also check for the failing assert being the final 281 # one 282 for obj in [summarized, self.expected]: 283 obj["summarized_asserts"].sort( 284 key=lambda x: (x["test"] or "", x["status"], x["assert_name"], tuple(x["args"]))) 285 286 assert summarized == self.expected 287 288 @staticmethod 289 def _assert_sequence(nums): 290 if nums and len(nums) > 0: 291 assert nums == list(range(1, nums[-1] + 1)) 292