1# This Source Code Form is subject to the terms of the Mozilla Public 2# License, v. 2.0. If a copy of the MPL was not distributed with this 3# file, You can obtain one at http://mozilla.org/MPL/2.0/. 4from __future__ import absolute_import 5 6from mozlog.formatters import base 7import collections 8import os 9import sys 10import subprocess 11import platform 12import six 13 14DEFAULT_MOVE_UP_CODE = u"\x1b[A" 15DEFAULT_CLEAR_EOL_CODE = u"\x1b[K" 16 17 18class GroupingFormatter(base.BaseFormatter): 19 """Formatter designed to produce unexpected test results grouped 20 together in a readable format.""" 21 22 def __init__(self): 23 super(GroupingFormatter, self).__init__() 24 self.number_of_tests = 0 25 self.completed_tests = 0 26 self.need_to_erase_last_line = False 27 self.current_display = "" 28 self.running_tests = {} 29 self.test_output = collections.defaultdict(str) 30 self.subtest_failures = collections.defaultdict(list) 31 self.test_failure_text = "" 32 self.tests_with_failing_subtests = [] 33 self.interactive = os.isatty(sys.stdout.fileno()) 34 self.show_logs = False 35 36 self.message_handler.register_message_handlers( 37 "show_logs", 38 { 39 "on": self._enable_show_logs, 40 "off": self._disable_show_logs, 41 }, 42 ) 43 44 # TODO(mrobinson, 8313): We need to add support for Windows terminals here. 45 if self.interactive: 46 self.move_up, self.clear_eol = self.get_move_up_and_clear_eol_codes() 47 if platform.system() != "Windows": 48 self.line_width = int( 49 subprocess.check_output(["stty", "size"]).split()[1] 50 ) 51 else: 52 # Until we figure out proper Windows support, 53 # this makes things work well enough to run. 54 self.line_width = 80 55 56 self.expected = { 57 "OK": 0, 58 "PASS": 0, 59 "FAIL": 0, 60 "PRECONDITION_FAILED": 0, 61 "ERROR": 0, 62 "TIMEOUT": 0, 63 "SKIP": 0, 64 "CRASH": 0, 65 } 66 67 self.unexpected_tests = { 68 "OK": [], 69 "PASS": [], 70 "FAIL": [], 71 "PRECONDITION_FAILED": [], 72 "ERROR": [], 73 "TIMEOUT": [], 74 "CRASH": [], 75 } 76 77 # Follows the format of {(<test>, <subtest>): <data>}, where 78 # (<test>, None) represents a top level test. 79 self.known_intermittent_results = {} 80 81 def _enable_show_logs(self): 82 self.show_logs = True 83 84 def _disable_show_logs(self): 85 self.show_logs = False 86 87 def get_move_up_and_clear_eol_codes(self): 88 try: 89 import blessings 90 except ImportError: 91 return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE 92 93 try: 94 self.terminal = blessings.Terminal() 95 return self.terminal.move_up, self.terminal.clear_eol 96 except Exception as exception: 97 sys.stderr.write( 98 "GroupingFormatter: Could not get terminal " 99 "control characters: %s\n" % exception 100 ) 101 return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE 102 103 def text_to_erase_display(self): 104 if not self.interactive or not self.current_display: 105 return "" 106 return (self.move_up + self.clear_eol) * self.current_display.count("\n") 107 108 def generate_output(self, text=None, new_display=None): 109 if not self.interactive: 110 return text 111 112 output = self.text_to_erase_display() 113 if text: 114 output += text 115 if new_display is not None: 116 self.current_display = new_display 117 return output + self.current_display 118 119 def build_status_line(self): 120 if self.number_of_tests == 0: 121 new_display = " [%i] " % self.completed_tests 122 else: 123 new_display = " [%i/%i] " % (self.completed_tests, self.number_of_tests) 124 125 if self.running_tests: 126 indent = " " * len(new_display) 127 if self.interactive: 128 max_width = self.line_width - len(new_display) 129 else: 130 max_width = sys.maxsize 131 return ( 132 new_display 133 + ("\n%s" % indent).join( 134 val[:max_width] for val in self.running_tests.values() 135 ) 136 + "\n" 137 ) 138 else: 139 return new_display + "No tests running.\n" 140 141 def suite_start(self, data): 142 self.number_of_tests = sum( 143 len(tests) for tests in six.itervalues(data["tests"]) 144 ) 145 self.start_time = data["time"] 146 147 if self.number_of_tests == 0: 148 return "Running tests in %s\n\n" % data[u"source"] 149 else: 150 return "Running %i tests in %s\n\n" % ( 151 self.number_of_tests, 152 data[u"source"], 153 ) 154 155 def test_start(self, data): 156 self.running_tests[data["thread"]] = data["test"] 157 return self.generate_output(text=None, new_display=self.build_status_line()) 158 159 def wrap_and_indent_lines(self, lines, indent): 160 assert len(lines) > 0 161 162 output = indent + u"\u25B6 %s\n" % lines[0] 163 for line in lines[1:-1]: 164 output += indent + u"\u2502 %s\n" % line 165 if len(lines) > 1: 166 output += indent + u"\u2514 %s\n" % lines[-1] 167 return output 168 169 def get_lines_for_unexpected_result( 170 self, test_name, status, expected, message, stack 171 ): 172 # Test names sometimes contain control characters, which we want 173 # to be printed in their raw form, and not their interpreted form. 174 test_name = test_name.encode("unicode-escape").decode("utf-8") 175 176 if expected: 177 expected_text = u" [expected %s]" % expected 178 else: 179 expected_text = u"" 180 181 lines = [u"%s%s %s" % (status, expected_text, test_name)] 182 if message: 183 lines.append(u" \u2192 %s" % message) 184 if stack: 185 lines.append("") 186 lines += [stackline for stackline in stack.splitlines()] 187 return lines 188 189 def get_lines_for_known_intermittents(self, known_intermittent_results): 190 lines = [] 191 192 for (test, subtest), data in six.iteritems(self.known_intermittent_results): 193 status = data["status"] 194 known_intermittent = ", ".join(data["known_intermittent"]) 195 expected = " [expected %s, known intermittent [%s]" % ( 196 data["expected"], 197 known_intermittent, 198 ) 199 lines += [ 200 u"%s%s %s%s" 201 % ( 202 status, 203 expected, 204 test, 205 (", %s" % subtest) if subtest is not None else "", 206 ) 207 ] 208 output = self.wrap_and_indent_lines(lines, " ") + "\n" 209 return output 210 211 def get_output_for_unexpected_subtests(self, test_name, unexpected_subtests): 212 if not unexpected_subtests: 213 return "" 214 215 def add_subtest_failure(lines, subtest, stack=None): 216 lines += self.get_lines_for_unexpected_result( 217 subtest.get("subtest", None), 218 subtest.get("status", None), 219 subtest.get("expected", None), 220 subtest.get("message", None), 221 stack, 222 ) 223 224 def make_subtests_failure(test_name, subtests, stack=None): 225 lines = [u"Unexpected subtest result in %s:" % test_name] 226 for subtest in subtests[:-1]: 227 add_subtest_failure(lines, subtest, None) 228 add_subtest_failure(lines, subtests[-1], stack) 229 return self.wrap_and_indent_lines(lines, " ") + "\n" 230 231 # Organize the failures by stack trace so we don't print the same stack trace 232 # more than once. They are really tall and we don't want to flood the screen 233 # with duplicate information. 234 output = "" 235 failures_by_stack = collections.defaultdict(list) 236 for failure in unexpected_subtests: 237 # Print stackless results first. They are all separate. 238 if "stack" not in failure: 239 output += make_subtests_failure(test_name, [failure], None) 240 else: 241 failures_by_stack[failure["stack"]].append(failure) 242 243 for (stack, failures) in six.iteritems(failures_by_stack): 244 output += make_subtests_failure(test_name, failures, stack) 245 return output 246 247 def test_end(self, data): 248 self.completed_tests += 1 249 test_status = data["status"] 250 test_name = data["test"] 251 known_intermittent_statuses = data.get("known_intermittent", []) 252 subtest_failures = self.subtest_failures.pop(test_name, []) 253 if "expected" in data and test_status not in known_intermittent_statuses: 254 had_unexpected_test_result = True 255 else: 256 had_unexpected_test_result = False 257 258 del self.running_tests[data["thread"]] 259 new_display = self.build_status_line() 260 261 if not had_unexpected_test_result and not subtest_failures: 262 self.expected[test_status] += 1 263 if self.interactive: 264 return self.generate_output(text=None, new_display=new_display) 265 else: 266 return self.generate_output( 267 text=" %s\n" % test_name, new_display=new_display 268 ) 269 270 if test_status in known_intermittent_statuses: 271 self.known_intermittent_results[(test_name, None)] = data 272 273 # If the test crashed or timed out, we also include any process output, 274 # because there is a good chance that the test produced a stack trace 275 # or other error messages. 276 if test_status in ("CRASH", "TIMEOUT"): 277 stack = self.test_output[test_name] + data.get("stack", "") 278 else: 279 stack = data.get("stack", None) 280 281 output = "" 282 if had_unexpected_test_result: 283 self.unexpected_tests[test_status].append(data) 284 lines = self.get_lines_for_unexpected_result( 285 test_name, 286 test_status, 287 data.get("expected", None), 288 data.get("message", None), 289 stack, 290 ) 291 output += self.wrap_and_indent_lines(lines, " ") + "\n" 292 293 if subtest_failures: 294 self.tests_with_failing_subtests.append(test_name) 295 output += self.get_output_for_unexpected_subtests( 296 test_name, subtest_failures 297 ) 298 self.test_failure_text += output 299 300 return self.generate_output(text=output, new_display=new_display) 301 302 def test_status(self, data): 303 if "expected" in data and data["status"] not in data.get( 304 "known_intermittent", [] 305 ): 306 self.subtest_failures[data["test"]].append(data) 307 elif data["status"] in data.get("known_intermittent", []): 308 self.known_intermittent_results[(data["test"], data["subtest"])] = data 309 310 def suite_end(self, data): 311 self.end_time = data["time"] 312 313 if not self.interactive: 314 output = u"\n" 315 else: 316 output = "" 317 318 output += u"Ran %i tests finished in %.1f seconds.\n" % ( 319 self.completed_tests, 320 (self.end_time - self.start_time) / 1000.0, 321 ) 322 output += u" \u2022 %i ran as expected. %i tests skipped.\n" % ( 323 sum(self.expected.values()), 324 self.expected["SKIP"], 325 ) 326 if self.known_intermittent_results: 327 output += u" \u2022 %i known intermittent results.\n" % ( 328 len(self.known_intermittent_results) 329 ) 330 331 def text_for_unexpected_list(text, section): 332 tests = self.unexpected_tests[section] 333 if not tests: 334 return u"" 335 return u" \u2022 %i tests %s\n" % (len(tests), text) 336 337 output += text_for_unexpected_list(u"crashed unexpectedly", "CRASH") 338 output += text_for_unexpected_list(u"had errors unexpectedly", "ERROR") 339 output += text_for_unexpected_list(u"failed unexpectedly", "FAIL") 340 output += text_for_unexpected_list( 341 u"precondition failed unexpectedly", "PRECONDITION_FAILED" 342 ) 343 output += text_for_unexpected_list(u"timed out unexpectedly", "TIMEOUT") 344 output += text_for_unexpected_list(u"passed unexpectedly", "PASS") 345 output += text_for_unexpected_list(u"unexpectedly okay", "OK") 346 347 num_with_failing_subtests = len(self.tests_with_failing_subtests) 348 if num_with_failing_subtests: 349 output += ( 350 u" \u2022 %i tests had unexpected subtest results\n" 351 % num_with_failing_subtests 352 ) 353 output += "\n" 354 355 # Repeat failing test output, so that it is easier to find, since the 356 # non-interactive version prints all the test names. 357 if not self.interactive and self.test_failure_text: 358 output += u"Tests with unexpected results:\n" + self.test_failure_text 359 360 if self.known_intermittent_results: 361 results = self.get_lines_for_known_intermittents( 362 self.known_intermittent_results 363 ) 364 output += u"Tests with known intermittent results:\n" + results 365 366 return self.generate_output(text=output, new_display="") 367 368 def process_output(self, data): 369 if data["thread"] not in self.running_tests: 370 return 371 test_name = self.running_tests[data["thread"]] 372 self.test_output[test_name] += data["data"] + "\n" 373 374 def log(self, data): 375 if data.get("component"): 376 message = "%s %s %s" % (data["component"], data["level"], data["message"]) 377 else: 378 message = "%s %s" % (data["level"], data["message"]) 379 if "stack" in data: 380 message += "\n%s" % data["stack"] 381 382 # We are logging messages that begin with STDERR, because that is how exceptions 383 # in this formatter are indicated. 384 if data["message"].startswith("STDERR"): 385 return self.generate_output(text=message + "\n") 386 387 if data["level"] in ("CRITICAL", "ERROR"): 388 return self.generate_output(text=message + "\n") 389 # Show all messages if show_logs switched on. 390 if self.show_logs: 391 return self.generate_output(text=message + "\n") 392