1# This Source Code Form is subject to the terms of the Mozilla Public
2# License, v. 2.0. If a copy of the MPL was not distributed with this
3# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4from __future__ import absolute_import
5
6from mozlog.formatters import base
7import collections
8import os
9import sys
10import subprocess
11import platform
12import six
13
# Fallback ANSI escape sequence that moves the cursor up one line. Used when
# terminal-specific control codes cannot be obtained.
DEFAULT_MOVE_UP_CODE = u"\x1b[A"
# Fallback ANSI escape sequence that erases from the cursor to the end of
# the current line.
DEFAULT_CLEAR_EOL_CODE = u"\x1b[K"
16
17
18class GroupingFormatter(base.BaseFormatter):
19    """Formatter designed to produce unexpected test results grouped
20    together in a readable format."""
21
22    def __init__(self):
23        super(GroupingFormatter, self).__init__()
24        self.number_of_tests = 0
25        self.completed_tests = 0
26        self.need_to_erase_last_line = False
27        self.current_display = ""
28        self.running_tests = {}
29        self.test_output = collections.defaultdict(str)
30        self.subtest_failures = collections.defaultdict(list)
31        self.test_failure_text = ""
32        self.tests_with_failing_subtests = []
33        self.interactive = os.isatty(sys.stdout.fileno())
34        self.show_logs = False
35
36        self.message_handler.register_message_handlers(
37            "show_logs",
38            {
39                "on": self._enable_show_logs,
40                "off": self._disable_show_logs,
41            },
42        )
43
44        # TODO(mrobinson, 8313): We need to add support for Windows terminals here.
45        if self.interactive:
46            self.move_up, self.clear_eol = self.get_move_up_and_clear_eol_codes()
47            if platform.system() != "Windows":
48                self.line_width = int(
49                    subprocess.check_output(["stty", "size"]).split()[1]
50                )
51            else:
52                # Until we figure out proper Windows support,
53                # this makes things work well enough to run.
54                self.line_width = 80
55
56        self.expected = {
57            "OK": 0,
58            "PASS": 0,
59            "FAIL": 0,
60            "PRECONDITION_FAILED": 0,
61            "ERROR": 0,
62            "TIMEOUT": 0,
63            "SKIP": 0,
64            "CRASH": 0,
65        }
66
67        self.unexpected_tests = {
68            "OK": [],
69            "PASS": [],
70            "FAIL": [],
71            "PRECONDITION_FAILED": [],
72            "ERROR": [],
73            "TIMEOUT": [],
74            "CRASH": [],
75        }
76
77        # Follows the format of {(<test>, <subtest>): <data>}, where
78        # (<test>, None) represents a top level test.
79        self.known_intermittent_results = {}
80
81    def _enable_show_logs(self):
82        self.show_logs = True
83
84    def _disable_show_logs(self):
85        self.show_logs = False
86
87    def get_move_up_and_clear_eol_codes(self):
88        try:
89            import blessings
90        except ImportError:
91            return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE
92
93        try:
94            self.terminal = blessings.Terminal()
95            return self.terminal.move_up, self.terminal.clear_eol
96        except Exception as exception:
97            sys.stderr.write(
98                "GroupingFormatter: Could not get terminal "
99                "control characters: %s\n" % exception
100            )
101            return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE
102
103    def text_to_erase_display(self):
104        if not self.interactive or not self.current_display:
105            return ""
106        return (self.move_up + self.clear_eol) * self.current_display.count("\n")
107
108    def generate_output(self, text=None, new_display=None):
109        if not self.interactive:
110            return text
111
112        output = self.text_to_erase_display()
113        if text:
114            output += text
115        if new_display is not None:
116            self.current_display = new_display
117        return output + self.current_display
118
119    def build_status_line(self):
120        if self.number_of_tests == 0:
121            new_display = "  [%i] " % self.completed_tests
122        else:
123            new_display = "  [%i/%i] " % (self.completed_tests, self.number_of_tests)
124
125        if self.running_tests:
126            indent = " " * len(new_display)
127            if self.interactive:
128                max_width = self.line_width - len(new_display)
129            else:
130                max_width = sys.maxsize
131            return (
132                new_display
133                + ("\n%s" % indent).join(
134                    val[:max_width] for val in self.running_tests.values()
135                )
136                + "\n"
137            )
138        else:
139            return new_display + "No tests running.\n"
140
141    def suite_start(self, data):
142        self.number_of_tests = sum(
143            len(tests) for tests in six.itervalues(data["tests"])
144        )
145        self.start_time = data["time"]
146
147        if self.number_of_tests == 0:
148            return "Running tests in %s\n\n" % data[u"source"]
149        else:
150            return "Running %i tests in %s\n\n" % (
151                self.number_of_tests,
152                data[u"source"],
153            )
154
155    def test_start(self, data):
156        self.running_tests[data["thread"]] = data["test"]
157        return self.generate_output(text=None, new_display=self.build_status_line())
158
159    def wrap_and_indent_lines(self, lines, indent):
160        assert len(lines) > 0
161
162        output = indent + u"\u25B6 %s\n" % lines[0]
163        for line in lines[1:-1]:
164            output += indent + u"\u2502 %s\n" % line
165        if len(lines) > 1:
166            output += indent + u"\u2514 %s\n" % lines[-1]
167        return output
168
169    def get_lines_for_unexpected_result(
170        self, test_name, status, expected, message, stack
171    ):
172        # Test names sometimes contain control characters, which we want
173        # to be printed in their raw form, and not their interpreted form.
174        test_name = test_name.encode("unicode-escape").decode("utf-8")
175
176        if expected:
177            expected_text = u" [expected %s]" % expected
178        else:
179            expected_text = u""
180
181        lines = [u"%s%s %s" % (status, expected_text, test_name)]
182        if message:
183            lines.append(u"  \u2192 %s" % message)
184        if stack:
185            lines.append("")
186            lines += [stackline for stackline in stack.splitlines()]
187        return lines
188
189    def get_lines_for_known_intermittents(self, known_intermittent_results):
190        lines = []
191
192        for (test, subtest), data in six.iteritems(self.known_intermittent_results):
193            status = data["status"]
194            known_intermittent = ", ".join(data["known_intermittent"])
195            expected = " [expected %s, known intermittent [%s]" % (
196                data["expected"],
197                known_intermittent,
198            )
199            lines += [
200                u"%s%s %s%s"
201                % (
202                    status,
203                    expected,
204                    test,
205                    (", %s" % subtest) if subtest is not None else "",
206                )
207            ]
208        output = self.wrap_and_indent_lines(lines, "  ") + "\n"
209        return output
210
211    def get_output_for_unexpected_subtests(self, test_name, unexpected_subtests):
212        if not unexpected_subtests:
213            return ""
214
215        def add_subtest_failure(lines, subtest, stack=None):
216            lines += self.get_lines_for_unexpected_result(
217                subtest.get("subtest", None),
218                subtest.get("status", None),
219                subtest.get("expected", None),
220                subtest.get("message", None),
221                stack,
222            )
223
224        def make_subtests_failure(test_name, subtests, stack=None):
225            lines = [u"Unexpected subtest result in %s:" % test_name]
226            for subtest in subtests[:-1]:
227                add_subtest_failure(lines, subtest, None)
228            add_subtest_failure(lines, subtests[-1], stack)
229            return self.wrap_and_indent_lines(lines, "  ") + "\n"
230
231        # Organize the failures by stack trace so we don't print the same stack trace
232        # more than once. They are really tall and we don't want to flood the screen
233        # with duplicate information.
234        output = ""
235        failures_by_stack = collections.defaultdict(list)
236        for failure in unexpected_subtests:
237            # Print stackless results first. They are all separate.
238            if "stack" not in failure:
239                output += make_subtests_failure(test_name, [failure], None)
240            else:
241                failures_by_stack[failure["stack"]].append(failure)
242
243        for (stack, failures) in six.iteritems(failures_by_stack):
244            output += make_subtests_failure(test_name, failures, stack)
245        return output
246
247    def test_end(self, data):
248        self.completed_tests += 1
249        test_status = data["status"]
250        test_name = data["test"]
251        known_intermittent_statuses = data.get("known_intermittent", [])
252        subtest_failures = self.subtest_failures.pop(test_name, [])
253        if "expected" in data and test_status not in known_intermittent_statuses:
254            had_unexpected_test_result = True
255        else:
256            had_unexpected_test_result = False
257
258        del self.running_tests[data["thread"]]
259        new_display = self.build_status_line()
260
261        if not had_unexpected_test_result and not subtest_failures:
262            self.expected[test_status] += 1
263            if self.interactive:
264                return self.generate_output(text=None, new_display=new_display)
265            else:
266                return self.generate_output(
267                    text="  %s\n" % test_name, new_display=new_display
268                )
269
270        if test_status in known_intermittent_statuses:
271            self.known_intermittent_results[(test_name, None)] = data
272
273        # If the test crashed or timed out, we also include any process output,
274        # because there is a good chance that the test produced a stack trace
275        # or other error messages.
276        if test_status in ("CRASH", "TIMEOUT"):
277            stack = self.test_output[test_name] + data.get("stack", "")
278        else:
279            stack = data.get("stack", None)
280
281        output = ""
282        if had_unexpected_test_result:
283            self.unexpected_tests[test_status].append(data)
284            lines = self.get_lines_for_unexpected_result(
285                test_name,
286                test_status,
287                data.get("expected", None),
288                data.get("message", None),
289                stack,
290            )
291            output += self.wrap_and_indent_lines(lines, "  ") + "\n"
292
293        if subtest_failures:
294            self.tests_with_failing_subtests.append(test_name)
295            output += self.get_output_for_unexpected_subtests(
296                test_name, subtest_failures
297            )
298        self.test_failure_text += output
299
300        return self.generate_output(text=output, new_display=new_display)
301
302    def test_status(self, data):
303        if "expected" in data and data["status"] not in data.get(
304            "known_intermittent", []
305        ):
306            self.subtest_failures[data["test"]].append(data)
307        elif data["status"] in data.get("known_intermittent", []):
308            self.known_intermittent_results[(data["test"], data["subtest"])] = data
309
310    def suite_end(self, data):
311        self.end_time = data["time"]
312
313        if not self.interactive:
314            output = u"\n"
315        else:
316            output = ""
317
318        output += u"Ran %i tests finished in %.1f seconds.\n" % (
319            self.completed_tests,
320            (self.end_time - self.start_time) / 1000.0,
321        )
322        output += u"  \u2022 %i ran as expected. %i tests skipped.\n" % (
323            sum(self.expected.values()),
324            self.expected["SKIP"],
325        )
326        if self.known_intermittent_results:
327            output += u"  \u2022 %i known intermittent results.\n" % (
328                len(self.known_intermittent_results)
329            )
330
331        def text_for_unexpected_list(text, section):
332            tests = self.unexpected_tests[section]
333            if not tests:
334                return u""
335            return u"  \u2022 %i tests %s\n" % (len(tests), text)
336
337        output += text_for_unexpected_list(u"crashed unexpectedly", "CRASH")
338        output += text_for_unexpected_list(u"had errors unexpectedly", "ERROR")
339        output += text_for_unexpected_list(u"failed unexpectedly", "FAIL")
340        output += text_for_unexpected_list(
341            u"precondition failed unexpectedly", "PRECONDITION_FAILED"
342        )
343        output += text_for_unexpected_list(u"timed out unexpectedly", "TIMEOUT")
344        output += text_for_unexpected_list(u"passed unexpectedly", "PASS")
345        output += text_for_unexpected_list(u"unexpectedly okay", "OK")
346
347        num_with_failing_subtests = len(self.tests_with_failing_subtests)
348        if num_with_failing_subtests:
349            output += (
350                u"  \u2022 %i tests had unexpected subtest results\n"
351                % num_with_failing_subtests
352            )
353        output += "\n"
354
355        # Repeat failing test output, so that it is easier to find, since the
356        # non-interactive version prints all the test names.
357        if not self.interactive and self.test_failure_text:
358            output += u"Tests with unexpected results:\n" + self.test_failure_text
359
360        if self.known_intermittent_results:
361            results = self.get_lines_for_known_intermittents(
362                self.known_intermittent_results
363            )
364            output += u"Tests with known intermittent results:\n" + results
365
366        return self.generate_output(text=output, new_display="")
367
368    def process_output(self, data):
369        if data["thread"] not in self.running_tests:
370            return
371        test_name = self.running_tests[data["thread"]]
372        self.test_output[test_name] += data["data"] + "\n"
373
374    def log(self, data):
375        if data.get("component"):
376            message = "%s %s %s" % (data["component"], data["level"], data["message"])
377        else:
378            message = "%s %s" % (data["level"], data["message"])
379        if "stack" in data:
380            message += "\n%s" % data["stack"]
381
382        # We are logging messages that begin with STDERR, because that is how exceptions
383        # in this formatter are indicated.
384        if data["message"].startswith("STDERR"):
385            return self.generate_output(text=message + "\n")
386
387        if data["level"] in ("CRITICAL", "ERROR"):
388            return self.generate_output(text=message + "\n")
389        # Show all messages if show_logs switched on.
390        if self.show_logs:
391            return self.generate_output(text=message + "\n")
392