1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5
6from __future__ import absolute_import
7
8import json
9from collections import defaultdict
10
11from .base import BaseFormatter
12
13
14class ErrorSummaryFormatter(BaseFormatter):
15    def __init__(self):
16        self.test_to_group = {}
17        self.groups = defaultdict(
18            lambda: {
19                "status": None,
20                "start": None,
21                "end": None,
22            }
23        )
24        self.line_count = 0
25
26    def __call__(self, data):
27        rv = BaseFormatter.__call__(self, data)
28        self.line_count += 1
29        return rv
30
31    def _output(self, data_type, data):
32        data["action"] = data_type
33        data["line"] = self.line_count
34        return "%s\n" % json.dumps(data)
35
36    def _output_test(self, test, subtest, item):
37        data = {
38            "test": test,
39            "subtest": subtest,
40            "group": self.test_to_group.get(test, ""),
41            "status": item["status"],
42            "expected": item["expected"],
43            "message": item.get("message"),
44            "stack": item.get("stack"),
45            "known_intermittent": item.get("known_intermittent", []),
46        }
47        return self._output("test_result", data)
48
49    def _update_group_result(self, group, item):
50        ginfo = self.groups[group]
51
52        if item["status"] == "SKIP":
53            if ginfo["status"] is None:
54                ginfo["status"] = "SKIP"
55        elif (
56            "expected" not in item
57            or item["status"] == item["expected"]
58            or item["status"] in item.get("known_intermittent", [])
59        ):
60            if ginfo["status"] in (None, "SKIP"):
61                ginfo["status"] = "OK"
62        else:
63            ginfo["status"] = "ERROR"
64
65    def suite_start(self, item):
66        self.test_to_group = {v: k for k in item["tests"] for v in item["tests"][k]}
67        return self._output("test_groups", {"groups": list(item["tests"].keys())})
68
69    def suite_end(self, data):
70        output = []
71        for group, info in self.groups.items():
72            if info["start"] is None or info["end"] is None:
73                duration = None
74            else:
75                duration = info["end"] - info["start"]
76
77            output.append(
78                self._output(
79                    "group_result",
80                    {
81                        "group": group,
82                        "status": info["status"],
83                        "duration": duration,
84                    },
85                )
86            )
87
88        return "".join(output)
89
90    def test_start(self, item):
91        group = self.test_to_group.get(item["test"], None)
92        if group and self.groups[group]["start"] is None:
93            self.groups[group]["start"] = item["time"]
94
95    def test_status(self, item):
96        group = self.test_to_group.get(item["test"], None)
97        if group:
98            self._update_group_result(group, item)
99
100        if "expected" not in item:
101            return
102
103        return self._output_test(item["test"], item["subtest"], item)
104
105    def test_end(self, item):
106        group = self.test_to_group.get(item["test"], None)
107        if group:
108            self._update_group_result(group, item)
109            self.groups[group]["end"] = item["time"]
110
111        if "expected" not in item:
112            return
113
114        return self._output_test(item["test"], None, item)
115
116    def log(self, item):
117        if item["level"] not in ("ERROR", "CRITICAL"):
118            return
119
120        data = {"level": item["level"], "message": item["message"]}
121        return self._output("log", data)
122
123    def crash(self, item):
124        data = {
125            "test": item.get("test"),
126            "signature": item["signature"],
127            "stackwalk_stdout": item.get("stackwalk_stdout"),
128            "stackwalk_stderr": item.get("stackwalk_stderr"),
129        }
130        return self._output("crash", data)
131
132    def lint(self, item):
133        data = {
134            "level": item["level"],
135            "path": item["path"],
136            "message": item["message"],
137            "lineno": item["lineno"],
138            "column": item.get("column"),
139            "rule": item.get("rule"),
140            "linter": item.get("linter"),
141        }
142        self._output("lint", data)
143