#!/usr/bin/env python3
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Instrument visualmetrics.py to run in parallel."""

import argparse
import json
import os
import statistics
import subprocess
import sys
import tarfile
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from multiprocessing import cpu_count
from pathlib import Path

import attr
import structlog
from jsonschema import validate
from voluptuous import ALLOW_EXTRA, Required, Schema


#: The directory where artifacts from this job will be placed.
OUTPUT_DIR = Path("/", "builds", "worker", "artifacts")

#: Module-level logger.  Bound here (not only under ``__main__``) so that
#: functions which reference ``log`` (``read_json``, ``run_visual_metrics``)
#: do not raise NameError when this module is imported.  Because
#: ``structlog.configure(..., cache_logger_on_first_use=True)`` defers
#: binding until first use, the configuration applied in ``__main__``
#: still takes effect.
log = structlog.get_logger()


@attr.s
class Job:
    """A job to process through visualmetrics.py."""

    #: The name of the test.
    test_name = attr.ib(type=str)

    #: The path to the ``browsertime.json`` file on disk.
    json_path = attr.ib(type=Path)

    #: The path of the video file on disk.
    video_path = attr.ib(type=Path)


#: The schema for validating jobs.
JOB_SCHEMA = Schema(
    {
        Required("jobs"): [
            {Required("test_name"): str, Required("browsertime_json_path"): str}
        ],
        Required("application"): {Required("name"): str, "version": str},
        Required("extra_options"): [str],
    }
)

#: A partial schema for browsertime.json files.
BROWSERTIME_SCHEMA = Schema(
    [{Required("files"): {Required("video"): [str]}}], extra=ALLOW_EXTRA
)

# Loaded at import time: this module only runs inside the worker image
# where the schema file is guaranteed to exist.
with Path("/", "builds", "worker", "performance-artifact-schema.json").open() as f:
    PERFHERDER_SCHEMA = json.loads(f.read())


def run_command(log, cmd):
    """Run a command using subprocess.check_output

    Args:
        log: The structlog logger instance.
        cmd: the command to run as a list of strings.

    Returns:
        A tuple of the process' exit status and standard output.
    """
    log.info("Running command", cmd=cmd)
    try:
        res = subprocess.check_output(cmd)
        log.info("Command succeeded", result=res)
        return 0, res
    except subprocess.CalledProcessError as e:
        log.info("Command failed", cmd=cmd, status=e.returncode, output=e.output)
        return e.returncode, e.output


def append_result(log, suites, test_name, name, result):
    """Appends a ``name`` metrics result in the ``test_name`` suite.

    Args:
        log: The structlog logger instance.
        suites: A mapping containing the suites.
        test_name: The name of the test.
        name: The name of the metrics.
        result: The value to append.
    """
    # "...Progress" entries are time-series data, not scalar metrics;
    # they are not representable as subtest replicates, so skip them.
    if name.endswith("Progress"):
        return
    try:
        result = int(result)
    except (ValueError, TypeError):
        # TypeError as well as ValueError: visualmetrics values that are
        # not str/number (e.g. a list) would otherwise crash the run.
        log.error("Could not convert value", name=name)
        log.error("%s" % result)
        result = 0
    if test_name not in suites:
        suites[test_name] = {"name": test_name, "subtests": {}}

    subtests = suites[test_name]["subtests"]
    if name not in subtests:
        subtests[name] = {
            "name": name,
            "replicates": [result],
            "lowerIsBetter": True,
            "unit": "ms",
        }
    else:
        subtests[name]["replicates"].append(result)


def compute_median(subtest):
    """Adds in the subtest the ``value`` field, which is the median of all
    replicates.

    Args:
        subtest: The subtest containing all replicates.

    Returns:
        The subtest, with a ``value`` key added when replicates exist.
    """
    if "replicates" not in subtest:
        return subtest
    subtest["value"] = statistics.median(subtest["replicates"])
    return subtest


def get_suite(suite):
    """Returns the suite with computed medians in its subtests.

    Converts the ``subtests`` mapping into a list, as required by the
    perfherder schema.

    Args:
        suite: The suite to convert.

    Returns:
        The suite.
    """
    suite["subtests"] = [
        compute_median(subtest) for subtest in suite["subtests"].values()
    ]
    return suite


def read_json(json_path, schema):
    """Read the given json file and verify against the provided schema.

    Args:
        json_path: Path of json file to parse.
        schema: A callable to validate the JSON's schema.

    Returns:
        The contents of the file at ``json_path`` interpreted as JSON.

    Raises:
        Exception: Whatever ``json.load`` or ``schema`` raises; the error
            is logged before being re-raised so the caller can abort.
    """
    try:
        with open(str(json_path), "r", encoding="utf-8", errors="ignore") as f:
            data = json.load(f)
    except Exception:
        log.error("Could not read JSON file", path=json_path, exc_info=True)
        raise

    log.info("Loaded JSON from file", path=json_path)

    try:
        schema(data)
    except Exception:
        log.error("JSON failed to validate", exc_info=True)
        raise

    return data


def main(log, args):
    """Run visualmetrics.py in parallel.

    Args:
        log: The structlog logger instance.
        args: The parsed arguments from the argument parser.

    Returns:
        The return code that the program will exit with: 0 on full
        success, otherwise the number of failed visualmetrics runs (or 1
        for setup errors) so Taskcluster marks the job red.
    """
    fetch_dir = os.getenv("MOZ_FETCHES_DIR")
    if not fetch_dir:
        log.error("Expected MOZ_FETCHES_DIR environment variable.")
        return 1

    fetch_dir = Path(fetch_dir)

    visualmetrics_path = fetch_dir / "visualmetrics.py"
    if not visualmetrics_path.exists():
        log.error(
            "Could not locate visualmetrics.py", expected_path=str(visualmetrics_path)
        )
        return 1

    browsertime_results_path = fetch_dir / "browsertime-results.tgz"

    try:
        with tarfile.open(str(browsertime_results_path)) as tar:
            # NOTE(review): extractall() trusts archive member paths; this
            # archive is produced by a trusted upstream task, but if that
            # ever changes the members should be sanitized first.
            tar.extractall(path=str(fetch_dir))
    except Exception:
        log.error(
            "Could not read/extract browsertime results archive",
            path=browsertime_results_path,
            exc_info=True,
        )
        return 1
    log.info("Extracted browsertime results", path=browsertime_results_path)

    # Bind the path before the try so the except handler can always log it.
    jobs_json_path = fetch_dir / "browsertime-results" / "jobs.json"
    try:
        jobs_json = read_json(jobs_json_path, JOB_SCHEMA)
    except Exception:
        log.error(
            "Could not open the jobs.json file",
            path=jobs_json_path,
            exc_info=True,
        )
        return 1

    jobs = []

    for job in jobs_json["jobs"]:
        browsertime_json_path = fetch_dir / job["browsertime_json_path"]

        try:
            browsertime_json = read_json(browsertime_json_path, BROWSERTIME_SCHEMA)
        except Exception:
            log.error(
                "Could not open a browsertime.json file",
                path=browsertime_json_path,
                exc_info=True,
            )
            return 1

        # Fan out: one Job per recorded video in every site entry.
        for site in browsertime_json:
            for video in site["files"]["video"]:
                jobs.append(
                    Job(
                        test_name=job["test_name"],
                        json_path=browsertime_json_path,
                        video_path=browsertime_json_path.parent / video,
                    )
                )

    failed_runs = 0
    suites = {}

    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        for job, result in zip(
            jobs,
            executor.map(
                partial(
                    run_visual_metrics,
                    visualmetrics_path=visualmetrics_path,
                    options=args.visual_metrics_options,
                ),
                jobs,
            ),
        ):
            returncode, res = result
            if returncode != 0:
                log.error(
                    "Failed to run visualmetrics.py",
                    video_path=job.video_path,
                    error=res,
                )
                failed_runs += 1
            else:
                # Python 3.5 requires a str object (not 3.6+)
                res = json.loads(res.decode("utf8"))
                for name, value in res.items():
                    append_result(log, suites, job.test_name, name, value)

    suites = [get_suite(suite) for suite in suites.values()]

    perf_data = {
        "framework": {"name": "browsertime"},
        "application": jobs_json["application"],
        "type": "vismet",
        "suites": suites,
    }
    for entry in suites:
        entry["extraOptions"] = jobs_json["extra_options"]

    # Try to get the similarity for all possible tests, this means that we
    # will also get a comparison of recorded vs. live sites to check
    # the on-going quality of our recordings.
    try:
        # Imported lazily: the similarity module (and its dependencies)
        # may be absent; similarity is best-effort only.
        from similarity import calculate_similarity

        for name, value in calculate_similarity(
            jobs_json, fetch_dir, OUTPUT_DIR
        ).items():
            if value is None:
                continue
            suites[0]["subtests"].append(
                {
                    "name": name,
                    "value": value,
                    "replicates": [value],
                    "lowerIsBetter": False,
                    "unit": "a.u.",
                }
            )
    except Exception:
        log.info("Failed to calculate similarity score", exc_info=True)

    # Validates the perf data complies with perfherder schema.
    # The perfherder schema uses jsonschema so we can't use voluptuous here.
    validate(perf_data, PERFHERDER_SCHEMA)

    raw_perf_data = json.dumps(perf_data)
    with Path(OUTPUT_DIR, "perfherder-data.json").open("w") as f:
        f.write(raw_perf_data)
    # Prints the data in logs for Perfherder to pick it up.
    log.info("PERFHERDER_DATA: %s" % raw_perf_data)

    # Lists the number of processed jobs, failures, and successes.
    with Path(OUTPUT_DIR, "summary.json").open("w") as f:
        json.dump(
            {
                "total_jobs": len(jobs),
                "successful_runs": len(jobs) - failed_runs,
                "failed_runs": failed_runs,
            },
            f,
        )

    # If there's one failure along the way, we want to return > 0
    # to trigger a red job in TC.
    return failed_runs


def run_visual_metrics(job, visualmetrics_path, options):
    """Run visualmetrics.py on the input job.

    Args:
        job: The :class:`Job` whose video should be analyzed.
        visualmetrics_path: Path to the visualmetrics.py script.
        options: Extra command-line options forwarded to visualmetrics.py.

    Returns:
        A returncode and a string containing the output of visualmetrics.py
    """
    cmd = ["/usr/bin/python", str(visualmetrics_path), "--video", str(job.video_path)]
    cmd.extend(options)
    return run_command(log, cmd)


if __name__ == "__main__":
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.format_exc_info,
            structlog.dev.ConsoleRenderer(colors=False),
        ],
        cache_logger_on_first_use=True,
    )

    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument(
        "visual_metrics_options",
        type=str,
        metavar="VISUAL-METRICS-OPTIONS",
        help="Options to pass to visualmetrics.py",
        nargs="*",
    )

    args = parser.parse_args()

    try:
        sys.exit(main(log, args))
    except Exception as e:
        log.error("Unhandled exception: %s" % e, exc_info=True)
        sys.exit(1)