#!/usr/bin/env python3
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Instrument visualmetrics.py to run in parallel."""

import argparse
import json
import os
import statistics
import subprocess
import sys
import tarfile
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from multiprocessing import cpu_count
from pathlib import Path

import attr
import structlog
from jsonschema import validate
from voluptuous import ALLOW_EXTRA, Required, Schema


#: The directory where artifacts from this job will be placed.
OUTPUT_DIR = Path("/", "builds", "worker", "artifacts")


#: A job to process through visualmetrics.py.
@attr.s
class Job:
    #: The name of the test.
    test_name = attr.ib(type=str)

    #: The path to the ``browsertime.json`` file on disk.
    json_path = attr.ib(type=Path)

    #: The path of the video file on disk.
    video_path = attr.ib(type=Path)


#: The schema for validating jobs.
JOB_SCHEMA = Schema(
    {
        Required("jobs"): [
            {Required("test_name"): str, Required("browsertime_json_path"): str}
        ],
        Required("application"): {Required("name"): str, "version": str},
        Required("extra_options"): [str],
    }
)

#: A partial schema for browsertime.json files.
BROWSERTIME_SCHEMA = Schema(
    [{Required("files"): {Required("video"): [str]}}], extra=ALLOW_EXTRA
)

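#: The Perfherder schema used to validate the output. It is loaded at import
#: time, so a missing schema file fails the job immediately.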
with Path("/", "builds", "worker", "performance-artifact-schema.json").open() as f:
    PERFHERDER_SCHEMA = json.load(f)


def run_command(log, cmd):
    """Run a command using subprocess.check_output.

    Args:
        log: The structlog logger instance.
        cmd: The command to run, as a list of strings.

    Returns:
        A tuple of the process' exit status and standard output.
    """
    log.info("Running command", cmd=cmd)
    try:
        res = subprocess.check_output(cmd)
        log.info("Command succeeded", result=res)
        return 0, res
    except subprocess.CalledProcessError as e:
        log.error("Command failed", cmd=cmd, status=e.returncode, output=e.output)
        return e.returncode, e.output


def append_result(log, suites, test_name, name, result):
    """Append the ``name`` metric's result to the ``test_name`` suite.

    Args:
        log: The structlog logger instance.
        suites: A mapping containing the suites.
        test_name: The name of the test.
        name: The name of the metric.
        result: The value to append.
    """
    if name.endswith("Progress"):
        return
    try:
        result = int(result)
    except ValueError:
        log.error("Could not convert value", name=name, value=result)
        result = 0
    if test_name not in suites:
        suites[test_name] = {"name": test_name, "subtests": {}}

    subtests = suites[test_name]["subtests"]
    if name not in subtests:
        subtests[name] = {
            "name": name,
            "replicates": [result],
            "lowerIsBetter": True,
            "unit": "ms",
        }
    else:
        subtests[name]["replicates"].append(result)


def compute_median(subtest):
    """Add the ``value`` field to the subtest: the median of all replicates.

    Args:
        subtest: The subtest containing all replicates.

    Returns:
        The subtest.
    """
    if "replicates" not in subtest:
        return subtest
    subtest["value"] = statistics.median(subtest["replicates"])
    return subtest


def get_suite(suite):
    """Return the suite with computed medians in its subtests.

    Args:
        suite: The suite to convert.

    Returns:
        The suite.
    """
    suite["subtests"] = [
        compute_median(subtest) for subtest in suite["subtests"].values()
    ]
    return suite


def read_json(json_path, schema):
    """Read the given JSON file and validate it against the provided schema.

    Args:
        json_path: Path of the JSON file to parse.
        schema: A callable that validates the parsed JSON against a schema.

    Returns:
        The contents of the file at ``json_path`` interpreted as JSON.
    """
    try:
        with open(str(json_path), "r", encoding="utf-8", errors="ignore") as f:
            data = json.load(f)
    except Exception:
        log.error("Could not read JSON file", path=json_path, exc_info=True)
        raise

    log.info("Loaded JSON from file", path=json_path)

    try:
        schema(data)
    except Exception:
        log.error("JSON failed to validate", exc_info=True)
        raise

    return data


def main(log, args):
    """Run visualmetrics.py in parallel.

    Args:
        log: The structlog logger instance.
        args: The parsed arguments from the argument parser.

    Returns:
        The return code that the program will exit with.
    """
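    # MOZ_FETCHES_DIR is set by the CI environment and points at the directory
    # where the task's fetched dependencies (visualmetrics.py, the browsertime
    # results archive, and so on) were downloaded.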
    fetch_dir = os.getenv("MOZ_FETCHES_DIR")
    if not fetch_dir:
        log.error("Expected MOZ_FETCHES_DIR environment variable.")
        return 1

    fetch_dir = Path(fetch_dir)

    visualmetrics_path = fetch_dir / "visualmetrics.py"
    if not visualmetrics_path.exists():
        log.error(
            "Could not locate visualmetrics.py", expected_path=str(visualmetrics_path)
        )
        return 1

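    # The browsertime results arrive as a gzipped tarball; extract it next to
    # the other fetches so that the relative paths in jobs.json resolve.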
    browsertime_results_path = fetch_dir / "browsertime-results.tgz"

    try:
        with tarfile.open(str(browsertime_results_path)) as tar:
            tar.extractall(path=str(fetch_dir))
    except Exception:
        log.error(
            "Could not read/extract browsertime results archive",
            path=browsertime_results_path,
            exc_info=True,
        )
        return 1
    log.info("Extracted browsertime results", path=browsertime_results_path)

    jobs_json_path = fetch_dir / "browsertime-results" / "jobs.json"
    try:
        jobs_json = read_json(jobs_json_path, JOB_SCHEMA)
    except Exception:
        log.error(
            "Could not open the jobs.json file",
            path=jobs_json_path,
            exc_info=True,
        )
        return 1

    jobs = []

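    # Each browsertime.json may describe several sites, each with several
    # videos; fan out one visualmetrics run per video.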
    for job in jobs_json["jobs"]:
        browsertime_json_path = fetch_dir / job["browsertime_json_path"]

        try:
            browsertime_json = read_json(browsertime_json_path, BROWSERTIME_SCHEMA)
        except Exception:
            log.error(
                "Could not open a browsertime.json file",
                path=browsertime_json_path,
                exc_info=True,
            )
            return 1

        for site in browsertime_json:
            for video in site["files"]["video"]:
                jobs.append(
                    Job(
                        test_name=job["test_name"],
                        json_path=browsertime_json_path,
                        video_path=browsertime_json_path.parent / video,
                    )
                )

    failed_runs = 0
    suites = {}

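    # Process the videos in parallel, one worker per CPU. executor.map yields
    # results in input order, so zip() pairs each job with its own result.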
    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        for job, result in zip(
            jobs,
            executor.map(
                partial(
                    run_visual_metrics,
                    visualmetrics_path=visualmetrics_path,
                    options=args.visual_metrics_options,
                ),
                jobs,
            ),
        ):
            returncode, res = result
            if returncode != 0:
                log.error(
                    "Failed to run visualmetrics.py",
                    video_path=job.video_path,
                    error=res,
                )
                failed_runs += 1
            else:
                # Decode the bytes output: json.loads() on Python 3.5 requires
                # a str (3.6+ also accepts bytes).
                res = json.loads(res.decode("utf8"))
                for name, value in res.items():
                    append_result(log, suites, job.test_name, name, value)

    # Compute the median of each subtest's replicates and attach this run's
    # extra options to every suite.
    suites = [get_suite(suite) for suite in suites.values()]
    for suite in suites:
        suite["extraOptions"] = jobs_json["extra_options"]

    perf_data = {
        "framework": {"name": "browsertime"},
        "application": jobs_json["application"],
        "type": "vismet",
        "suites": suites,
    }

    # Try to compute similarity metrics for all tests; this also gives us a
    # comparison of recorded vs. live sites so we can track the ongoing
    # quality of our recordings.
    try:
        from similarity import calculate_similarity

        similarity = calculate_similarity(jobs_json, fetch_dir, OUTPUT_DIR)
        for name, value in similarity.items():
            if value is None:
                continue
            suites[0]["subtests"].append(
                {
                    "name": name,
                    "value": value,
                    "replicates": [value],
                    "lowerIsBetter": False,
                    "unit": "a.u.",
                }
            )
    except Exception:
        log.info("Failed to calculate similarity score", exc_info=True)

    # Validate that the perf data complies with the Perfherder schema.
    # The Perfherder schema is in JSON Schema format, so jsonschema is used
    # here instead of voluptuous.
    validate(perf_data, PERFHERDER_SCHEMA)

    raw_perf_data = json.dumps(perf_data)
    with Path(OUTPUT_DIR, "perfherder-data.json").open("w") as f:
        f.write(raw_perf_data)
    # Print the data to the log so that Perfherder can pick it up.
    log.info("PERFHERDER_DATA: %s" % raw_perf_data)

    # Record the number of processed jobs, failures, and successes.
    with Path(OUTPUT_DIR, "summary.json").open("w") as f:
        json.dump(
            {
                "total_jobs": len(jobs),
                "successful_runs": len(jobs) - failed_runs,
                "failed_runs": failed_runs,
            },
            f,
        )

    # If any run failed along the way, return a non-zero status to turn the
    # Taskcluster job red.
    return failed_runs


def run_visual_metrics(job, visualmetrics_path, options):
    """Run visualmetrics.py on the input job.

    This executes in a worker process; it relies on the module-level ``log``,
    which the workers inherit when multiprocessing forks (the default start
    method on Linux).

    Args:
        job: The Job whose video should be processed.
        visualmetrics_path: The path to visualmetrics.py on disk.
        options: Extra command-line options to pass to visualmetrics.py.

    Returns:
        A tuple of the return code and the output of visualmetrics.py.
    """
    cmd = ["/usr/bin/python", str(visualmetrics_path), "--video", str(job.video_path)]
    cmd.extend(options)
    return run_command(log, cmd)


if __name__ == "__main__":
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.format_exc_info,
            structlog.dev.ConsoleRenderer(colors=False),
        ],
        cache_logger_on_first_use=True,
    )

    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument(
        "visual_metrics_options",
        type=str,
        metavar="VISUAL-METRICS-OPTIONS",
        help="Options to pass to visualmetrics.py",
        nargs="*",
    )

    args = parser.parse_args()
    log = structlog.get_logger()

    try:
        sys.exit(main(log, args))
    except Exception as e:
        log.error("Unhandled exception: %s" % e, exc_info=True)
        sys.exit(1)