1import datetime 2import faulthandler 3import locale 4import os 5import platform 6import random 7import re 8import sys 9import sysconfig 10import tempfile 11import time 12import unittest 13from test.libregrtest.cmdline import _parse_args 14from test.libregrtest.runtest import ( 15 findtests, runtest, get_abs_module, 16 STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED, 17 INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT, 18 PROGRESS_MIN_TIME, format_test_result, is_failed) 19from test.libregrtest.setup import setup_tests 20from test.libregrtest.utils import removepy, count, format_duration, printlist 21from test import support 22 23 24# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). 25# Used to protect against threading._shutdown() hang. 26# Must be smaller than buildbot "1200 seconds without output" limit. 27EXIT_TIMEOUT = 120.0 28 29 30class Regrtest: 31 """Execute a test suite. 32 33 This also parses command-line options and modifies its behavior 34 accordingly. 35 36 tests -- a list of strings containing test names (optional) 37 testdir -- the directory in which to look for tests (optional) 38 39 Users other than the Python test suite will certainly want to 40 specify testdir; if it's omitted, the directory containing the 41 Python test suite is searched for. 42 43 If the tests argument is omitted, the tests listed on the 44 command-line will be used. If that's empty, too, then all *.py 45 files beginning with test_ will be used. 46 47 The other default arguments (verbose, quiet, exclude, 48 single, randomize, findleaks, use_resources, trace, coverdir, 49 print_slow, and random_seed) allow programmers calling main() 50 directly to set the values that would normally be set by flags 51 on the command line. 52 """ 53 def __init__(self): 54 # Namespace of command line options 55 self.ns = None 56 57 # tests 58 self.tests = [] 59 self.selected = [] 60 61 # test results 62 self.good = [] 63 self.bad = [] 64 self.skipped = [] 65 self.resource_denieds = [] 66 self.environment_changed = [] 67 self.run_no_tests = [] 68 self.rerun = [] 69 self.first_result = None 70 self.interrupted = False 71 72 # used by --slow 73 self.test_times = [] 74 75 # used by --coverage, trace.Trace instance 76 self.tracer = None 77 78 # used to display the progress bar "[ 3/100]" 79 self.start_time = time.monotonic() 80 self.test_count = '' 81 self.test_count_width = 1 82 83 # used by --single 84 self.next_single_test = None 85 self.next_single_filename = None 86 87 # used by --junit-xml 88 self.testsuite_xml = None 89 90 # misc 91 self.win_load_tracker = None 92 self.tmp_dir = None 93 self.worker_test_name = None 94 95 def get_executed(self): 96 return (set(self.good) | set(self.bad) | set(self.skipped) 97 | set(self.resource_denieds) | set(self.environment_changed) 98 | set(self.run_no_tests)) 99 100 def accumulate_result(self, result, rerun=False): 101 test_name = result.test_name 102 ok = result.result 103 104 if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun: 105 self.test_times.append((result.test_time, test_name)) 106 107 if ok == PASSED: 108 self.good.append(test_name) 109 elif ok in (FAILED, CHILD_ERROR): 110 if not rerun: 111 self.bad.append(test_name) 112 elif ok == ENV_CHANGED: 113 self.environment_changed.append(test_name) 114 elif ok == SKIPPED: 115 self.skipped.append(test_name) 116 elif ok == RESOURCE_DENIED: 117 self.skipped.append(test_name) 118 self.resource_denieds.append(test_name) 119 elif ok == TEST_DID_NOT_RUN: 120 self.run_no_tests.append(test_name) 121 elif ok == INTERRUPTED: 122 self.interrupted = True 123 elif ok == TIMEOUT: 124 self.bad.append(test_name) 125 else: 126 raise ValueError("invalid test result: %r" % ok) 127 128 if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}: 129 self.bad.remove(test_name) 130 131 xml_data = result.xml_data 132 if xml_data: 133 import xml.etree.ElementTree as ET 134 for e in xml_data: 135 try: 136 self.testsuite_xml.append(ET.fromstring(e)) 137 except ET.ParseError: 138 print(xml_data, file=sys.__stderr__) 139 raise 140 141 def log(self, line=''): 142 empty = not line 143 144 # add the system load prefix: "load avg: 1.80 " 145 load_avg = self.getloadavg() 146 if load_avg is not None: 147 line = f"load avg: {load_avg:.2f} {line}" 148 149 # add the timestamp prefix: "0:01:05 " 150 test_time = time.monotonic() - self.start_time 151 test_time = datetime.timedelta(seconds=int(test_time)) 152 line = f"{test_time} {line}" 153 154 if empty: 155 line = line[:-1] 156 157 print(line, flush=True) 158 159 def display_progress(self, test_index, text): 160 if self.ns.quiet: 161 return 162 163 # "[ 51/405/1] test_tcl passed" 164 line = f"{test_index:{self.test_count_width}}{self.test_count}" 165 fails = len(self.bad) + len(self.environment_changed) 166 if fails and not self.ns.pgo: 167 line = f"{line}/{fails}" 168 self.log(f"[{line}] {text}") 169 170 def parse_args(self, kwargs): 171 ns = _parse_args(sys.argv[1:], **kwargs) 172 173 if ns.xmlpath: 174 support.junit_xml_list = self.testsuite_xml = [] 175 176 worker_args = ns.worker_args 177 if worker_args is not None: 178 from test.libregrtest.runtest_mp import parse_worker_args 179 ns, test_name = parse_worker_args(ns.worker_args) 180 ns.worker_args = worker_args 181 self.worker_test_name = test_name 182 183 # Strip .py extensions. 184 removepy(ns.args) 185 186 if ns.huntrleaks: 187 warmup, repetitions, _ = ns.huntrleaks 188 if warmup < 1 or repetitions < 1: 189 msg = ("Invalid values for the --huntrleaks/-R parameters. The " 190 "number of warmups and repetitions must be at least 1 " 191 "each (1:1).") 192 print(msg, file=sys.stderr, flush=True) 193 sys.exit(2) 194 195 if ns.tempdir: 196 ns.tempdir = os.path.expanduser(ns.tempdir) 197 198 self.ns = ns 199 200 def find_tests(self, tests): 201 self.tests = tests 202 203 if self.ns.single: 204 self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') 205 try: 206 with open(self.next_single_filename, 'r') as fp: 207 next_test = fp.read().strip() 208 self.tests = [next_test] 209 except OSError: 210 pass 211 212 if self.ns.fromfile: 213 self.tests = [] 214 # regex to match 'test_builtin' in line: 215 # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' 216 regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') 217 with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp: 218 for line in fp: 219 line = line.split('#', 1)[0] 220 line = line.strip() 221 match = regex.search(line) 222 if match is not None: 223 self.tests.append(match.group()) 224 225 removepy(self.tests) 226 227 stdtests = STDTESTS[:] 228 nottests = NOTTESTS.copy() 229 if self.ns.exclude: 230 for arg in self.ns.args: 231 if arg in stdtests: 232 stdtests.remove(arg) 233 nottests.add(arg) 234 self.ns.args = [] 235 236 # if testdir is set, then we are not running the python tests suite, so 237 # don't add default tests to be executed or skipped (pass empty values) 238 if self.ns.testdir: 239 alltests = findtests(self.ns.testdir, list(), set()) 240 else: 241 alltests = findtests(self.ns.testdir, stdtests, nottests) 242 243 if not self.ns.fromfile: 244 self.selected = self.tests or self.ns.args or alltests 245 else: 246 self.selected = self.tests 247 if self.ns.single: 248 self.selected = self.selected[:1] 249 try: 250 pos = alltests.index(self.selected[0]) 251 self.next_single_test = alltests[pos + 1] 252 except IndexError: 253 pass 254 255 # Remove all the selected tests that precede start if it's set. 256 if self.ns.start: 257 try: 258 del self.selected[:self.selected.index(self.ns.start)] 259 except ValueError: 260 print("Couldn't find starting test (%s), using all tests" 261 % self.ns.start, file=sys.stderr) 262 263 if self.ns.randomize: 264 if self.ns.random_seed is None: 265 self.ns.random_seed = random.randrange(10000000) 266 random.seed(self.ns.random_seed) 267 random.shuffle(self.selected) 268 269 def list_tests(self): 270 for name in self.selected: 271 print(name) 272 273 def _list_cases(self, suite): 274 for test in suite: 275 if isinstance(test, unittest.loader._FailedTest): 276 continue 277 if isinstance(test, unittest.TestSuite): 278 self._list_cases(test) 279 elif isinstance(test, unittest.TestCase): 280 if support.match_test(test): 281 print(test.id()) 282 283 def list_cases(self): 284 support.verbose = False 285 support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests) 286 287 for test_name in self.selected: 288 abstest = get_abs_module(self.ns, test_name) 289 try: 290 suite = unittest.defaultTestLoader.loadTestsFromName(abstest) 291 self._list_cases(suite) 292 except unittest.SkipTest: 293 self.skipped.append(test_name) 294 295 if self.skipped: 296 print(file=sys.stderr) 297 print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr) 298 printlist(self.skipped, file=sys.stderr) 299 300 def rerun_failed_tests(self): 301 self.ns.verbose = True 302 self.ns.failfast = False 303 self.ns.verbose3 = False 304 305 self.first_result = self.get_tests_result() 306 307 self.log() 308 self.log("Re-running failed tests in verbose mode") 309 self.rerun = self.bad[:] 310 for test_name in self.rerun: 311 self.log(f"Re-running {test_name} in verbose mode") 312 self.ns.verbose = True 313 result = runtest(self.ns, test_name) 314 315 self.accumulate_result(result, rerun=True) 316 317 if result.result == INTERRUPTED: 318 break 319 320 if self.bad: 321 print(count(len(self.bad), 'test'), "failed again:") 322 printlist(self.bad) 323 324 self.display_result() 325 326 def display_result(self): 327 # If running the test suite for PGO then no one cares about results. 328 if self.ns.pgo: 329 return 330 331 print() 332 print("== Tests result: %s ==" % self.get_tests_result()) 333 334 if self.interrupted: 335 print("Test suite interrupted by signal SIGINT.") 336 337 omitted = set(self.selected) - self.get_executed() 338 if omitted: 339 print() 340 print(count(len(omitted), "test"), "omitted:") 341 printlist(omitted) 342 343 if self.good and not self.ns.quiet: 344 print() 345 if (not self.bad 346 and not self.skipped 347 and not self.interrupted 348 and len(self.good) > 1): 349 print("All", end=' ') 350 print(count(len(self.good), "test"), "OK.") 351 352 if self.ns.print_slow: 353 self.test_times.sort(reverse=True) 354 print() 355 print("10 slowest tests:") 356 for test_time, test in self.test_times[:10]: 357 print("- %s: %s" % (test, format_duration(test_time))) 358 359 if self.bad: 360 print() 361 print(count(len(self.bad), "test"), "failed:") 362 printlist(self.bad) 363 364 if self.environment_changed: 365 print() 366 print("{} altered the execution environment:".format( 367 count(len(self.environment_changed), "test"))) 368 printlist(self.environment_changed) 369 370 if self.skipped and not self.ns.quiet: 371 print() 372 print(count(len(self.skipped), "test"), "skipped:") 373 printlist(self.skipped) 374 375 if self.rerun: 376 print() 377 print("%s:" % count(len(self.rerun), "re-run test")) 378 printlist(self.rerun) 379 380 if self.run_no_tests: 381 print() 382 print(count(len(self.run_no_tests), "test"), "run no tests:") 383 printlist(self.run_no_tests) 384 385 def run_tests_sequential(self): 386 if self.ns.trace: 387 import trace 388 self.tracer = trace.Trace(trace=False, count=True) 389 390 save_modules = sys.modules.keys() 391 392 msg = "Run tests sequentially" 393 if self.ns.timeout: 394 msg += " (timeout: %s)" % format_duration(self.ns.timeout) 395 self.log(msg) 396 397 previous_test = None 398 for test_index, test_name in enumerate(self.tests, 1): 399 start_time = time.monotonic() 400 401 text = test_name 402 if previous_test: 403 text = '%s -- %s' % (text, previous_test) 404 self.display_progress(test_index, text) 405 406 if self.tracer: 407 # If we're tracing code coverage, then we don't exit with status 408 # if on a false return value from main. 409 cmd = ('result = runtest(self.ns, test_name); ' 410 'self.accumulate_result(result)') 411 ns = dict(locals()) 412 self.tracer.runctx(cmd, globals=globals(), locals=ns) 413 result = ns['result'] 414 else: 415 result = runtest(self.ns, test_name) 416 self.accumulate_result(result) 417 418 if result.result == INTERRUPTED: 419 break 420 421 previous_test = format_test_result(result) 422 test_time = time.monotonic() - start_time 423 if test_time >= PROGRESS_MIN_TIME: 424 previous_test = "%s in %s" % (previous_test, format_duration(test_time)) 425 elif result.result == PASSED: 426 # be quiet: say nothing if the test passed shortly 427 previous_test = None 428 429 # Unload the newly imported modules (best effort finalization) 430 for module in sys.modules.keys(): 431 if module not in save_modules and module.startswith("test."): 432 support.unload(module) 433 434 if self.ns.failfast and is_failed(result, self.ns): 435 break 436 437 if previous_test: 438 print(previous_test) 439 440 def _test_forever(self, tests): 441 while True: 442 for test_name in tests: 443 yield test_name 444 if self.bad: 445 return 446 if self.ns.fail_env_changed and self.environment_changed: 447 return 448 449 def display_header(self): 450 # Print basic platform information 451 print("==", platform.python_implementation(), *sys.version.split()) 452 print("==", platform.platform(aliased=True), 453 "%s-endian" % sys.byteorder) 454 print("== cwd:", os.getcwd()) 455 cpu_count = os.cpu_count() 456 if cpu_count: 457 print("== CPU count:", cpu_count) 458 print("== encodings: locale=%s, FS=%s" 459 % (locale.getpreferredencoding(False), 460 sys.getfilesystemencoding())) 461 462 def get_tests_result(self): 463 result = [] 464 if self.bad: 465 result.append("FAILURE") 466 elif self.ns.fail_env_changed and self.environment_changed: 467 result.append("ENV CHANGED") 468 elif not any((self.good, self.bad, self.skipped, self.interrupted, 469 self.environment_changed)): 470 result.append("NO TEST RUN") 471 472 if self.interrupted: 473 result.append("INTERRUPTED") 474 475 if not result: 476 result.append("SUCCESS") 477 478 result = ', '.join(result) 479 if self.first_result: 480 result = '%s then %s' % (self.first_result, result) 481 return result 482 483 def run_tests(self): 484 # For a partial run, we do not need to clutter the output. 485 if (self.ns.header 486 or not(self.ns.pgo or self.ns.quiet or self.ns.single 487 or self.tests or self.ns.args)): 488 self.display_header() 489 490 if self.ns.huntrleaks: 491 warmup, repetitions, _ = self.ns.huntrleaks 492 if warmup < 3: 493 msg = ("WARNING: Running tests with --huntrleaks/-R and less than " 494 "3 warmup repetitions can give false positives!") 495 print(msg, file=sys.stdout, flush=True) 496 497 if self.ns.randomize: 498 print("Using random seed", self.ns.random_seed) 499 500 if self.ns.forever: 501 self.tests = self._test_forever(list(self.selected)) 502 self.test_count = '' 503 self.test_count_width = 3 504 else: 505 self.tests = iter(self.selected) 506 self.test_count = '/{}'.format(len(self.selected)) 507 self.test_count_width = len(self.test_count) - 1 508 509 if self.ns.use_mp: 510 from test.libregrtest.runtest_mp import run_tests_multiprocess 511 run_tests_multiprocess(self) 512 else: 513 self.run_tests_sequential() 514 515 def finalize(self): 516 if self.next_single_filename: 517 if self.next_single_test: 518 with open(self.next_single_filename, 'w') as fp: 519 fp.write(self.next_single_test + '\n') 520 else: 521 os.unlink(self.next_single_filename) 522 523 if self.tracer: 524 r = self.tracer.results() 525 r.write_results(show_missing=True, summary=True, 526 coverdir=self.ns.coverdir) 527 528 print() 529 duration = time.monotonic() - self.start_time 530 print("Total duration: %s" % format_duration(duration)) 531 print("Tests result: %s" % self.get_tests_result()) 532 533 if self.ns.runleaks: 534 os.system("leaks %d" % os.getpid()) 535 536 def save_xml_result(self): 537 if not self.ns.xmlpath and not self.testsuite_xml: 538 return 539 540 import xml.etree.ElementTree as ET 541 root = ET.Element("testsuites") 542 543 # Manually count the totals for the overall summary 544 totals = {'tests': 0, 'errors': 0, 'failures': 0} 545 for suite in self.testsuite_xml: 546 root.append(suite) 547 for k in totals: 548 try: 549 totals[k] += int(suite.get(k, 0)) 550 except ValueError: 551 pass 552 553 for k, v in totals.items(): 554 root.set(k, str(v)) 555 556 xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath) 557 with open(xmlpath, 'wb') as f: 558 for s in ET.tostringlist(root): 559 f.write(s) 560 561 def set_temp_dir(self): 562 if self.ns.tempdir: 563 self.tmp_dir = self.ns.tempdir 564 565 if not self.tmp_dir: 566 # When tests are run from the Python build directory, it is best practice 567 # to keep the test files in a subfolder. This eases the cleanup of leftover 568 # files using the "make distclean" command. 569 if sysconfig.is_python_build(): 570 self.tmp_dir = sysconfig.get_config_var('abs_builddir') 571 if self.tmp_dir is None: 572 # bpo-30284: On Windows, only srcdir is available. Using 573 # abs_builddir mostly matters on UNIX when building Python 574 # out of the source tree, especially when the source tree 575 # is read only. 576 self.tmp_dir = sysconfig.get_config_var('srcdir') 577 self.tmp_dir = os.path.join(self.tmp_dir, 'build') 578 else: 579 self.tmp_dir = tempfile.gettempdir() 580 581 self.tmp_dir = os.path.abspath(self.tmp_dir) 582 583 def create_temp_dir(self): 584 os.makedirs(self.tmp_dir, exist_ok=True) 585 586 # Define a writable temp dir that will be used as cwd while running 587 # the tests. The name of the dir includes the pid to allow parallel 588 # testing (see the -j option). 589 pid = os.getpid() 590 if self.worker_test_name is not None: 591 test_cwd = 'test_python_worker_{}'.format(pid) 592 else: 593 test_cwd = 'test_python_{}'.format(pid) 594 test_cwd = os.path.join(self.tmp_dir, test_cwd) 595 return test_cwd 596 597 def cleanup(self): 598 import glob 599 600 path = os.path.join(self.tmp_dir, 'test_python_*') 601 print("Cleanup %s directory" % self.tmp_dir) 602 for name in glob.glob(path): 603 if os.path.isdir(name): 604 print("Remove directory: %s" % name) 605 support.rmtree(name) 606 else: 607 print("Remove file: %s" % name) 608 support.unlink(name) 609 610 def main(self, tests=None, **kwargs): 611 self.parse_args(kwargs) 612 613 self.set_temp_dir() 614 615 if self.ns.cleanup: 616 self.cleanup() 617 sys.exit(0) 618 619 test_cwd = self.create_temp_dir() 620 621 try: 622 # Run the tests in a context manager that temporarily changes the CWD 623 # to a temporary and writable directory. If it's not possible to 624 # create or change the CWD, the original CWD will be used. 625 # The original CWD is available from support.SAVEDCWD. 626 with support.temp_cwd(test_cwd, quiet=True): 627 # When using multiprocessing, worker processes will use test_cwd 628 # as their parent temporary directory. So when the main process 629 # exit, it removes also subdirectories of worker processes. 630 self.ns.tempdir = test_cwd 631 632 self._main(tests, kwargs) 633 except SystemExit as exc: 634 # bpo-38203: Python can hang at exit in Py_Finalize(), especially 635 # on threading._shutdown() call: put a timeout 636 faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) 637 638 sys.exit(exc.code) 639 640 def getloadavg(self): 641 if self.win_load_tracker is not None: 642 return self.win_load_tracker.getloadavg() 643 644 if hasattr(os, 'getloadavg'): 645 return os.getloadavg()[0] 646 647 return None 648 649 def _main(self, tests, kwargs): 650 if self.worker_test_name is not None: 651 from test.libregrtest.runtest_mp import run_tests_worker 652 run_tests_worker(self.ns, self.worker_test_name) 653 654 if self.ns.wait: 655 input("Press any key to continue...") 656 657 support.PGO = self.ns.pgo 658 659 setup_tests(self.ns) 660 661 self.find_tests(tests) 662 663 if self.ns.list_tests: 664 self.list_tests() 665 sys.exit(0) 666 667 if self.ns.list_cases: 668 self.list_cases() 669 sys.exit(0) 670 671 # If we're on windows and this is the parent runner (not a worker), 672 # track the load average. 673 if sys.platform == 'win32' and self.worker_test_name is None: 674 from test.libregrtest.win_utils import WindowsLoadTracker 675 676 try: 677 self.win_load_tracker = WindowsLoadTracker() 678 except FileNotFoundError as error: 679 # Windows IoT Core and Windows Nano Server do not provide 680 # typeperf.exe for x64, x86 or ARM 681 print(f'Failed to create WindowsLoadTracker: {error}') 682 683 try: 684 self.run_tests() 685 self.display_result() 686 687 if self.ns.verbose2 and self.bad: 688 self.rerun_failed_tests() 689 finally: 690 if self.win_load_tracker is not None: 691 self.win_load_tracker.close() 692 self.win_load_tracker = None 693 694 self.finalize() 695 696 self.save_xml_result() 697 698 if self.bad: 699 sys.exit(2) 700 if self.interrupted: 701 sys.exit(130) 702 if self.ns.fail_env_changed and self.environment_changed: 703 sys.exit(3) 704 sys.exit(0) 705 706 707def main(tests=None, **kwargs): 708 """Run the Python suite.""" 709 Regrtest().main(tests=tests, **kwargs) 710