1import faulthandler
2import functools
3import gc
4import importlib
5import io
6import os
7import sys
8import time
9import traceback
10import unittest
11
12from test import support
13from test.support import os_helper
14from test.libregrtest.cmdline import Namespace
15from test.libregrtest.save_env import saved_test_environment
16from test.libregrtest.utils import clear_caches, format_duration, print_warning
17
18
19class TestResult:
20    def __init__(
21        self,
22        name: str,
23        duration_sec: float = 0.0,
24        xml_data: list[str] | None = None,
25    ) -> None:
26        self.name = name
27        self.duration_sec = duration_sec
28        self.xml_data = xml_data
29
30    def __str__(self) -> str:
31        return f"{self.name} finished"
32
33
class Passed(TestResult):
    """The test ran to completion and succeeded."""

    def __str__(self) -> str:
        message = f"{self.name} passed"
        return message
37
38
class Failed(TestResult):
    """The test failed; optionally carries detailed error/failure lists.

    errors and failures mirror unittest's result lists: each entry is a
    (test description, traceback text) pair.
    """

    def __init__(
        self,
        name: str,
        duration_sec: float = 0.0,
        xml_data: list[str] | None = None,
        errors: list[tuple[str, str]] | None = None,
        failures: list[tuple[str, str]] | None = None,
    ) -> None:
        super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
        self.errors = errors
        self.failures = failures

    @staticmethod
    def _count(items: list[tuple[str, str]], noun: str) -> str:
        # Format "1 error", "2 errors", "1 failure", "3 failures", ...
        n = len(items)
        suffix = "s" if n > 1 else ""
        return f"{n} {noun}{suffix}"

    def __str__(self) -> str:
        # Build the parenthesized statistics once instead of duplicating
        # the count/pluralize logic per branch (as the original did).
        stats = []
        if self.errors:
            stats.append(self._count(self.errors, "error"))
        if self.failures:
            stats.append(self._count(self.failures, "failure"))
        if stats:
            return f"{self.name} failed ({', '.join(stats)})"
        return f"{self.name} failed"
71
72
class UncaughtException(Failed):
    """The test raised an exception that was not handled as a test failure."""

    def __str__(self) -> str:
        message = f"{self.name} failed (uncaught exception)"
        return message
76
77
class EnvChanged(Failed):
    """The test altered the execution environment (files, globals, ...)."""

    def __str__(self) -> str:
        message = f"{self.name} failed (env changed)"
        return message
81
82
class RefLeak(Failed):
    """The test leaked references while hunting reference leaks."""

    def __str__(self) -> str:
        message = f"{self.name} failed (reference leak)"
        return message
86
87
class Skipped(TestResult):
    """The test was skipped."""

    def __str__(self) -> str:
        message = f"{self.name} skipped"
        return message
91
92
class ResourceDenied(Skipped):
    """The test was skipped because a required resource was not available."""

    def __str__(self) -> str:
        message = f"{self.name} skipped (resource denied)"
        return message
96
97
class Interrupted(TestResult):
    """The test run was interrupted (KeyboardInterrupt)."""

    def __str__(self) -> str:
        message = f"{self.name} interrupted"
        return message
101
102
class ChildError(Failed):
    """The test crashed (the name suggests a failed child process --
    cannot be confirmed from this file alone)."""

    def __str__(self) -> str:
        message = f"{self.name} crashed"
        return message
106
107
class DidNotRun(TestResult):
    """The test module executed but ran no tests."""

    def __str__(self) -> str:
        message = f"{self.name} ran no tests"
        return message
111
112
class Timeout(Failed):
    """The test exceeded the configured timeout."""

    def __str__(self) -> str:
        duration = format_duration(self.duration_sec)
        return f"{self.name} timed out ({duration})"
116
117
# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0   # seconds

# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow).
# findtests() always puts these first, in this order, before the sorted rest.
STDTESTS = [
    'test_grammar',
    'test_opcodes',
    'test_dict',
    'test_builtin',
    'test_exceptions',
    'test_types',
    'test_unittest',
    'test_doctest',
    'test_doctest2',
    'test_support'
]

# set of tests that we don't want to be executed when using regrtest
NOTTESTS = set()


# used by --findleaks, store for gc.garbage: uncollectable objects are
# moved here after being reported so they are only warned about once
FOUND_GARBAGE = []
143
144
def is_failed(result: TestResult, ns: Namespace) -> bool:
    """Tell whether *result* counts as a failure under the options in *ns*.

    An environment change is only fatal when --fail-env-changed was given;
    any other Failed subclass always counts.
    """
    # EnvChanged is itself a Failed subclass, so it must be checked first.
    if not isinstance(result, EnvChanged):
        return isinstance(result, Failed)
    return ns.fail_env_changed
149
150
def findtestdir(path=None):
    """Return the directory holding the tests: *path* if given, otherwise
    the parent directory of this package (falling back to os.curdir)."""
    if path:
        return path
    default = os.path.dirname(os.path.dirname(__file__))
    return default or os.curdir
153
154
def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
    """Return a list of all applicable test modules.

    The result is stdtests (in their given order) followed by the sorted
    remaining test_* modules found in testdir, excluding nottests.
    """
    testdir = findtestdir(testdir)
    # Modules already listed in stdtests, or excluded by nottests, are
    # skipped during the directory scan.
    others = set(stdtests) | nottests
    tests = []
    for name in os.listdir(testdir):
        mod, ext = os.path.splitext(name)
        # str.startswith() instead of the slice comparison mod[:5] == "test_"
        # -- same behavior, clearer intent.
        if mod.startswith("test_") and ext in (".py", "") and mod not in others:
            tests.append(mod)
    return stdtests + sorted(tests)
166
167
def get_abs_module(ns: Namespace, test_name: str) -> str:
    """Return the absolute import name for *test_name*.

    Names already qualified with 'test.', or any name when running with
    --testdir, are used verbatim; otherwise the test is imported from
    the 'test' package.
    """
    if not test_name.startswith('test.') and not ns.testdir:
        # Import it from the test package
        return 'test.' + test_name
    return test_name
174
175
def _runtest(ns: Namespace, test_name: str) -> TestResult:
    # Handle faulthandler timeout, capture stdout+stderr, XML serialization
    # and measure time.
    #
    # Returns the TestResult produced by _runtest_inner(), with
    # duration_sec filled in and (when --junit-xml is used) xml_data
    # set to the serialized testsuite elements.

    # verbose3 mode: capture output and show it only if the test fails.
    output_on_failure = ns.verbose3

    use_timeout = (ns.timeout is not None)
    if use_timeout:
        # Dump all thread tracebacks and exit the process if the test
        # runs longer than the timeout.
        faulthandler.dump_traceback_later(ns.timeout, exit=True)

    start_time = time.perf_counter()
    try:
        support.set_match_tests(ns.match_tests, ns.ignore_tests)
        # junit_xml_list collects XML elements while the test runs; it is
        # only created when an XML report was requested.
        support.junit_xml_list = xml_list = [] if ns.xmlpath else None
        if ns.failfast:
            support.failfast = True

        if output_on_failure:
            support.verbose = True

            # Redirect both stdout and stderr into one buffer; replay the
            # buffer on the real stderr only if the test did not pass.
            stream = io.StringIO()
            orig_stdout = sys.stdout
            orig_stderr = sys.stderr
            try:
                sys.stdout = stream
                sys.stderr = stream
                result = _runtest_inner(ns, test_name,
                                        display_failure=False)
                if not isinstance(result, Passed):
                    output = stream.getvalue()
                    orig_stderr.write(output)
                    orig_stderr.flush()
            finally:
                # Always restore the original streams, even on exception.
                sys.stdout = orig_stdout
                sys.stderr = orig_stderr
        else:
            # Tell tests to be moderately quiet
            support.verbose = ns.verbose

            result = _runtest_inner(ns, test_name,
                                    display_failure=not ns.verbose)

        if xml_list:
            # Serialize the collected elements to plain ASCII strings so
            # the result object is cheap to pass around.
            import xml.etree.ElementTree as ET
            result.xml_data = [
                ET.tostring(x).decode('us-ascii')
                for x in xml_list
            ]

        result.duration_sec = time.perf_counter() - start_time
        return result
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()
        # Never leave a stale XML accumulator behind for the next test.
        support.junit_xml_list = None
231
232
def runtest(ns: Namespace, test_name: str) -> TestResult:
    """Run a single test.

    ns -- regrtest namespace of options
    test_name -- the name of the test

    Returns a TestResult sub-class depending on the kind of result received.

    If ns.xmlpath is not None, xml_data is a list containing each
    generated testsuite element.
    """
    try:
        return _runtest(ns, test_name)
    except BaseException:
        # Explicit `except BaseException` instead of a bare `except:` --
        # identical semantics (everything is caught, including SystemExit),
        # but the intent to swallow absolutely any crash is now visible.
        # A broken test must not take down the whole regrtest run.
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return Failed(test_name)
252
253
def _test_module(the_module):
    """Discover and run all unittest tests defined in *the_module*."""
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(the_module)
    if loader.errors:
        # Report every load-time error before giving up.
        for error in loader.errors:
            print(error, file=sys.stderr)
        raise Exception("errors while loading tests")
    support.run_unittest(suite)
262
263
def save_env(ns: Namespace, test_name: str):
    """Return a context manager that snapshots the test environment and
    restores (and reports on) it when the block exits."""
    return saved_test_environment(test_name, ns.verbose, ns.quiet,
                                  pgo=ns.pgo)
266
267
def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
    # Load the test function, run the test function, handle huntrleaks
    # and findleaks to detect leaks.
    #
    # Returns True if reference-leak hunting detected a leak,
    # False otherwise.

    abstest = get_abs_module(ns, test_name)

    # remove the module from sys.modules to reload it if it was already imported
    try:
        del sys.modules[abstest]
    except KeyError:
        pass

    the_module = importlib.import_module(abstest)

    if ns.huntrleaks:
        # Imported lazily: refleak machinery is only needed when hunting
        # reference leaks.
        from test.libregrtest.refleak import dash_R

    # If the test has a test_main, that will run the appropriate
    # tests.  If not, use normal unittest test loading.
    test_runner = getattr(the_module, "test_main", None)
    if test_runner is None:
        test_runner = functools.partial(_test_module, the_module)

    try:
        with save_env(ns, test_name):
            if ns.huntrleaks:
                # Return True if the test leaked references
                refleak = dash_R(ns, test_name, test_runner)
            else:
                test_runner()
                refleak = False
    finally:
        # Remove files/directories the test left behind, even when the
        # test itself raised.
        cleanup_test_droppings(test_name, ns.verbose)

    support.gc_collect()

    if gc.garbage:
        # Uncollectable objects mean the test altered the environment.
        support.environment_altered = True
        print_warning(f"{test_name} created {len(gc.garbage)} "
                      f"uncollectable object(s).")

        # move the uncollectable objects somewhere,
        # so we don't see them again
        FOUND_GARBAGE.extend(gc.garbage)
        gc.garbage.clear()

    support.reap_children()

    return refleak
317
318
def _runtest_inner(
    ns: Namespace, test_name: str, display_failure: bool = True
) -> TestResult:
    # Detect environment changes, handle exceptions.
    #
    # Returns a TestResult subclass describing the outcome;
    # display_failure controls whether failure details are printed.

    # Reset the environment_altered flag to detect if a test altered
    # the environment
    support.environment_altered = False

    if ns.pgo:
        # PGO runs only collect profiling data: never display failures.
        display_failure = False

    try:
        clear_caches()

        with save_env(ns, test_name):
            refleak = _runtest_inner2(ns, test_name)
    # NOTE: clause order is load-bearing below -- ResourceDenied must be
    # caught before the generic SkipTest (it is presumably a SkipTest
    # subclass; it must be, for this ordering to matter).
    except support.ResourceDenied as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return ResourceDenied(test_name)
    except unittest.SkipTest as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return Skipped(test_name)
    except support.TestFailedWithDetails as exc:
        # Failure carrying per-test errors/failures lists; propagate them
        # into the Failed result for detailed reporting.
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return Failed(test_name, errors=exc.errors, failures=exc.failures)
    except support.TestFailed as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return Failed(test_name)
    except support.TestDidNotRun:
        return DidNotRun(test_name)
    except KeyboardInterrupt:
        # Emit a newline so the ^C does not mangle the progress line.
        print()
        return Interrupted(test_name)
    except:
        # Anything else is a bug in the test itself; report it as an
        # uncaught exception (silently, when running for PGO).
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return UncaughtException(test_name)

    # No exception: classify the clean outcomes.
    if refleak:
        return RefLeak(test_name)
    if support.environment_altered:
        return EnvChanged(test_name)
    return Passed(test_name)
373
374
def cleanup_test_droppings(test_name: str, verbose: int) -> None:
    """Best-effort removal of files/directories a test left behind.

    Failures to clean up are reported as warnings rather than raised, so
    a cleanup problem never fails the run itself.
    """
    # First kill any dangling references to open files etc.
    # This can also issue some ResourceWarnings which would otherwise get
    # triggered during the following test run, and possibly produce failures.
    support.gc_collect()

    # Try to clean up junk commonly left behind.  While tests shouldn't leave
    # any files or directories behind, when a test fails that can be tedious
    # for it to arrange.  The consequences can be especially nasty on Windows,
    # since if a test leaves a file open, it cannot be deleted by name (while
    # there's nothing we can do about that here either, we can display the
    # name of the offending test, which is a real help).
    for name in (os_helper.TESTFN,):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            import shutil
            kind, nuker = "directory", shutil.rmtree
        elif os.path.isfile(name):
            kind, nuker = "file", os.unlink
        else:
            # Exists but is neither file nor directory (e.g. a socket or
            # fifo): we have no removal strategy, so fail loudly.
            raise RuntimeError(f"os.path says {name!r} exists but is neither "
                               f"directory nor file")

        if verbose:
            print_warning(f"{test_name} left behind {kind} {name!r}")
            # NOTE(review): environment_altered is only set when verbose is
            # true -- confirm this asymmetry is intended.
            support.environment_altered = True

        try:
            import stat
            # fix possible permissions problems that might prevent cleanup
            os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            nuker(name)
        except Exception as exc:
            print_warning(f"{test_name} left behind {kind} {name!r} "
                          f"and it couldn't be removed: {exc}")
412