1import datetime
2import faulthandler
3import locale
4import os
5import platform
6import random
7import re
8import sys
9import sysconfig
10import tempfile
11import time
12import unittest
13from test.libregrtest.cmdline import _parse_args
14from test.libregrtest.runtest import (
15    findtests, runtest, get_abs_module,
16    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
17    INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT,
18    PROGRESS_MIN_TIME, format_test_result, is_failed)
19from test.libregrtest.setup import setup_tests
20from test.libregrtest.utils import removepy, count, format_duration, printlist
21from test import support
22
23
24# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
25# Used to protect against threading._shutdown() hang.
26# Must be smaller than buildbot "1200 seconds without output" limit.
27EXIT_TIMEOUT = 120.0
28
29
class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, findleaks, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """
    def __init__(self):
        # Namespace of command line options
        self.ns = None

        # tests: iterable of test names to run;
        # selected: final test list after --single/--fromfile/--exclude/...
        self.tests = []
        self.selected = []

        # test results: one list of test names per outcome
        self.good = []
        self.bad = []
        self.skipped = []
        self.resource_denieds = []
        self.environment_changed = []
        self.run_no_tests = []
        self.rerun = []
        # result string of the first run, set before --verbose2 re-runs
        self.first_result = None
        # set to True when a test is interrupted (SIGINT)
        self.interrupted = False

        # used by --slow: list of (test_time, test_name)
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.monotonic()
        self.test_count = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml: list of XML fragments, shared with test.support
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
        self.worker_test_name = None
94
95    def get_executed(self):
96        return (set(self.good) | set(self.bad) | set(self.skipped)
97                | set(self.resource_denieds) | set(self.environment_changed)
98                | set(self.run_no_tests))
99
    def accumulate_result(self, result, rerun=False):
        """Record a test *result* object into the per-outcome lists.

        With rerun=True (used by --verbose2), failures are not recorded a
        second time and a test that no longer fails is removed from self.bad.
        """
        test_name = result.test_name
        ok = result.result

        # Timings of crashed/interrupted tests are meaningless, and reruns
        # would count the same test twice in the --slow report.
        if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
            self.test_times.append((result.test_time, test_name))

        if ok == PASSED:
            self.good.append(test_name)
        elif ok in (FAILED, CHILD_ERROR):
            if not rerun:
                self.bad.append(test_name)
        elif ok == ENV_CHANGED:
            self.environment_changed.append(test_name)
        elif ok == SKIPPED:
            self.skipped.append(test_name)
        elif ok == RESOURCE_DENIED:
            # Resource-denied tests count as skipped, but are also tracked
            # separately for the final report.
            self.skipped.append(test_name)
            self.resource_denieds.append(test_name)
        elif ok == TEST_DID_NOT_RUN:
            self.run_no_tests.append(test_name)
        elif ok == INTERRUPTED:
            self.interrupted = True
        elif ok == TIMEOUT:
            self.bad.append(test_name)
        else:
            raise ValueError("invalid test result: %r" % ok)

        # On a rerun, a test which no longer fails is dropped from the
        # failure list.
        if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
            self.bad.remove(test_name)

        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    # Dump the raw XML to the real stderr for debugging,
                    # then let the parse error propagate.
                    print(xml_data, file=sys.__stderr__)
                    raise
140
141    def log(self, line=''):
142        empty = not line
143
144        # add the system load prefix: "load avg: 1.80 "
145        load_avg = self.getloadavg()
146        if load_avg is not None:
147            line = f"load avg: {load_avg:.2f} {line}"
148
149        # add the timestamp prefix:  "0:01:05 "
150        test_time = time.monotonic() - self.start_time
151        test_time = datetime.timedelta(seconds=int(test_time))
152        line = f"{test_time} {line}"
153
154        if empty:
155            line = line[:-1]
156
157        print(line, flush=True)
158
159    def display_progress(self, test_index, text):
160        if self.ns.quiet:
161            return
162
163        # "[ 51/405/1] test_tcl passed"
164        line = f"{test_index:{self.test_count_width}}{self.test_count}"
165        fails = len(self.bad) + len(self.environment_changed)
166        if fails and not self.ns.pgo:
167            line = f"{line}/{fails}"
168        self.log(f"[{line}] {text}")
169
    def parse_args(self, kwargs):
        """Parse the command line and store the namespace in self.ns.

        *kwargs* are forwarded to _parse_args() as default values.
        Exits the process (status 2) on invalid --huntrleaks parameters.
        """
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.xmlpath:
            # --junit-xml: collect results as XML fragments; the same list
            # object is shared with test.support.
            support.junit_xml_list = self.testsuite_xml = []

        worker_args = ns.worker_args
        if worker_args is not None:
            # Worker process of multiprocessing mode: replace the namespace
            # with the one passed by the parent process, keeping worker_args.
            from test.libregrtest.runtest_mp import parse_worker_args
            ns, test_name = parse_worker_args(ns.worker_args)
            ns.worker_args = worker_args
            self.worker_test_name = test_name

        # Strip .py extensions.
        removepy(ns.args)

        if ns.huntrleaks:
            warmup, repetitions, _ = ns.huntrleaks
            if warmup < 1 or repetitions < 1:
                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
                       "number of warmups and repetitions must be at least 1 "
                       "each (1:1).")
                print(msg, file=sys.stderr, flush=True)
                sys.exit(2)

        if ns.tempdir:
            ns.tempdir = os.path.expanduser(ns.tempdir)

        self.ns = ns
199
200    def find_tests(self, tests):
201        self.tests = tests
202
203        if self.ns.single:
204            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
205            try:
206                with open(self.next_single_filename, 'r') as fp:
207                    next_test = fp.read().strip()
208                    self.tests = [next_test]
209            except OSError:
210                pass
211
212        if self.ns.fromfile:
213            self.tests = []
214            # regex to match 'test_builtin' in line:
215            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
216            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
217            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
218                for line in fp:
219                    line = line.split('#', 1)[0]
220                    line = line.strip()
221                    match = regex.search(line)
222                    if match is not None:
223                        self.tests.append(match.group())
224
225        removepy(self.tests)
226
227        stdtests = STDTESTS[:]
228        nottests = NOTTESTS.copy()
229        if self.ns.exclude:
230            for arg in self.ns.args:
231                if arg in stdtests:
232                    stdtests.remove(arg)
233                nottests.add(arg)
234            self.ns.args = []
235
236        # if testdir is set, then we are not running the python tests suite, so
237        # don't add default tests to be executed or skipped (pass empty values)
238        if self.ns.testdir:
239            alltests = findtests(self.ns.testdir, list(), set())
240        else:
241            alltests = findtests(self.ns.testdir, stdtests, nottests)
242
243        if not self.ns.fromfile:
244            self.selected = self.tests or self.ns.args or alltests
245        else:
246            self.selected = self.tests
247        if self.ns.single:
248            self.selected = self.selected[:1]
249            try:
250                pos = alltests.index(self.selected[0])
251                self.next_single_test = alltests[pos + 1]
252            except IndexError:
253                pass
254
255        # Remove all the selected tests that precede start if it's set.
256        if self.ns.start:
257            try:
258                del self.selected[:self.selected.index(self.ns.start)]
259            except ValueError:
260                print("Couldn't find starting test (%s), using all tests"
261                      % self.ns.start, file=sys.stderr)
262
263        if self.ns.randomize:
264            if self.ns.random_seed is None:
265                self.ns.random_seed = random.randrange(10000000)
266            random.seed(self.ns.random_seed)
267            random.shuffle(self.selected)
268
269    def list_tests(self):
270        for name in self.selected:
271            print(name)
272
273    def _list_cases(self, suite):
274        for test in suite:
275            if isinstance(test, unittest.loader._FailedTest):
276                continue
277            if isinstance(test, unittest.TestSuite):
278                self._list_cases(test)
279            elif isinstance(test, unittest.TestCase):
280                if support.match_test(test):
281                    print(test.id())
282
    def list_cases(self):
        """Print the id of every selected test case (--list-cases)."""
        support.verbose = False
        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)

        for test_name in self.selected:
            abstest = get_abs_module(self.ns, test_name)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
                self._list_cases(suite)
            except unittest.SkipTest:
                # The whole test module is skipped (e.g. missing resource).
                self.skipped.append(test_name)

        if self.skipped:
            print(file=sys.stderr)
            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
            printlist(self.skipped, file=sys.stderr)
299
    def rerun_failed_tests(self):
        """Re-run the tests of self.bad in verbose mode (--verbose2)."""
        # Re-running is more useful with full output, and without stopping
        # at the first failing test case.
        self.ns.verbose = True
        self.ns.failfast = False
        self.ns.verbose3 = False

        # Remember the first run's outcome for the final report, e.g.
        # "FAILURE then SUCCESS".
        self.first_result = self.get_tests_result()

        self.log()
        self.log("Re-running failed tests in verbose mode")
        self.rerun = self.bad[:]
        for test_name in self.rerun:
            self.log(f"Re-running {test_name} in verbose mode")
            self.ns.verbose = True
            result = runtest(self.ns, test_name)

            # rerun=True: a test that now passes is removed from self.bad.
            self.accumulate_result(result, rerun=True)

            if result.result == INTERRUPTED:
                break

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()
325
    def display_result(self):
        """Print the final report: overall result line, then the lists of
        omitted, failed, env-altering, skipped, re-run and no-op tests."""
        # If running the test suite for PGO then no one cares about results.
        if self.ns.pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_result())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        # Tests which were selected but never executed (e.g. after SIGINT).
        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not self.ns.quiet:
            print()
            # "All ... OK." only when nothing failed or was skipped.
            if (not self.bad
                and not self.skipped
                and not self.interrupted
                and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if self.ns.print_slow:
            # --slow: display the 10 slowest tests.
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                     count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not self.ns.quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)
384
385    def run_tests_sequential(self):
386        if self.ns.trace:
387            import trace
388            self.tracer = trace.Trace(trace=False, count=True)
389
390        save_modules = sys.modules.keys()
391
392        msg = "Run tests sequentially"
393        if self.ns.timeout:
394            msg += " (timeout: %s)" % format_duration(self.ns.timeout)
395        self.log(msg)
396
397        previous_test = None
398        for test_index, test_name in enumerate(self.tests, 1):
399            start_time = time.monotonic()
400
401            text = test_name
402            if previous_test:
403                text = '%s -- %s' % (text, previous_test)
404            self.display_progress(test_index, text)
405
406            if self.tracer:
407                # If we're tracing code coverage, then we don't exit with status
408                # if on a false return value from main.
409                cmd = ('result = runtest(self.ns, test_name); '
410                       'self.accumulate_result(result)')
411                ns = dict(locals())
412                self.tracer.runctx(cmd, globals=globals(), locals=ns)
413                result = ns['result']
414            else:
415                result = runtest(self.ns, test_name)
416                self.accumulate_result(result)
417
418            if result.result == INTERRUPTED:
419                break
420
421            previous_test = format_test_result(result)
422            test_time = time.monotonic() - start_time
423            if test_time >= PROGRESS_MIN_TIME:
424                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
425            elif result.result == PASSED:
426                # be quiet: say nothing if the test passed shortly
427                previous_test = None
428
429            # Unload the newly imported modules (best effort finalization)
430            for module in sys.modules.keys():
431                if module not in save_modules and module.startswith("test."):
432                    support.unload(module)
433
434            if self.ns.failfast and is_failed(result, self.ns):
435                break
436
437        if previous_test:
438            print(previous_test)
439
440    def _test_forever(self, tests):
441        while True:
442            for test_name in tests:
443                yield test_name
444                if self.bad:
445                    return
446                if self.ns.fail_env_changed and self.environment_changed:
447                    return
448
449    def display_header(self):
450        # Print basic platform information
451        print("==", platform.python_implementation(), *sys.version.split())
452        print("==", platform.platform(aliased=True),
453                      "%s-endian" % sys.byteorder)
454        print("== cwd:", os.getcwd())
455        cpu_count = os.cpu_count()
456        if cpu_count:
457            print("== CPU count:", cpu_count)
458        print("== encodings: locale=%s, FS=%s"
459              % (locale.getpreferredencoding(False),
460                 sys.getfilesystemencoding()))
461
462    def get_tests_result(self):
463        result = []
464        if self.bad:
465            result.append("FAILURE")
466        elif self.ns.fail_env_changed and self.environment_changed:
467            result.append("ENV CHANGED")
468        elif not any((self.good, self.bad, self.skipped, self.interrupted,
469            self.environment_changed)):
470            result.append("NO TEST RUN")
471
472        if self.interrupted:
473            result.append("INTERRUPTED")
474
475        if not result:
476            result.append("SUCCESS")
477
478        result = ', '.join(result)
479        if self.first_result:
480            result = '%s then %s' % (self.first_result, result)
481        return result
482
    def run_tests(self):
        """Run the selected tests, sequentially or in parallel (-j)."""
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
            or not(self.ns.pgo or self.ns.quiet or self.ns.single
                   or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                        "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            # Print the seed so the run order can be reproduced.
            print("Using random seed", self.ns.random_seed)

        if self.ns.forever:
            # --forever: total count is unknown, display only the index.
            self.tests = self._test_forever(list(self.selected))
            self.test_count = ''
            self.test_count_width = 3
        else:
            # test_count is the "/405" part of the progress bar.
            self.tests = iter(self.selected)
            self.test_count = '/{}'.format(len(self.selected))
            self.test_count_width = len(self.test_count) - 1

        if self.ns.use_mp:
            from test.libregrtest.runtest_mp import run_tests_multiprocess
            run_tests_multiprocess(self)
        else:
            self.run_tests_sequential()
514
    def finalize(self):
        """Write the final state: --single bookkeeping file, coverage
        results, and the total duration/result summary."""
        if self.next_single_filename:
            if self.next_single_test:
                # Remember which test the next --single invocation must run.
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                # No test left: remove the state file.
                os.unlink(self.next_single_filename)

        if self.tracer:
            # --coverage: write the coverage report.
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        print()
        duration = time.monotonic() - self.start_time
        print("Total duration: %s" % format_duration(duration))
        print("Tests result: %s" % self.get_tests_result())

        if self.ns.runleaks:
            # --runleaks: run the external "leaks" command on this process.
            os.system("leaks %d" % os.getpid())
535
536    def save_xml_result(self):
537        if not self.ns.xmlpath and not self.testsuite_xml:
538            return
539
540        import xml.etree.ElementTree as ET
541        root = ET.Element("testsuites")
542
543        # Manually count the totals for the overall summary
544        totals = {'tests': 0, 'errors': 0, 'failures': 0}
545        for suite in self.testsuite_xml:
546            root.append(suite)
547            for k in totals:
548                try:
549                    totals[k] += int(suite.get(k, 0))
550                except ValueError:
551                    pass
552
553        for k, v in totals.items():
554            root.set(k, str(v))
555
556        xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
557        with open(xmlpath, 'wb') as f:
558            for s in ET.tostringlist(root):
559                f.write(s)
560
561    def set_temp_dir(self):
562        if self.ns.tempdir:
563            self.tmp_dir = self.ns.tempdir
564
565        if not self.tmp_dir:
566            # When tests are run from the Python build directory, it is best practice
567            # to keep the test files in a subfolder.  This eases the cleanup of leftover
568            # files using the "make distclean" command.
569            if sysconfig.is_python_build():
570                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
571                if self.tmp_dir is None:
572                    # bpo-30284: On Windows, only srcdir is available. Using
573                    # abs_builddir mostly matters on UNIX when building Python
574                    # out of the source tree, especially when the source tree
575                    # is read only.
576                    self.tmp_dir = sysconfig.get_config_var('srcdir')
577                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
578            else:
579                self.tmp_dir = tempfile.gettempdir()
580
581        self.tmp_dir = os.path.abspath(self.tmp_dir)
582
583    def create_temp_dir(self):
584        os.makedirs(self.tmp_dir, exist_ok=True)
585
586        # Define a writable temp dir that will be used as cwd while running
587        # the tests. The name of the dir includes the pid to allow parallel
588        # testing (see the -j option).
589        pid = os.getpid()
590        if self.worker_test_name is not None:
591            test_cwd = 'test_python_worker_{}'.format(pid)
592        else:
593            test_cwd = 'test_python_{}'.format(pid)
594        test_cwd = os.path.join(self.tmp_dir, test_cwd)
595        return test_cwd
596
    def cleanup(self):
        """Remove everything matching 'test_python_*' in self.tmp_dir,
        i.e. the leftovers of previous runs (--cleanup)."""
        import glob

        path = os.path.join(self.tmp_dir, 'test_python_*')
        print("Cleanup %s directory" % self.tmp_dir)
        for name in glob.glob(path):
            if os.path.isdir(name):
                print("Remove directory: %s" % name)
                support.rmtree(name)
            else:
                print("Remove file: %s" % name)
                support.unlink(name)
609
    def main(self, tests=None, **kwargs):
        """Entry point: parse options, set up the temporary cwd, run.

        *tests* optionally restricts the tests to run; *kwargs* are
        forwarded as defaults to the command line parser.  Always leaves
        via SystemExit.
        """
        self.parse_args(kwargs)

        self.set_temp_dir()

        if self.ns.cleanup:
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from support.SAVEDCWD.
            with support.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exit, it removes also subdirectories of worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # on threading._shutdown() call: put a timeout
            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)
639
640    def getloadavg(self):
641        if self.win_load_tracker is not None:
642            return self.win_load_tracker.getloadavg()
643
644        if hasattr(os, 'getloadavg'):
645            return os.getloadavg()[0]
646
647        return None
648
    def _main(self, tests, kwargs):
        """Run the test suite; always ends in sys.exit().

        Exit codes (see bottom): 0 success, 2 tests failed, 130 interrupted
        by SIGINT, 3 environment changed with --fail-env-changed.
        """
        if self.worker_test_name is not None:
            # Worker process of the multiprocessing mode (-j): delegate to
            # run_tests_worker().
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns, self.worker_test_name)

        if self.ns.wait:
            input("Press any key to continue...")

        support.PGO = self.ns.pgo

        setup_tests(self.ns)

        self.find_tests(tests)

        if self.ns.list_tests:
            self.list_tests()
            sys.exit(0)

        if self.ns.list_cases:
            self.list_cases()
            sys.exit(0)

        # If we're on windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32' and self.worker_test_name is None:
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except FileNotFoundError as error:
                # Windows IoT Core and Windows Nano Server do not provide
                # typeperf.exe for x64, x86 or ARM
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            self.run_tests()
            self.display_result()

            if self.ns.verbose2 and self.bad:
                self.rerun_failed_tests()
        finally:
            # Always release the load tracker, even on error.
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None

        self.finalize()

        self.save_xml_result()

        if self.bad:
            sys.exit(2)
        if self.interrupted:
            sys.exit(130)
        if self.ns.fail_env_changed and self.environment_changed:
            sys.exit(3)
        sys.exit(0)
705
706
def main(tests=None, **kwargs):
    """Run the Python test suite."""
    regrtest = Regrtest()
    regrtest.main(tests=tests, **kwargs)
710