1import json
2import re
3import sys
4
5from mozlog.structured.formatters.base import BaseFormatter
6from ..executors.base import strip_server
7
8
9LONE_SURROGATE_RE = re.compile(u"[\uD800-\uDFFF]")
10
11
12def surrogate_replacement_ucs4(match):
13    return "U+" + hex(ord(match.group()))[2:]
14
15
16class SurrogateReplacementUcs2(object):
17    def __init__(self):
18        self.skip = False
19
20    def __call__(self, match):
21        char = match.group()
22
23        if self.skip:
24            self.skip = False
25            return char
26
27        is_low = 0xD800 <= ord(char) <= 0xDBFF
28
29        escape = True
30        if is_low:
31            next_idx = match.end()
32            if next_idx < len(match.string):
33                next_char = match.string[next_idx]
34                if 0xDC00 <= ord(next_char) <= 0xDFFF:
35                    escape = False
36
37        if not escape:
38            self.skip = True
39            return char
40
41        return "U+" + hex(ord(match.group()))[2:]
42
43
44if sys.maxunicode == 0x10FFFF:
45    surrogate_replacement = surrogate_replacement_ucs4
46else:
47    surrogate_replacement = SurrogateReplacementUcs2()
48
49
50def replace_lone_surrogate(data):
51    return LONE_SURROGATE_RE.subn(surrogate_replacement, data)[0]
52
53
54class WptreportFormatter(BaseFormatter):
55    """Formatter that produces results in the format that wptreport expects."""
56
57    def __init__(self):
58        self.raw_results = {}
59        self.results = {}
60
61    def suite_start(self, data):
62        if 'run_info' in data:
63            self.results['run_info'] = data['run_info']
64        self.results['time_start'] = data['time']
65        self.results["results"] = []
66
67    def suite_end(self, data):
68        self.results['time_end'] = data['time']
69        for test_name in self.raw_results:
70            result = {"test": test_name}
71            result.update(self.raw_results[test_name])
72            self.results["results"].append(result)
73        return json.dumps(self.results) + "\n"
74
75    def find_or_create_test(self, data):
76        test_name = data["test"]
77        if test_name not in self.raw_results:
78            self.raw_results[test_name] = {
79                "subtests": [],
80                "status": "",
81                "message": None
82            }
83        return self.raw_results[test_name]
84
85    def test_start(self, data):
86        test = self.find_or_create_test(data)
87        test["start_time"] = data["time"]
88
89    def create_subtest(self, data):
90        test = self.find_or_create_test(data)
91        subtest_name = replace_lone_surrogate(data["subtest"])
92
93        subtest = {
94            "name": subtest_name,
95            "status": "",
96            "message": None
97        }
98        test["subtests"].append(subtest)
99
100        return subtest
101
102    def test_status(self, data):
103        subtest = self.create_subtest(data)
104        subtest["status"] = data["status"]
105        if "expected" in data:
106            subtest["expected"] = data["expected"]
107        if "known_intermittent" in data:
108            subtest["known_intermittent"] = data["known_intermittent"]
109        if "message" in data:
110            subtest["message"] = replace_lone_surrogate(data["message"])
111
112    def test_end(self, data):
113        test = self.find_or_create_test(data)
114        start_time = test.pop("start_time")
115        test["duration"] = data["time"] - start_time
116        test["status"] = data["status"]
117        if "expected" in data:
118            test["expected"] = data["expected"]
119        if "known_intermittent" in data:
120            test["known_intermittent"] = data["known_intermittent"]
121        if "message" in data:
122            test["message"] = replace_lone_surrogate(data["message"])
123        if "reftest_screenshots" in data.get("extra", {}):
124            test["screenshots"] = {
125                strip_server(item["url"]): "sha1:" + item["hash"]
126                for item in data["extra"]["reftest_screenshots"]
127                if type(item) == dict
128            }
129        test_name = data["test"]
130        result = {"test": data["test"]}
131        result.update(self.raw_results[test_name])
132        self.results["results"].append(result)
133        self.raw_results.pop(test_name)
134
135    def assertion_count(self, data):
136        test = self.find_or_create_test(data)
137        test["asserts"] = {
138            "count": data["count"],
139            "min": data["min_expected"],
140            "max": data["max_expected"]
141        }
142
143    def lsan_leak(self, data):
144        if "lsan_leaks" not in self.results:
145            self.results["lsan_leaks"] = []
146        lsan_leaks = self.results["lsan_leaks"]
147        lsan_leaks.append({"frames": data["frames"],
148                           "scope": data["scope"],
149                           "allowed_match": data.get("allowed_match")})
150
151    def find_or_create_mozleak(self, data):
152        if "mozleak" not in self.results:
153            self.results["mozleak"] = {}
154        scope = data["scope"]
155        if scope not in self.results["mozleak"]:
156            self.results["mozleak"][scope] = {"objects": [], "total": []}
157        return self.results["mozleak"][scope]
158
159    def mozleak_object(self, data):
160        scope_data = self.find_or_create_mozleak(data)
161        scope_data["objects"].append({"process": data["process"],
162                                      "name": data["name"],
163                                      "allowed": data.get("allowed", False),
164                                      "bytes": data["bytes"]})
165
166    def mozleak_total(self, data):
167        scope_data = self.find_or_create_mozleak(data)
168        scope_data["total"].append({"bytes": data["bytes"],
169                                    "threshold": data.get("threshold", 0),
170                                    "process": data["process"]})
171