1from __future__ import print_function
2
3import json
4import pipes
5import re
6
7from .progressbar import NullProgressBar, ProgressBar
8from .structuredlog import TestLogger
9
10# subprocess.list2cmdline does not properly escape for sh-like shells
11
12
def escape_cmdline(args):
    """Return the arguments in *args* as one sh-safe command-line string.

    Each argument is quoted individually and the results are joined with
    single spaces, so the string can be pasted into an sh-like shell.

    ``shlex.quote`` is used rather than ``pipes.quote``: the latter is only an
    undocumented alias of the former, and the ``pipes`` module was removed in
    Python 3.13.
    """
    # Imported here so the function does not rely on the module-level imports.
    import shlex

    return " ".join(shlex.quote(a) for a in args)
15
16
class TestOutput:
    """Everything captured from running a single test."""

    def __init__(self, test, cmd, out, err, rc, dt, timed_out, extra=None):
        # The Test this output belongs to.
        self.test = test
        # Command line of the test.
        self.cmd = cmd
        # Captured stdout and stderr text.
        self.out = out
        self.err = err
        # Integer return code and run time (float).
        self.rc = rc
        self.dt = dt
        # True when the test timed out.
        self.timed_out = timed_out
        # Additional data; includes the pid on some platforms.
        self.extra = extra

    def describe_failure(self):
        """Return a one-line description of the failure.

        The result is "Timeout" for a timed-out test, otherwise the first
        stderr line that is not the asm.js compilation success message, or
        "Unknown" when stderr has no such line.
        """
        if self.timed_out:
            return "Timeout"
        asmjs_notice = "Successfully compiled asm.js code"
        candidates = (ln for ln in self.err.splitlines() if asmjs_notice not in ln)
        return next(candidates, "Unknown")
39
40
class NullTestOutput:
    """Stand-in for TestOutput used when a test was not run."""

    def __init__(self, test):
        self.test = test
        # Nothing was executed: no command line and no captured streams.
        self.cmd = self.out = self.err = ""
        # Zero return code and zero run time.
        self.rc, self.dt = 0, 0.0
        self.timed_out = False
52
53
class TestResult:
    """Classified result from a test run.

    ``result`` holds the overall classification (``PASS``, ``FAIL`` or
    ``CRASH``); ``results`` is a list of ``(classification, message)`` tuples,
    one per subtest; ``wpt_results`` is only populated for wpt tests.
    """

    # NOTE: the docstring must precede these constants; placed after them it
    # is just a discarded string expression and ``__doc__`` stays None.
    PASS = "PASS"
    FAIL = "FAIL"
    CRASH = "CRASH"

    # Line a test prints on stdout to declare an additional acceptable exit
    # code. Compiled once instead of on every stdout line.
    _EXPECTED_RC_RE = re.compile(
        "--- NOTE: IN THIS TESTCASE, WE EXPECT EXIT CODE"
        " ((?:-|\\d)+) ---"
    )

    def __init__(self, test, result, results, wpt_results=None):
        self.test = test
        self.result = result
        self.results = results
        self.wpt_results = wpt_results  # Only used for wpt tests.

    @classmethod
    def from_wpt_output(cls, output):
        """Parse the output from a web-platform test that uses testharness.js.
        (The output is written to stdout in js/src/tests/testharnessreport.js.)

        Side effect: ``output.out`` is rewritten. The raw "WPT OUTPUT: " line
        is removed, and a harness status line plus one line per subtest are
        appended.

        Returns a TestResult whose ``wpt_results`` dict holds the harness
        status, the expected harness status and the per-subtest statuses.
        """
        from wptrunner.executors.base import testharness_result_converter

        prefix = "WPT OUTPUT: "
        rc = output.rc
        stdout = output.out.split("\n")
        if rc != 0:
            # A non-zero exit code means no harness report is parsed at all.
            if rc == 3:
                harness_status = "ERROR"
                harness_message = "Exit code reported exception"
            else:
                harness_status = "CRASH"
                harness_message = "Exit code reported crash"
            tests = []
        else:
            for idx, line in enumerate(stdout):
                if line.startswith(prefix):
                    msg = line[len(prefix) :]
                    data = [output.test.wpt.url] + json.loads(msg)
                    harness_status_obj, tests = testharness_result_converter(
                        output.test.wpt, data
                    )
                    harness_status = harness_status_obj.status
                    harness_message = "Reported by harness: %s" % (
                        harness_status_obj.message,
                    )
                    # Drop the raw report line; only the first one is used.
                    del stdout[idx]
                    break
            else:
                # The loop finished without finding a report line.
                harness_status = "ERROR"
                harness_message = "No harness output found"
                tests = []
        stdout.append("Harness status: %s (%s)" % (harness_status, harness_message))

        result = cls.PASS
        results = []
        subtests = []
        expected_harness_status = output.test.wpt.expected()
        if harness_status != expected_harness_status:
            # Subtests are not examined when the harness status is unexpected.
            if harness_status == "CRASH":
                result = cls.CRASH
            else:
                result = cls.FAIL
        else:
            for test in tests:
                test_output = 'Subtest "%s": ' % (test.name,)
                expected = output.test.wpt.expected(test.name)
                if test.status == expected:
                    test_result = (cls.PASS, "")
                    test_output += "as expected: %s" % (test.status,)
                else:
                    test_result = (cls.FAIL, test.message)
                    result = cls.FAIL
                    test_output += "expected %s, found %s" % (expected, test.status)
                    if test.message:
                        test_output += ' (with message: "%s")' % (test.message,)
                subtests.append(
                    {
                        "test": output.test.wpt.id,
                        "subtest": test.name,
                        "status": test.status,
                        "expected": expected,
                    }
                )
                results.append(test_result)
                stdout.append(test_output)

        output.out = "\n".join(stdout) + "\n"

        wpt_results = {
            "name": output.test.wpt.id,
            "status": harness_status,
            "expected": expected_harness_status,
            "subtests": subtests,
        }

        return cls(output.test, result, results, wpt_results)

    @classmethod
    def from_output(cls, output):
        """Classify *output* and return the corresponding TestResult.

        wpt tests are delegated to from_wpt_output(). For other tests, stdout
        lines starting with " PASSED!" / " FAILED!" are counted as subtest
        results, and the exit code is checked against the expected ones:

        * an unexpected non-zero exit code is FAIL when it is 3, else CRASH;
        * otherwise the result is PASS only when there were no failures and
          either the exit code is non-zero (and expected) or at least one
          subtest passed.
        """
        test = output.test
        if test.wpt:
            return cls.from_wpt_output(output)

        out, err, rc = output.out, output.err, output.rc

        results = []  # (str, str) list: subtest results (pass/fail, message)
        failures = 0
        passes = 0

        # Non-zero exit codes that do not count as unexpected for this test.
        expected_rcs = []
        if test.path.endswith("-n.js"):
            expected_rcs.append(3)

        for line in out.split("\n"):
            if line.startswith(" FAILED!"):
                failures += 1
                results.append((cls.FAIL, line[len(" FAILED! ") :]))
            elif line.startswith(" PASSED!"):
                passes += 1
                results.append((cls.PASS, line[len(" PASSED! ") :]))
            else:
                m = cls._EXPECTED_RC_RE.match(line)
                if m:
                    expected_rcs.append(int(m.group(1)))

        if test.error is not None:
            # The test must exit with code 3 and mention the error on stderr.
            expected_rcs.append(3)
            if test.error not in err:
                failures += 1
                results.append(
                    (cls.FAIL, "Expected uncaught error: {}".format(test.error))
                )

        if rc and rc not in expected_rcs:
            result = cls.FAIL if rc == 3 else cls.CRASH
        elif (rc or passes > 0) and failures == 0:
            result = cls.PASS
        else:
            result = cls.FAIL

        return cls(test, result, results)
204
205
class TestDuration:
    """Simple record associating a test with a duration."""

    def __init__(self, test, duration):
        self.test, self.duration = test, duration
210
211
class ResultsSink:
    """Accumulate test outputs, keep running tallies, and report them.

    Outputs are fed in through push(); finish() emits the end-of-run report.
    What gets printed is controlled by the ``options`` object given to the
    constructor.
    """

    def __init__(self, testsuite, options, testcount):
        """Set up logging, the optional wptreport handler and the progress bar.

        testsuite -- passed to TestLogger when the format is "automation".
        options   -- option object; its attributes are read throughout this
                     class (e.g. output_fp, format, wptreport, hide_progress).
        testcount -- total handed to ProgressBar.
        """
        self.options = options
        self.fp = options.output_fp

        # Structured logger, only created for the "automation" format. It is
        # None otherwise so print_automation_result() stays callable in any
        # format (the check_output path in push() calls it unconditionally).
        self.slog = None
        if self.options.format == "automation":
            self.slog = TestLogger(testsuite)
            self.slog.suite_start()

        self.wptreport = None
        if self.options.wptreport:
            try:
                from .wptreport import WptreportHandler

                self.wptreport = WptreportHandler(self.options.wptreport)
                self.wptreport.suite_start()
            except ImportError:
                # If the handler cannot be imported, carry on without a
                # wptreport; self.wptreport remains None.
                pass

        self.groups = {}
        self.output_dict = {}
        self.counts = {"PASS": 0, "FAIL": 0, "TIMEOUT": 0, "SKIP": 0}
        self.slow_tests = []
        self.n = 0

        if options.hide_progress:
            self.pb = NullProgressBar()
        else:
            fmt = [
                {"value": "PASS", "color": "green"},
                {"value": "FAIL", "color": "red"},
                {"value": "TIMEOUT", "color": "blue"},
                {"value": "SKIP", "color": "brightgray"},
            ]
            self.pb = ProgressBar(testcount, fmt)

    @staticmethod
    def _write_with_fallback(fp, data):
        """Write *data* to *fp*, escaping it if it cannot be encoded."""
        try:
            fp.write(data)
        except UnicodeEncodeError as e:
            # In case the data contains something not directly encodable,
            # write it with the "namereplace" handler (\N{...} escapes).
            fp.write("WARNING: Falling back from exception: {}\n".format(e))
            fp.write("WARNING: The following output is escaped, ")
            fp.write("and may be different than original one.\n")
            fp.write(data.encode("ascii", "namereplace").decode("ascii"))

    @staticmethod
    def _singular(label):
        """Return the singular form of a dev label for progress messages."""
        return "FIXED" if label == "FIXES" else label[:-1]

    def push(self, output):
        """Record one test output and report it according to the options.

        A NullTestOutput is counted as a skip. Any other output is classified
        with TestResult.from_output(), added to its dev-label group,
        optionally echoed to ``self.fp``, counted, and reported either as
        automation lines or as a progress-bar message.
        """
        if self.options.show_slow and output.dt >= self.options.slow_test_threshold:
            self.slow_tests.append(TestDuration(output.test, output.dt))
        if output.timed_out:
            self.counts["TIMEOUT"] += 1
        if isinstance(output, NullTestOutput):
            if self.options.format == "automation":
                self.print_automation_result(
                    "TEST-KNOWN-FAIL", output.test, time=output.dt, skip=True
                )
            self.counts["SKIP"] += 1
            self.n += 1
        else:
            result = TestResult.from_output(output)

            if self.wptreport is not None and result.wpt_results:
                self.wptreport.test(result.wpt_results, output.dt)

            tup = (result.result, result.test.expect, result.test.random)
            dev_label = self.LABELS[tup][1]

            if self.options.check_output:
                if output.test.path in self.output_dict:
                    # NOTE(review): this compares output objects with `!=`.
                    # TestOutput in this file defines no __eq__, so this looks
                    # like an identity comparison -- confirm that is intended.
                    if self.output_dict[output.test.path] != output:
                        self.counts["FAIL"] += 1
                        self.print_automation_result(
                            "TEST-UNEXPECTED-FAIL",
                            result.test,
                            time=output.dt,
                            message="Same test with different flag producing different output",
                        )
                else:
                    self.output_dict[output.test.path] = output

            if output.timed_out:
                dev_label = "TIMEOUTS"
            self.groups.setdefault(dev_label, []).append(result)

            if dev_label == "REGRESSIONS":
                show_output = (
                    self.options.show_output or not self.options.no_show_failed
                )
            elif dev_label == "TIMEOUTS":
                show_output = self.options.show_output
            else:
                show_output = self.options.show_output and not self.options.failed_only

            if dev_label in ("REGRESSIONS", "TIMEOUTS"):
                show_cmd = self.options.show_cmd
            else:
                show_cmd = self.options.show_cmd and not self.options.failed_only

            if show_output or show_cmd:
                self.pb.beginline()

                if show_output:
                    print(
                        "## {}: rc = {:d}, run time = {}".format(
                            output.test.path, output.rc, output.dt
                        ),
                        file=self.fp,
                    )

                if show_cmd:
                    print(escape_cmdline(output.cmd), file=self.fp)

                if show_output:
                    self._write_with_fallback(self.fp, output.out)
                    self._write_with_fallback(self.fp, output.err)

            self.n += 1

            if result.result == TestResult.PASS and not result.test.random:
                self.counts["PASS"] += 1
            elif result.test.expect and not result.test.random:
                self.counts["FAIL"] += 1
            else:
                self.counts["SKIP"] += 1

            if self.options.format == "automation":
                # Subtest lines are only emitted for a non-passing test with
                # more than one subtest result.
                if result.result != TestResult.PASS and len(result.results) > 1:
                    for sub_ok, msg in result.results:
                        tup = (sub_ok, result.test.expect, result.test.random)
                        label = self.LABELS[tup][0]
                        if label == "TEST-UNEXPECTED-PASS":
                            label = "TEST-PASS (EXPECTED RANDOM)"
                        self.print_automation_result(
                            label, result.test, time=output.dt, message=msg
                        )
                tup = (result.result, result.test.expect, result.test.random)
                self.print_automation_result(
                    self.LABELS[tup][0],
                    result.test,
                    time=output.dt,
                    extra=getattr(output, "extra", None),
                )
                # Returning here skips the progress-bar update below.
                return

            if dev_label:
                self.pb.message(
                    "{} - {}".format(self._singular(dev_label), output.test.path)
                )

        self.pb.update(self.n, self.counts)

    def finish(self, completed):
        """Finish the progress bar and emit the end-of-run report.

        completed -- forwarded to the progress bar and to list(); a false
                     value marks the run as partial in the summary line.
        """
        self.pb.finish(completed)
        if self.options.format == "automation":
            self.slog.suite_end()
        else:
            self.list(completed)

        if self.wptreport is not None:
            self.wptreport.suite_end()

    # Conceptually, this maps (test result x test expectation) to text labels.
    #      key   is (result, expect, random)
    #      value is (automation label, dev test category)
    LABELS = {
        (TestResult.CRASH, False, False): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
        (TestResult.CRASH, False, True): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
        (TestResult.CRASH, True, False): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
        (TestResult.CRASH, True, True): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
        (TestResult.FAIL, False, False): ("TEST-KNOWN-FAIL", ""),
        (TestResult.FAIL, False, True): ("TEST-KNOWN-FAIL (EXPECTED RANDOM)", ""),
        (TestResult.FAIL, True, False): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
        (TestResult.FAIL, True, True): ("TEST-KNOWN-FAIL (EXPECTED RANDOM)", ""),
        (TestResult.PASS, False, False): ("TEST-UNEXPECTED-PASS", "FIXES"),
        (TestResult.PASS, False, True): ("TEST-PASS (EXPECTED RANDOM)", ""),
        (TestResult.PASS, True, False): ("TEST-PASS", ""),
        (TestResult.PASS, True, True): ("TEST-PASS (EXPECTED RANDOM)", ""),
    }

    def list(self, completed):
        """Print the grouped results, the PASS/FAIL verdict and slow tests.

        When ``options.failure_file`` is set, that file is (re)written with
        the paths of all regressions and timeouts; it is left empty when
        everything passed.
        """
        for label, results in sorted(self.groups.items()):
            # Results with an empty dev label are not listed.
            if label == "":
                continue

            print(label)
            for result in results:
                print(
                    "    {}".format(
                        " ".join(
                            result.test.jitflags
                            + result.test.options
                            + [result.test.path]
                        )
                    )
                )

        if self.options.failure_file:
            # The context manager closes the file even if a write fails.
            with open(self.options.failure_file, "w") as failure_file:
                if not self.all_passed():
                    for group in ("REGRESSIONS", "TIMEOUTS"):
                        for result in self.groups.get(group, ()):
                            print(result.test.path, file=failure_file)

        suffix = "" if completed else " (partial run -- interrupted by user)"
        if self.all_passed():
            print("PASS" + suffix)
        else:
            print("FAIL" + suffix)

        if self.options.show_slow:
            min_duration = self.options.slow_test_threshold
            print("Slow tests (duration > {}s)".format(min_duration))
            slow_tests = sorted(self.slow_tests, key=lambda x: x.duration, reverse=True)
            for test in slow_tests:
                print("{:>5} {}".format(round(test.duration, 2), test.test))
            if not slow_tests:
                print("None")

    def all_passed(self):
        """Return True when no regressions and no timeouts were recorded."""
        return "REGRESSIONS" not in self.groups and "TIMEOUTS" not in self.groups

    def print_automation_result(
        self, label, test, message=None, skip=False, time=None, extra=None
    ):
        """Print one automation-format result line and log it if possible.

        label   -- automation label, e.g. "TEST-PASS".
        test    -- the test; its ``path`` and ``jitflags`` are reported.
        message -- optional text appended to the line and logged.
        skip    -- append the "(SKIP)" marker when true.
        time    -- run time in seconds; None is reported as 0.
        extra   -- optional dict copied into the structured-log details.

        The structured log entry is only written when a logger exists, i.e.
        when the sink was created for the "automation" format.
        """
        # Treat a missing time as 0 so the default argument is usable.
        elapsed = time or 0

        result = label
        result += " | " + test.path
        args = []
        if self.options.shell_args:
            args.append(self.options.shell_args)
        args += test.jitflags
        result += ' | (args: "{}")'.format(" ".join(args))
        if message:
            result += " | " + message
        if skip:
            result += " | (SKIP)"
        if time is not None and time > self.options.timeout:
            result += " | (TIMEOUT)"
        result += " [{:.1f} s]".format(elapsed)
        print(result)

        if self.slog is None:
            return

        details = {"extra": extra.copy() if extra else {}}
        if self.options.shell_args:
            details["extra"]["shell_args"] = self.options.shell_args
        details["extra"]["jitflags"] = test.jitflags
        if message:
            details["message"] = message
        status = "FAIL" if "TEST-UNEXPECTED" in label else "PASS"

        self.slog.test(test.path, status, elapsed, **details)
474