1import datetime
2import faulthandler
3import locale
4import os
5import platform
6import random
7import re
8import sys
9import sysconfig
10import tempfile
11import time
12import unittest
13from test.libregrtest.cmdline import _parse_args
14from test.libregrtest.runtest import (
15    findtests, runtest, get_abs_module,
16    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
17    INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT,
18    PROGRESS_MIN_TIME, format_test_result, is_failed)
19from test.libregrtest.setup import setup_tests
20from test.libregrtest.pgo import setup_pgo_tests
21from test.libregrtest.utils import removepy, count, format_duration, printlist
22from test import support
23
24
25# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
26# Used to protect against threading._shutdown() hang.
27# Must be smaller than buildbot "1200 seconds without output" limit.
28EXIT_TIMEOUT = 120.0
29
30
31class Regrtest:
32    """Execute a test suite.
33
34    This also parses command-line options and modifies its behavior
35    accordingly.
36
37    tests -- a list of strings containing test names (optional)
38    testdir -- the directory in which to look for tests (optional)
39
40    Users other than the Python test suite will certainly want to
41    specify testdir; if it's omitted, the directory containing the
42    Python test suite is searched for.
43
44    If the tests argument is omitted, the tests listed on the
45    command-line will be used.  If that's empty, too, then all *.py
46    files beginning with test_ will be used.
47
48    The other default arguments (verbose, quiet, exclude,
49    single, randomize, findleaks, use_resources, trace, coverdir,
50    print_slow, and random_seed) allow programmers calling main()
51    directly to set the values that would normally be set by flags
52    on the command line.
53    """
54    def __init__(self):
55        # Namespace of command line options
56        self.ns = None
57
58        # tests
59        self.tests = []
60        self.selected = []
61
62        # test results
63        self.good = []
64        self.bad = []
65        self.skipped = []
66        self.resource_denieds = []
67        self.environment_changed = []
68        self.run_no_tests = []
69        self.rerun = []
70        self.first_result = None
71        self.interrupted = False
72
73        # used by --slow
74        self.test_times = []
75
76        # used by --coverage, trace.Trace instance
77        self.tracer = None
78
79        # used to display the progress bar "[ 3/100]"
80        self.start_time = time.monotonic()
81        self.test_count = ''
82        self.test_count_width = 1
83
84        # used by --single
85        self.next_single_test = None
86        self.next_single_filename = None
87
88        # used by --junit-xml
89        self.testsuite_xml = None
90
91        # misc
92        self.win_load_tracker = None
93        self.tmp_dir = None
94        self.worker_test_name = None
95
96    def get_executed(self):
97        return (set(self.good) | set(self.bad) | set(self.skipped)
98                | set(self.resource_denieds) | set(self.environment_changed)
99                | set(self.run_no_tests))
100
101    def accumulate_result(self, result, rerun=False):
102        test_name = result.test_name
103        ok = result.result
104
105        if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
106            self.test_times.append((result.test_time, test_name))
107
108        if ok == PASSED:
109            self.good.append(test_name)
110        elif ok in (FAILED, CHILD_ERROR):
111            if not rerun:
112                self.bad.append(test_name)
113        elif ok == ENV_CHANGED:
114            self.environment_changed.append(test_name)
115        elif ok == SKIPPED:
116            self.skipped.append(test_name)
117        elif ok == RESOURCE_DENIED:
118            self.skipped.append(test_name)
119            self.resource_denieds.append(test_name)
120        elif ok == TEST_DID_NOT_RUN:
121            self.run_no_tests.append(test_name)
122        elif ok == INTERRUPTED:
123            self.interrupted = True
124        elif ok == TIMEOUT:
125            self.bad.append(test_name)
126        else:
127            raise ValueError("invalid test result: %r" % ok)
128
129        if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
130            self.bad.remove(test_name)
131
132        xml_data = result.xml_data
133        if xml_data:
134            import xml.etree.ElementTree as ET
135            for e in xml_data:
136                try:
137                    self.testsuite_xml.append(ET.fromstring(e))
138                except ET.ParseError:
139                    print(xml_data, file=sys.__stderr__)
140                    raise
141
142    def log(self, line=''):
143        empty = not line
144
145        # add the system load prefix: "load avg: 1.80 "
146        load_avg = self.getloadavg()
147        if load_avg is not None:
148            line = f"load avg: {load_avg:.2f} {line}"
149
150        # add the timestamp prefix:  "0:01:05 "
151        test_time = time.monotonic() - self.start_time
152        test_time = datetime.timedelta(seconds=int(test_time))
153        line = f"{test_time} {line}"
154
155        if empty:
156            line = line[:-1]
157
158        print(line, flush=True)
159
160    def display_progress(self, test_index, text):
161        if self.ns.quiet:
162            return
163
164        # "[ 51/405/1] test_tcl passed"
165        line = f"{test_index:{self.test_count_width}}{self.test_count}"
166        fails = len(self.bad) + len(self.environment_changed)
167        if fails and not self.ns.pgo:
168            line = f"{line}/{fails}"
169        self.log(f"[{line}] {text}")
170
171    def parse_args(self, kwargs):
172        ns = _parse_args(sys.argv[1:], **kwargs)
173
174        if ns.xmlpath:
175            support.junit_xml_list = self.testsuite_xml = []
176
177        worker_args = ns.worker_args
178        if worker_args is not None:
179            from test.libregrtest.runtest_mp import parse_worker_args
180            ns, test_name = parse_worker_args(ns.worker_args)
181            ns.worker_args = worker_args
182            self.worker_test_name = test_name
183
184        # Strip .py extensions.
185        removepy(ns.args)
186
187        if ns.huntrleaks:
188            warmup, repetitions, _ = ns.huntrleaks
189            if warmup < 1 or repetitions < 1:
190                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
191                       "number of warmups and repetitions must be at least 1 "
192                       "each (1:1).")
193                print(msg, file=sys.stderr, flush=True)
194                sys.exit(2)
195
196        if ns.tempdir:
197            ns.tempdir = os.path.expanduser(ns.tempdir)
198
199        self.ns = ns
200
201    def find_tests(self, tests):
202        self.tests = tests
203
204        if self.ns.single:
205            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
206            try:
207                with open(self.next_single_filename, 'r') as fp:
208                    next_test = fp.read().strip()
209                    self.tests = [next_test]
210            except OSError:
211                pass
212
213        if self.ns.fromfile:
214            self.tests = []
215            # regex to match 'test_builtin' in line:
216            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
217            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
218            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
219                for line in fp:
220                    line = line.split('#', 1)[0]
221                    line = line.strip()
222                    match = regex.search(line)
223                    if match is not None:
224                        self.tests.append(match.group())
225
226        removepy(self.tests)
227
228        if self.ns.pgo:
229            # add default PGO tests if no tests are specified
230            setup_pgo_tests(self.ns)
231
232        stdtests = STDTESTS[:]
233        nottests = NOTTESTS.copy()
234        if self.ns.exclude:
235            for arg in self.ns.args:
236                if arg in stdtests:
237                    stdtests.remove(arg)
238                nottests.add(arg)
239            self.ns.args = []
240
241        # if testdir is set, then we are not running the python tests suite, so
242        # don't add default tests to be executed or skipped (pass empty values)
243        if self.ns.testdir:
244            alltests = findtests(self.ns.testdir, list(), set())
245        else:
246            alltests = findtests(self.ns.testdir, stdtests, nottests)
247
248        if not self.ns.fromfile:
249            self.selected = self.tests or self.ns.args or alltests
250        else:
251            self.selected = self.tests
252        if self.ns.single:
253            self.selected = self.selected[:1]
254            try:
255                pos = alltests.index(self.selected[0])
256                self.next_single_test = alltests[pos + 1]
257            except IndexError:
258                pass
259
260        # Remove all the selected tests that precede start if it's set.
261        if self.ns.start:
262            try:
263                del self.selected[:self.selected.index(self.ns.start)]
264            except ValueError:
265                print("Couldn't find starting test (%s), using all tests"
266                      % self.ns.start, file=sys.stderr)
267
268        if self.ns.randomize:
269            if self.ns.random_seed is None:
270                self.ns.random_seed = random.randrange(10000000)
271            random.seed(self.ns.random_seed)
272            random.shuffle(self.selected)
273
274    def list_tests(self):
275        for name in self.selected:
276            print(name)
277
278    def _list_cases(self, suite):
279        for test in suite:
280            if isinstance(test, unittest.loader._FailedTest):
281                continue
282            if isinstance(test, unittest.TestSuite):
283                self._list_cases(test)
284            elif isinstance(test, unittest.TestCase):
285                if support.match_test(test):
286                    print(test.id())
287
288    def list_cases(self):
289        support.verbose = False
290        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
291
292        for test_name in self.selected:
293            abstest = get_abs_module(self.ns, test_name)
294            try:
295                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
296                self._list_cases(suite)
297            except unittest.SkipTest:
298                self.skipped.append(test_name)
299
300        if self.skipped:
301            print(file=sys.stderr)
302            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
303            printlist(self.skipped, file=sys.stderr)
304
305    def rerun_failed_tests(self):
306        self.ns.verbose = True
307        self.ns.failfast = False
308        self.ns.verbose3 = False
309
310        self.first_result = self.get_tests_result()
311
312        self.log()
313        self.log("Re-running failed tests in verbose mode")
314        self.rerun = self.bad[:]
315        for test_name in self.rerun:
316            self.log(f"Re-running {test_name} in verbose mode")
317            self.ns.verbose = True
318            result = runtest(self.ns, test_name)
319
320            self.accumulate_result(result, rerun=True)
321
322            if result.result == INTERRUPTED:
323                break
324
325        if self.bad:
326            print(count(len(self.bad), 'test'), "failed again:")
327            printlist(self.bad)
328
329        self.display_result()
330
331    def display_result(self):
332        # If running the test suite for PGO then no one cares about results.
333        if self.ns.pgo:
334            return
335
336        print()
337        print("== Tests result: %s ==" % self.get_tests_result())
338
339        if self.interrupted:
340            print("Test suite interrupted by signal SIGINT.")
341
342        omitted = set(self.selected) - self.get_executed()
343        if omitted:
344            print()
345            print(count(len(omitted), "test"), "omitted:")
346            printlist(omitted)
347
348        if self.good and not self.ns.quiet:
349            print()
350            if (not self.bad
351                and not self.skipped
352                and not self.interrupted
353                and len(self.good) > 1):
354                print("All", end=' ')
355            print(count(len(self.good), "test"), "OK.")
356
357        if self.ns.print_slow:
358            self.test_times.sort(reverse=True)
359            print()
360            print("10 slowest tests:")
361            for test_time, test in self.test_times[:10]:
362                print("- %s: %s" % (test, format_duration(test_time)))
363
364        if self.bad:
365            print()
366            print(count(len(self.bad), "test"), "failed:")
367            printlist(self.bad)
368
369        if self.environment_changed:
370            print()
371            print("{} altered the execution environment:".format(
372                     count(len(self.environment_changed), "test")))
373            printlist(self.environment_changed)
374
375        if self.skipped and not self.ns.quiet:
376            print()
377            print(count(len(self.skipped), "test"), "skipped:")
378            printlist(self.skipped)
379
380        if self.rerun:
381            print()
382            print("%s:" % count(len(self.rerun), "re-run test"))
383            printlist(self.rerun)
384
385        if self.run_no_tests:
386            print()
387            print(count(len(self.run_no_tests), "test"), "run no tests:")
388            printlist(self.run_no_tests)
389
390    def run_tests_sequential(self):
391        if self.ns.trace:
392            import trace
393            self.tracer = trace.Trace(trace=False, count=True)
394
395        save_modules = sys.modules.keys()
396
397        msg = "Run tests sequentially"
398        if self.ns.timeout:
399            msg += " (timeout: %s)" % format_duration(self.ns.timeout)
400        self.log(msg)
401
402        previous_test = None
403        for test_index, test_name in enumerate(self.tests, 1):
404            start_time = time.monotonic()
405
406            text = test_name
407            if previous_test:
408                text = '%s -- %s' % (text, previous_test)
409            self.display_progress(test_index, text)
410
411            if self.tracer:
412                # If we're tracing code coverage, then we don't exit with status
413                # if on a false return value from main.
414                cmd = ('result = runtest(self.ns, test_name); '
415                       'self.accumulate_result(result)')
416                ns = dict(locals())
417                self.tracer.runctx(cmd, globals=globals(), locals=ns)
418                result = ns['result']
419            else:
420                result = runtest(self.ns, test_name)
421                self.accumulate_result(result)
422
423            if result.result == INTERRUPTED:
424                break
425
426            previous_test = format_test_result(result)
427            test_time = time.monotonic() - start_time
428            if test_time >= PROGRESS_MIN_TIME:
429                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
430            elif result.result == PASSED:
431                # be quiet: say nothing if the test passed shortly
432                previous_test = None
433
434            # Unload the newly imported modules (best effort finalization)
435            for module in sys.modules.keys():
436                if module not in save_modules and module.startswith("test."):
437                    support.unload(module)
438
439            if self.ns.failfast and is_failed(result, self.ns):
440                break
441
442        if previous_test:
443            print(previous_test)
444
445    def _test_forever(self, tests):
446        while True:
447            for test_name in tests:
448                yield test_name
449                if self.bad:
450                    return
451                if self.ns.fail_env_changed and self.environment_changed:
452                    return
453
454    def display_header(self):
455        # Print basic platform information
456        print("==", platform.python_implementation(), *sys.version.split())
457        print("==", platform.platform(aliased=True),
458                      "%s-endian" % sys.byteorder)
459        print("== cwd:", os.getcwd())
460        cpu_count = os.cpu_count()
461        if cpu_count:
462            print("== CPU count:", cpu_count)
463        print("== encodings: locale=%s, FS=%s"
464              % (locale.getpreferredencoding(False),
465                 sys.getfilesystemencoding()))
466
467    def get_tests_result(self):
468        result = []
469        if self.bad:
470            result.append("FAILURE")
471        elif self.ns.fail_env_changed and self.environment_changed:
472            result.append("ENV CHANGED")
473        elif not any((self.good, self.bad, self.skipped, self.interrupted,
474            self.environment_changed)):
475            result.append("NO TEST RUN")
476
477        if self.interrupted:
478            result.append("INTERRUPTED")
479
480        if not result:
481            result.append("SUCCESS")
482
483        result = ', '.join(result)
484        if self.first_result:
485            result = '%s then %s' % (self.first_result, result)
486        return result
487
488    def run_tests(self):
489        # For a partial run, we do not need to clutter the output.
490        if (self.ns.header
491            or not(self.ns.pgo or self.ns.quiet or self.ns.single
492                   or self.tests or self.ns.args)):
493            self.display_header()
494
495        if self.ns.huntrleaks:
496            warmup, repetitions, _ = self.ns.huntrleaks
497            if warmup < 3:
498                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
499                        "3 warmup repetitions can give false positives!")
500                print(msg, file=sys.stdout, flush=True)
501
502        if self.ns.randomize:
503            print("Using random seed", self.ns.random_seed)
504
505        if self.ns.forever:
506            self.tests = self._test_forever(list(self.selected))
507            self.test_count = ''
508            self.test_count_width = 3
509        else:
510            self.tests = iter(self.selected)
511            self.test_count = '/{}'.format(len(self.selected))
512            self.test_count_width = len(self.test_count) - 1
513
514        if self.ns.use_mp:
515            from test.libregrtest.runtest_mp import run_tests_multiprocess
516            run_tests_multiprocess(self)
517        else:
518            self.run_tests_sequential()
519
520    def finalize(self):
521        if self.next_single_filename:
522            if self.next_single_test:
523                with open(self.next_single_filename, 'w') as fp:
524                    fp.write(self.next_single_test + '\n')
525            else:
526                os.unlink(self.next_single_filename)
527
528        if self.tracer:
529            r = self.tracer.results()
530            r.write_results(show_missing=True, summary=True,
531                            coverdir=self.ns.coverdir)
532
533        print()
534        duration = time.monotonic() - self.start_time
535        print("Total duration: %s" % format_duration(duration))
536        print("Tests result: %s" % self.get_tests_result())
537
538        if self.ns.runleaks:
539            os.system("leaks %d" % os.getpid())
540
541    def save_xml_result(self):
542        if not self.ns.xmlpath and not self.testsuite_xml:
543            return
544
545        import xml.etree.ElementTree as ET
546        root = ET.Element("testsuites")
547
548        # Manually count the totals for the overall summary
549        totals = {'tests': 0, 'errors': 0, 'failures': 0}
550        for suite in self.testsuite_xml:
551            root.append(suite)
552            for k in totals:
553                try:
554                    totals[k] += int(suite.get(k, 0))
555                except ValueError:
556                    pass
557
558        for k, v in totals.items():
559            root.set(k, str(v))
560
561        xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
562        with open(xmlpath, 'wb') as f:
563            for s in ET.tostringlist(root):
564                f.write(s)
565
566    def set_temp_dir(self):
567        if self.ns.tempdir:
568            self.tmp_dir = self.ns.tempdir
569
570        if not self.tmp_dir:
571            # When tests are run from the Python build directory, it is best practice
572            # to keep the test files in a subfolder.  This eases the cleanup of leftover
573            # files using the "make distclean" command.
574            if sysconfig.is_python_build():
575                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
576                if self.tmp_dir is None:
577                    # bpo-30284: On Windows, only srcdir is available. Using
578                    # abs_builddir mostly matters on UNIX when building Python
579                    # out of the source tree, especially when the source tree
580                    # is read only.
581                    self.tmp_dir = sysconfig.get_config_var('srcdir')
582                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
583            else:
584                self.tmp_dir = tempfile.gettempdir()
585
586        self.tmp_dir = os.path.abspath(self.tmp_dir)
587
588    def create_temp_dir(self):
589        os.makedirs(self.tmp_dir, exist_ok=True)
590
591        # Define a writable temp dir that will be used as cwd while running
592        # the tests. The name of the dir includes the pid to allow parallel
593        # testing (see the -j option).
594        pid = os.getpid()
595        if self.worker_test_name is not None:
596            test_cwd = 'test_python_worker_{}'.format(pid)
597        else:
598            test_cwd = 'test_python_{}'.format(pid)
599        test_cwd = os.path.join(self.tmp_dir, test_cwd)
600        return test_cwd
601
602    def cleanup(self):
603        import glob
604
605        path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
606        print("Cleanup %s directory" % self.tmp_dir)
607        for name in glob.glob(path):
608            if os.path.isdir(name):
609                print("Remove directory: %s" % name)
610                support.rmtree(name)
611            else:
612                print("Remove file: %s" % name)
613                support.unlink(name)
614
615    def main(self, tests=None, **kwargs):
616        self.parse_args(kwargs)
617
618        self.set_temp_dir()
619
620        if self.ns.cleanup:
621            self.cleanup()
622            sys.exit(0)
623
624        test_cwd = self.create_temp_dir()
625
626        try:
627            # Run the tests in a context manager that temporarily changes the CWD
628            # to a temporary and writable directory. If it's not possible to
629            # create or change the CWD, the original CWD will be used.
630            # The original CWD is available from support.SAVEDCWD.
631            with support.temp_cwd(test_cwd, quiet=True):
632                # When using multiprocessing, worker processes will use test_cwd
633                # as their parent temporary directory. So when the main process
634                # exit, it removes also subdirectories of worker processes.
635                self.ns.tempdir = test_cwd
636
637                self._main(tests, kwargs)
638        except SystemExit as exc:
639            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
640            # on threading._shutdown() call: put a timeout
641            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
642
643            sys.exit(exc.code)
644
645    def getloadavg(self):
646        if self.win_load_tracker is not None:
647            return self.win_load_tracker.getloadavg()
648
649        if hasattr(os, 'getloadavg'):
650            return os.getloadavg()[0]
651
652        return None
653
654    def _main(self, tests, kwargs):
655        if self.worker_test_name is not None:
656            from test.libregrtest.runtest_mp import run_tests_worker
657            run_tests_worker(self.ns, self.worker_test_name)
658
659        if self.ns.wait:
660            input("Press any key to continue...")
661
662        support.PGO = self.ns.pgo
663        support.PGO_EXTENDED = self.ns.pgo_extended
664
665        setup_tests(self.ns)
666
667        self.find_tests(tests)
668
669        if self.ns.list_tests:
670            self.list_tests()
671            sys.exit(0)
672
673        if self.ns.list_cases:
674            self.list_cases()
675            sys.exit(0)
676
677        # If we're on windows and this is the parent runner (not a worker),
678        # track the load average.
679        if sys.platform == 'win32' and self.worker_test_name is None:
680            from test.libregrtest.win_utils import WindowsLoadTracker
681
682            try:
683                self.win_load_tracker = WindowsLoadTracker()
684            except FileNotFoundError as error:
685                # Windows IoT Core and Windows Nano Server do not provide
686                # typeperf.exe for x64, x86 or ARM
687                print(f'Failed to create WindowsLoadTracker: {error}')
688
689        try:
690            self.run_tests()
691            self.display_result()
692
693            if self.ns.verbose2 and self.bad:
694                self.rerun_failed_tests()
695        finally:
696            if self.win_load_tracker is not None:
697                self.win_load_tracker.close()
698                self.win_load_tracker = None
699
700        self.finalize()
701
702        self.save_xml_result()
703
704        if self.bad:
705            sys.exit(2)
706        if self.interrupted:
707            sys.exit(130)
708        if self.ns.fail_env_changed and self.environment_changed:
709            sys.exit(3)
710        sys.exit(0)
711
712
713def main(tests=None, **kwargs):
714    """Run the Python suite."""
715    Regrtest().main(tests=tests, **kwargs)
716