1import json 2import re 3import sys 4 5from mozlog.structured.formatters.base import BaseFormatter 6from ..executors.base import strip_server 7 8 9LONE_SURROGATE_RE = re.compile(u"[\uD800-\uDFFF]") 10 11 12def surrogate_replacement_ucs4(match): 13 return "U+" + hex(ord(match.group()))[2:] 14 15 16class SurrogateReplacementUcs2(object): 17 def __init__(self): 18 self.skip = False 19 20 def __call__(self, match): 21 char = match.group() 22 23 if self.skip: 24 self.skip = False 25 return char 26 27 is_low = 0xD800 <= ord(char) <= 0xDBFF 28 29 escape = True 30 if is_low: 31 next_idx = match.end() 32 if next_idx < len(match.string): 33 next_char = match.string[next_idx] 34 if 0xDC00 <= ord(next_char) <= 0xDFFF: 35 escape = False 36 37 if not escape: 38 self.skip = True 39 return char 40 41 return "U+" + hex(ord(match.group()))[2:] 42 43 44if sys.maxunicode == 0x10FFFF: 45 surrogate_replacement = surrogate_replacement_ucs4 46else: 47 surrogate_replacement = SurrogateReplacementUcs2() 48 49 50def replace_lone_surrogate(data): 51 return LONE_SURROGATE_RE.subn(surrogate_replacement, data)[0] 52 53 54class WptreportFormatter(BaseFormatter): 55 """Formatter that produces results in the format that wptreport expects.""" 56 57 def __init__(self): 58 self.raw_results = {} 59 self.results = {} 60 61 def suite_start(self, data): 62 if 'run_info' in data: 63 self.results['run_info'] = data['run_info'] 64 self.results['time_start'] = data['time'] 65 self.results["results"] = [] 66 67 def suite_end(self, data): 68 self.results['time_end'] = data['time'] 69 for test_name in self.raw_results: 70 result = {"test": test_name} 71 result.update(self.raw_results[test_name]) 72 self.results["results"].append(result) 73 return json.dumps(self.results) + "\n" 74 75 def find_or_create_test(self, data): 76 test_name = data["test"] 77 if test_name not in self.raw_results: 78 self.raw_results[test_name] = { 79 "subtests": [], 80 "status": "", 81 "message": None 82 } 83 return self.raw_results[test_name] 84 85 def test_start(self, data): 86 test = self.find_or_create_test(data) 87 test["start_time"] = data["time"] 88 89 def create_subtest(self, data): 90 test = self.find_or_create_test(data) 91 subtest_name = replace_lone_surrogate(data["subtest"]) 92 93 subtest = { 94 "name": subtest_name, 95 "status": "", 96 "message": None 97 } 98 test["subtests"].append(subtest) 99 100 return subtest 101 102 def test_status(self, data): 103 subtest = self.create_subtest(data) 104 subtest["status"] = data["status"] 105 if "expected" in data: 106 subtest["expected"] = data["expected"] 107 if "known_intermittent" in data: 108 subtest["known_intermittent"] = data["known_intermittent"] 109 if "message" in data: 110 subtest["message"] = replace_lone_surrogate(data["message"]) 111 112 def test_end(self, data): 113 test = self.find_or_create_test(data) 114 start_time = test.pop("start_time") 115 test["duration"] = data["time"] - start_time 116 test["status"] = data["status"] 117 if "expected" in data: 118 test["expected"] = data["expected"] 119 if "known_intermittent" in data: 120 test["known_intermittent"] = data["known_intermittent"] 121 if "message" in data: 122 test["message"] = replace_lone_surrogate(data["message"]) 123 if "reftest_screenshots" in data.get("extra", {}): 124 test["screenshots"] = { 125 strip_server(item["url"]): "sha1:" + item["hash"] 126 for item in data["extra"]["reftest_screenshots"] 127 if type(item) == dict 128 } 129 test_name = data["test"] 130 result = {"test": data["test"]} 131 result.update(self.raw_results[test_name]) 132 self.results["results"].append(result) 133 self.raw_results.pop(test_name) 134 135 def assertion_count(self, data): 136 test = self.find_or_create_test(data) 137 test["asserts"] = { 138 "count": data["count"], 139 "min": data["min_expected"], 140 "max": data["max_expected"] 141 } 142 143 def lsan_leak(self, data): 144 if "lsan_leaks" not in self.results: 145 self.results["lsan_leaks"] = [] 146 lsan_leaks = self.results["lsan_leaks"] 147 lsan_leaks.append({"frames": data["frames"], 148 "scope": data["scope"], 149 "allowed_match": data.get("allowed_match")}) 150 151 def find_or_create_mozleak(self, data): 152 if "mozleak" not in self.results: 153 self.results["mozleak"] = {} 154 scope = data["scope"] 155 if scope not in self.results["mozleak"]: 156 self.results["mozleak"][scope] = {"objects": [], "total": []} 157 return self.results["mozleak"][scope] 158 159 def mozleak_object(self, data): 160 scope_data = self.find_or_create_mozleak(data) 161 scope_data["objects"].append({"process": data["process"], 162 "name": data["name"], 163 "allowed": data.get("allowed", False), 164 "bytes": data["bytes"]}) 165 166 def mozleak_total(self, data): 167 scope_data = self.find_or_create_mozleak(data) 168 scope_data["total"].append({"bytes": data["bytes"], 169 "threshold": data.get("threshold", 0), 170 "process": data["process"]}) 171