1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import contextlib 7import functools 8import os 9import re 10import stat 11import sys 12import sysconfig 13import time 14import types 15import unittest 16import warnings 17 18from .testresult import get_test_runner 19 20 21try: 22 from _testcapi import unicode_legacy_string 23except ImportError: 24 unicode_legacy_string = None 25 26__all__ = [ 27 # globals 28 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 29 # exceptions 30 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 31 # io 32 "record_original_stdout", "get_original_stdout", "captured_stdout", 33 "captured_stdin", "captured_stderr", 34 # unittest 35 "is_resource_enabled", "requires", "requires_freebsd_version", 36 "requires_linux_version", "requires_mac_ver", 37 "check_syntax_error", 38 "BasicTestRunner", "run_unittest", "run_doctest", 39 "requires_gzip", "requires_bz2", "requires_lzma", 40 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 41 "requires_IEEE_754", "requires_zlib", 42 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 43 "check__all__", "skip_if_buggy_ucrt_strfptime", 44 "check_disallow_instantiation", 45 # sys 46 "is_jython", "is_android", "check_impl_detail", "unix_shell", 47 "setswitchinterval", 48 # network 49 "open_urlresource", 50 # processes 51 "reap_children", 52 # miscellaneous 53 "run_with_locale", "swap_item", "findfile", "infinite_recursion", 54 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 55 "run_with_tz", "PGO", "missing_compiler_executable", 56 "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", 57 "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", 58 ] 59 60 61# Timeout in seconds for tests using a network server listening on the network 62# local loopback interface like 127.0.0.1. 
#
# The value is generous on purpose, so that tests do not fail spuriously:
# client and server may live in different threads, or even in different
# processes.
#
# It has to be long enough for the connect(), recv() and send() methods of
# socket.socket to complete.
#
# Two platforms get a longer timeout (10 instead of 5.0):
# - Windows ARM32 (bpo-37553: test_socket.SendfileUsingSendTest takes more
#   than 2 seconds on the Windows ARM32 buildbot);
# - VxWorks.
LOOPBACK_TIMEOUT = (
    10
    if (sys.platform == 'vxworks'
        or (sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version))
    else 5.0
)

# Timeout in seconds for network requests that leave the machine and go to
# the internet. It is short enough that a test does not wait for too long
# when the request is blocked for whatever reason.
#
# Usually, hitting INTERNET_TIMEOUT should not mark a test as failed; the
# test should be skipped instead: see transient_internet().
INTERNET_TIMEOUT = 60.0

# Timeout in seconds after which a test that takes "too long" is marked as
# failed.
#
# The timeout value depends on the regrtest --timeout command line option.
#
# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots,
# use LONG_TIMEOUT instead.
SHORT_TIMEOUT = 30.0

# Timeout in seconds to detect when a test hangs.
#
# It is long enough to reduce the risk of test failure on the slowest Python
# buildbots. It should not be used to mark a test as failed if the test takes
# "too long". The timeout value depends on the regrtest --timeout command line
# option.
LONG_TIMEOUT = 5 * 60.0

# Directory layout, computed by walking up from this file:
#   TEST_SUPPORT_DIR - directory containing this module
#   TEST_HOME_DIR    - top level directory of the "test" package that
#                      contains Python's regression test suite
#   STDLIB_DIR       - parent of TEST_HOME_DIR
#   REPO_ROOT        - parent of STDLIB_DIR
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
REPO_ROOT = os.path.dirname(STDLIB_DIR)


class Error(Exception):
    """Root of the regression test exception hierarchy."""


class TestFailed(Error):
    """Raised when a test failed."""


class TestFailedWithDetails(TestFailed):
    """Test failure that also carries the collected errors and failures.

    str() of the exception is just *msg*; the details stay available through
    the ``errors`` and ``failures`` attributes (and through ``args``).
    """

    def __init__(self, msg, errors, failures):
        super().__init__(msg, errors, failures)
        self.msg = msg
        self.errors = errors
        self.failures = failures

    def __str__(self):
        return self.msg


class TestDidNotRun(Error):
    """Raised when a test did not run any subtests."""


class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    Raised when a test calls requires() for a resource that has not been
    enabled. It allows expected skips to be told apart from unexpected ones.
    """


def anticipate_failure(condition):
    """Return a decorator for a test known to be broken in some cases.

    When *condition* is true the decorator is unittest.expectedFailure,
    otherwise it hands the test back untouched.

    Any use of this decorator should have a comment identifying the
    associated tracker issue.
    """
    return unittest.expectedFailure if condition else (lambda f: f)


def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discovers the tests below *pkg_dir* (with STDLIB_DIR as top level
    directory and "test*" as the default *pattern*), adds them to
    *standard_tests* and returns *standard_tests*.

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    discovered = loader.discover(
        start_dir=pkg_dir,
        top_level_dir=STDLIB_DIR,
        pattern="test*" if pattern is None else pattern,
    )
    standard_tests.addTests(discovered)
    return standard_tests


def get_attribute(obj, name):
    """Return getattr(obj, name); raise SkipTest if AttributeError is raised."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))


# Flag set to 0 by regrtest.py
verbose = 1
# Flag set to [] by regrtest.py
use_resources = None
# 0 disables bigmem tests (they will still be run with small sizes, to make
# sure they work.)
max_memuse = 0
real_max_memuse = 0
# list of testsuite XML elements
junit_xml_list = None
failfast = False

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded stdout, or the current sys.stdout if none is set."""
    return _original_stdout or sys.stdout


def _force_run(path, func, *args):
    """Call func(*args); on OSError, chmod *path* to S_IRWXU and retry once.

    The second attempt is not protected: if it fails too, its exception
    propagates to the caller.
    """
    try:
        return func(*args)
    except OSError as err:
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)


# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can be used, caching the answer.

    The outcome is stored on the function object itself: ``result`` holds the
    boolean and ``reason`` holds the explanation (None when a GUI is
    available), so the platform probing below runs at most once.
    """
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    import platform
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            # Keep the skip message short: long Tk errors are truncated.
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result

def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    Known resources are set by regrtest.py.  If not running under regrtest.py,
    all resources are assumed enabled unless use_resources has been set.
    """
    return use_resources is None or resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    *msg* overrides the default message used when the resource is simply not
    enabled. The 'gui' resource is additionally checked with
    _is_gui_available(), whose recorded reason becomes the message.
    """
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the %r resource not enabled" % resource
        raise ResourceDenied(msg)
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)

def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.

    The version is taken from platform.release() up to the first '-'. If it
    cannot be parsed as dot-separated integers the test is not skipped.
    """
    import platform
    min_version_txt = '.'.join(map(str, min_version))
    version_txt = platform.release().split('-', 1)[0]
    if platform.system() == sysname:
        try:
            version = tuple(map(int, version_txt.split('.')))
        except ValueError:
            skip = False
        else:
            skip = version < min_version
    else:
        skip = False

    return unittest.skipIf(
        skip,
        f"{sysname} version {min_version_txt} or higher required, not "
        f"{version_txt}"
    )


def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    less than `min_version`.

    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    version is less than 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)

def requires_linux_version(*min_version):
    """Decorator raising SkipTest if the OS is Linux and the Linux version is
    less than `min_version`.

    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    version is less than 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)

def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.

    The check happens when the decorated function is called. The requested
    version is exposed as the ``min_version`` attribute of the wrapper.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                import platform
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    # Unparsable version string: run the test anyway.
                    pass
                else:
                    if version < min_version:
                        min_version_txt = '.'.join(map(str, min_version))
                        raise unittest.SkipTest(
                            "Mac OS X %s or higher required, not %s"
                            % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator


def skip_if_buildbot(reason=None):
    """Decorator raising SkipTest if running on a buildbot.

    A buildbot is recognised by the user name found in the environment:
    USERNAME == 'Buildbot' on Windows, USER == 'buildbot' elsewhere.
    """
    if not reason:
        reason = 'not suitable for buildbots'
    if sys.platform == 'win32':
        isbuildbot = os.environ.get('USERNAME') == 'Buildbot'
    else:
        isbuildbot = os.environ.get('USER') == 'buildbot'
    return unittest.skipIf(isbuildbot, reason)


def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    An OSError whose text contains "CERTIFICATE_VERIFY_FAILED" is turned into
    unittest.SkipTest; any other exception propagates unchanged. The return
    value of *f* is passed through to the caller.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Propagate the result instead of silently dropping it.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec

# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
# for a discussion of this number.
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

# decorator for skipping tests on non-IEEE 754 platforms
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles")

def requires_zlib(reason='requires zlib'):
    """Return a decorator skipping the test unless zlib can be imported."""
    try:
        import zlib
    except ImportError:
        zlib = None
    return unittest.skipUnless(zlib, reason)

def requires_gzip(reason='requires gzip'):
    """Return a decorator skipping the test unless gzip can be imported."""
    try:
        import gzip
    except ImportError:
        gzip = None
    return unittest.skipUnless(gzip, reason)

def requires_bz2(reason='requires bz2'):
    """Return a decorator skipping the test unless bz2 can be imported."""
    try:
        import bz2
    except ImportError:
        bz2 = None
    return unittest.skipUnless(bz2, reason)

def requires_lzma(reason='requires lzma'):
    """Return a decorator skipping the test unless lzma can be imported."""
    try:
        import lzma
    except ImportError:
        lzma = None
    return unittest.skipUnless(lzma, reason)

def has_no_debug_ranges():
    """Return True if the 'code_debug_ranges' config option is disabled.

    Raises unittest.SkipTest when _testinternalcapi cannot be imported.
    """
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    config = _testinternalcapi.get_config()
    return not bool(config['code_debug_ranges'])

def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
    """Return a decorator skipping the test when debug ranges are disabled."""
    return unittest.skipIf(has_no_debug_ranges(), reason)

requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string,
                                   'requires legacy Unicode C API')

is_jython = sys.platform.startswith('java')

is_android = hasattr(sys, 'getandroidapilevel')

if sys.platform not in ('win32', 'vxworks'):
    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
else:
    unix_shell = None

# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False

# Set by libregrtest/main.py if we are running the extended (time consuming)
# PGO task.  If this is True, PGO is also True.
PGO_EXTENDED = False

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")


def darwin_malloc_err_warning(test_name):
    """Assure user that loud errors generated by macOS libc's malloc are
    expected."""
    if sys.platform != 'darwin':
        return

    import shutil
    msg = ' NOTICE '
    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
              'warnings on macOS systems. This behavior is known. Do not\n'
              'report a bug unless tests are also failing. See bpo-40928.')

    # Only the terminal width is needed, to draw the banner lines.
    padding, _ = shutil.get_terminal_size()
    print(msg.center(padding, '-'))
    print(detail)
    print('-' * padding)


def findfile(filename, subdir=None):
    """Try to find a file on sys.path or in the test directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    # The test directory is searched before the sys.path entries.
    path = [TEST_HOME_DIR] + sys.path
    for dn in path:
        fn = os.path.join(dn, filename)
        if os.path.exists(fn):
            return fn
    return filename


def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert that compiling *statement* raises a SyntaxError matching *errtext*.

    The error must carry a line number and an offset; when *lineno* and/or
    *offset* are given they must match exactly.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    testcase.assertIsNotNone(err.lineno)
    if lineno is not None:
        testcase.assertEqual(err.lineno, lineno)
    testcase.assertIsNotNone(err.offset)
    if offset is not None:
        testcase.assertEqual(err.offset, offset)


def open_urlresource(url, *args, **kw):
    """Open the local copy of *url*, downloading it first if necessary.

    The file is stored in TEST_DATA_DIR under the last component of the URL
    path and opened with ``open(fn, *args, **kw)``. The optional keyword
    argument *check* is a callable taking the open file and returning true
    if its content is valid; an invalid cached copy is deleted and fetched
    again.

    Downloading requires the 'urlfetch' resource (see requires()).

    Raises TestFailed if the freshly downloaded file is still rejected by
    *check*.
    """
    import urllib.parse
    import urllib.request
    from .os_helper import unlink
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return an open file positioned at 0 if it is valid, else None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        try:
            if check(f):
                f.seek(0)
                return f
        except BaseException:
            # Do not leak the file object if check() or seek() fails.
            f.close()
            raise
        f.close()
        return None

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    with contextlib.ExitStack() as stack:
        response = opener.open(url, timeout=INTERNET_TIMEOUT)
        stack.callback(response.close)
        f = response
        if gzip and response.headers.get('Content-Encoding') == 'gzip':
            # GzipFile.close() does not close the wrapped file object, so
            # the response itself stays registered for closing above.
            f = gzip.GzipFile(fileobj=response)
            stack.callback(f.close)
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)


@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO."""
    import io
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, io.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")

def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")

def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")


def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    import gc
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()

@contextlib.contextmanager
def disable_gc():
    """Context manager disabling the garbage collector for its body.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    import gc
    have_gc = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if have_gc:
            gc.enable()


def python_is_optimized():
    """Find if Python was built with optimizations."""
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    final_opt = ""
    # The last -O option on the command line is the one that counts.
    for opt in cflags.split():
        if opt.startswith('-O'):
            final_opt = opt
    return final_opt not in ('', '-O0', '-Og')


# struct format strings describing the object header, used by calcobjsize()
# and calcvobjsize(). Two extra pointers are prepended when sys.getobjects
# exists.
_header = 'nP'
_align = '0n'
if hasattr(sys, "getobjects"):
    _header = '2P' + _header
    _align = '0P'
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of the object header followed by *fmt*."""
    import struct
    return struct.calcsize(_header + fmt + _align)

def calcvobjsize(fmt):
    """Like calcobjsize(), but using the variable-size object header."""
    import struct
    return struct.calcsize(_vheader + fmt + _align)


_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9

def check_sizeof(test, o, size):
    """Assert through *test* that sys.getsizeof(o) equals *size*.

    *size* excludes the GC header: its size is added here for heap types and
    for instances of types with the HAVE_GC flag.

    Raises unittest.SkipTest when _testinternalcapi cannot be imported.
    """
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    result = sys.getsizeof(o)
    # add GC header size
    is_heap_type = (type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE)
    is_gc_instance = ((type(o) != type)
                      and (type(o).__flags__ & _TPFLAGS_HAVE_GC))
    if is_heap_type or is_gc_instance:
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' \
            % (type(o), result, size)
    test.assertEqual(result, size, msg)

#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

@contextlib.contextmanager
def run_with_locale(catstr, *locales):
    """Run the body with the first of *locales* that can be set.

    *catstr* is the name of a locale category (an attribute of the locale
    module, such as 'LC_ALL'). The original locale is restored on exit.

    If the original locale cannot be retrieved nothing is changed and the
    body runs as is; if none of *locales* can be set the body runs with the
    current locale. An invalid *catstr* raises AttributeError.
    """
    try:
        import locale
        category = getattr(locale, catstr)
        orig_locale = locale.setlocale(category)
    except AttributeError:
        # if the test author gives us an invalid category string
        raise
    except Exception:
        # cannot retrieve original locale, so do nothing
        # (KeyboardInterrupt and SystemExit are deliberately not swallowed)
        locale = orig_locale = None
    else:
        for loc in locales:
            try:
                locale.setlocale(category, loc)
                break
            except Exception:
                # this locale is not available: try the next one
                pass

    try:
        yield
    finally:
        if locale and orig_locale:
            locale.setlocale(category, orig_locale)

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Return a decorator running the function with the TZ variable set to *tz*.

    The previous value of TZ (or its absence) is restored afterwards, also
    when the function raises. The decorated function raises
    unittest.SkipTest if time.tzset() is not available.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() rather than del: the function under test may
                    # have removed TZ itself.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the memory limit used by the bigmem tests from a string.

    *limit* is a number (optionally with a fractional part) followed by a
    unit K, M, G or T (case-insensitive) and an optional 'b', for example
    '4g' or '2.5Gb'.

    On success ``real_max_memuse`` holds the requested number of bytes and
    ``max_memuse`` the same value capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or if the capped value is
    below 2 GiB - 1; in that case neither global is modified.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE: the space inside the pattern is not part of the match.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    memlimit = min(requested, MAX_Py_ssize_t)
    # Validate before touching the globals so that a rejected limit does not
    # leave real_max_memuse and max_memuse out of sync.
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    real_max_memuse = requested
    max_memuse = memlimit

class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Spawn memory_watchdog.py with the process' statm file as stdin.

        If the /proc file cannot be opened, a RuntimeWarning is issued and
        the watchdog is left not started.
        """
        import warnings
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        import subprocess
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the watchdog process, if it was started, and wait for it."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()


def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument. If 'dry_run' is true, the value passed to the test method
    may be less than the requested value. If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.
    """
    def decorator(f):
        def wrapper(self):
            # Read size/memuse from the wrapper attributes, so that they can
            # be adjusted after decoration.
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                # No memory limit set: run with a small dummy size.
                maxsize = 5147
            else:
                maxsize = size

            if ((real_max_memuse or not dry_run)
                and real_max_memuse < maxsize * memuse):
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()
            else:
                watchdog = None

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal runner: run a test into a fresh unittest.TestResult."""

    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result

def _id(obj):
    """Identity function, used as a decorator that changes nothing."""
    return obj

def requires_resource(resource):
    """Return a decorator skipping the test unless *resource* is usable.

    The 'gui' resource is checked with _is_gui_available() first; any
    resource must also be enabled according to is_resource_enabled().
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)

def impl_detail(msg=None, **guards):
    """Return a decorator skipping the test unless check_impl_detail(**guards).

    *msg* is the skip message; when omitted one is built from the guard
    names.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)

def _parse_guards(guards):
    # Returns a tuple ({platform_name: run_me}, default_value)
    if not guards:
        return ({'cpython': True}, False)
    is_true = list(guards.values())[0]
    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(sys.implementation.name, default)


def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test."""
    if not hasattr(sys, 'gettrace'):
        return func
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            original_trace = sys.gettrace()
            try:
                sys.settrace(None)
                return func(*args, **kwargs)
            finally:
                sys.settrace(original_trace)
        return wrapper


def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if it is not run by CPython.
    After that, any trace function is unset during the test to prevent
    unexpected refcounts caused by the trace function.

    """
    return no_tracing(cpython_only(test))


def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate."""
    newtests = []
    for test in suite._tests:
        if isinstance(test, unittest.TestSuite):
            # Nested suites are filtered in place and always kept.
            _filter_suite(test, pred)
            newtests.append(test)
        else:
            if pred(test):
                newtests.append(test)
    suite._tests = newtests

def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestDidNotRun if no test ran and none was skipped, and
    TestFailedWithDetails if the run was not successful.
    """
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=(junit_xml_list is not None))

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not result.testsRun and not result.skipped:
        raise TestDidNotRun
    if not result.wasSuccessful():
        # A single error or failure is reported verbatim.
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose: err += "; run in verbose mode for details"
        errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
        failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
        raise TestFailedWithDetails(err, errors, failures)


# By default, don't filter tests
_match_test_func = None

_accept_test_patterns = None
_ignore_test_patterns = None


def match_test(test):
    """Return True if *test* is selected by the current match patterns."""
    # Function used by support.run_unittest() and regrtest --list-cases
    if _match_test_func is None:
        return True
    else:
        return _match_test_func(test.id())


def _is_full_match_test(pattern):
    # If a pattern contains at least one dot, it's considered
    # as a full test identifier.
    # Example: 'test.test_os.FileTests.test_access'.
    #
    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
    # or '[!...]'. For example, ignore 'test_access*'.
    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def set_match_tests(accept_patterns=None, ignore_patterns=None):
    """Install the filter used by match_test().

    A test is selected when it matches *accept_patterns* (or when no accept
    pattern is given) and does not match *ignore_patterns*. None and empty
    sequences both mean "no pattern"; with no pattern at all the filter is
    removed and every test matches.

    The filter always reflects exactly the arguments of the latest call,
    whatever the previous calls were.
    """
    global _match_test_func, _accept_test_patterns, _ignore_test_patterns

    # Copy to tuples since patterns can be mutable and so modified later.
    accept_patterns = tuple(accept_patterns) if accept_patterns else ()
    ignore_patterns = tuple(ignore_patterns) if ignore_patterns else ()

    # Both matchers are rebuilt on every call: reusing "unchanged" patterns
    # without their compiled function would silently drop that half of the
    # filter.
    accept_patterns, accept_func = _compile_match_function(accept_patterns)
    ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)

    _accept_test_patterns = accept_patterns
    _ignore_test_patterns = ignore_patterns

    if accept_func is None and ignore_func is None:
        # No pattern left: make sure a previous filter does not linger.
        _match_test_func = None
        return

    def match_function(test_id):
        accept = True
        ignore = False
        if accept_func:
            accept = accept_func(test_id)
        if ignore_func:
            ignore = ignore_func(test_id)
        return accept and not ignore

    _match_test_func = match_function


def _compile_match_function(patterns):
    """Return (patterns, func) where func(test_id) tests a match, or is None.

    func is None when *patterns* is empty.
    """
    if not patterns:
        func = None
        # set_match_tests(None) behaves as set_match_tests(())
        patterns = ()
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        import fnmatch
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier, for example
                # 'test.test_os.FileTests.test_access'.
                return True
            else:
                # Try to match parts of the test identifier.
                # For example, split 'test.test_os.FileTests.test_access'
                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
                return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    return patterns, func


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase instance,
    or the name of a module present in sys.modules. Tests are filtered with
    match_test() before being run.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(loader.loadTestsFromTestCase(cls))
    _filter_suite(suite, match_test)
    _run_suite(suite)

#=======================================================================
# Check for the presence of docstrings.
# Rather than trying to enumerate all the cases where docstrings may be
# disabled, we just check for that directly

def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True when check_impl_detail() holds, the platform is not win32 and the
# build configuration does not define WITH_DOC_STRINGS.
MISSING_C_DOCSTRINGS = (
    check_impl_detail()
    and sys.platform != 'win32'
    and not sysconfig.get_config_var('WITH_DOC_STRINGS')
)

# Docstrings are available only if the probe function above kept its own
# docstring and the C docstrings are not missing.
HAVE_DOCSTRINGS = (
    _check_docstrings.__doc__ is not None
    and not MISSING_C_DOCSTRINGS
)

requires_docstrings = unittest.skipUnless(
    HAVE_DOCSTRINGS,
    "test requires docstrings",
)


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None, optionflags=0):
    """Run the doctests of 'module' and return (#failures, #tests).

    When 'verbosity' is omitted (or None), the support module's own
    'verbose' setting is handed to doctest.  When a value is given,
    doctest is called with verbose=None so that it falls back to its
    usual behavior (it searches sys.argv for -v).

    Raise TestFailed if at least one doctest failed.
    """

    import doctest

    # Note the deliberate inversion: an explicit argument means "let
    # doctest decide", no argument means "use support.verbose".
    doctest_verbose = verbose if verbosity is None else None

    failed, attempted = doctest.testmod(module, verbose=doctest_verbose,
                                        optionflags=optionflags)
    if failed:
        raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failed, attempted


#=======================================================================
# Support for saving and restoring the imported modules.
def flush_std_streams():
    """Flush sys.stdout and sys.stderr, skipping a stream that is None."""
    if sys.stdout is not None:
        sys.stdout.flush()
    if sys.stderr is not None:
        sys.stderr.flush()


def print_warning(msg):
    """Print each line of 'msg', prefixed with "Warning -- ", to the stream
    stored in print_warning.orig_stderr, then flush that stream."""
    # bpo-45410: Explicitly flush stdout to keep logs in order
    flush_std_streams()
    stream = print_warning.orig_stderr
    for line in msg.splitlines():
        print(f"Warning -- {line}", file=stream)
    stream.flush()

# bpo-39983: Store the original sys.stderr at Python startup to be able to
# log warnings even if sys.stderr is captured temporarily by a test.
print_warning.orig_stderr = sys.stderr


# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
# before running a new test.
#
# For example, threading_helper.threading_cleanup() sets the flag if the
# function fails to cleanup threads.
environment_altered = False

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Each reaped child is reported with print_warning() and sets the
    module-level 'environment_altered' flag.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            break

        if pid == 0:
            break

        print_warning(f"reap_children() reaped child process {pid}")
        environment_altered = True


@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    if hasattr(obj, attr):
        real_val = getattr(obj, attr)
        setattr(obj, attr, new_val)
        try:
            yield real_val
        finally:
            setattr(obj, attr, real_val)
    else:
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            # The block may already have removed the attribute itself.
            if hasattr(obj, attr):
                delattr(obj, attr)

@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        This will set obj["item"] to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `item` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    if item in obj:
        real_val = obj[item]
        obj[item] = new_val
        try:
            yield real_val
        finally:
            obj[item] = real_val
    else:
        obj[item] = new_val
        try:
            yield
        finally:
            # The block may already have removed the item itself.
            if item in obj:
                del obj[item]

def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions."""
    import subprocess
    return subprocess._args_from_interpreter_flags()

def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags."""
    import subprocess
    return subprocess._optim_args_from_interpreter_flags()


class Matcher(object):
    """Match a dict against expected key/value pairs.

    Values stored under the keys listed in '_partial_matches' are compared
    by substring when they are strings; everything else is compared with ==.
    """

    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        result = True
        for k in kwargs:
            v = kwargs[k]
            dv = d.get(k)
            if not self.match_value(k, dv, v):
                result = False
                break
        return result

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).

        Values of different exact types never match.
        """
        if type(v) != type(dv):
            result = False
        elif type(dv) is not str or k not in self._partial_matches:
            result = (v == dv)
        else:
            result = dv.find(v) >= 0
        return result


_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
    """
    Skip decorator for tests that use buggy strptime/strftime

    If the UCRT bugs are present time.localtime().tm_zone will be
    an empty string, otherwise we assume the UCRT bugs are fixed

    See bpo-37552 [Windows] strptime/strftime return invalid
    results with UCRT version 17763.615
    """
    import locale
    global _buggy_ucrt
    # The detection result is cached in the module-level _buggy_ucrt.
    if _buggy_ucrt is None:
        if(sys.platform == 'win32' and
                locale.getdefaultlocale()[1] == 'cp65001' and
                time.localtime().tm_zone == ''):
            _buggy_ucrt = True
        else:
            _buggy_ucrt = False
    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test

class PythonSymlink:
    """Creates a symlink for the current Python executable"""
    def __init__(self, link=None):
        from .os_helper import TESTFN

        self.link = link or os.path.abspath(TESTFN)
        # Links actually created by __enter__, removed again by __exit__.
        self._linked = []
        self.real = os.path.realpath(sys.executable)
        # Extra (source, link) pairs to create next to the main link.
        self._also_link = []

        # Environment passed to call_link(); None leaves it to subprocess.
        self._env = None

        self._platform_specific()

    def _platform_specific(self):
        pass

    if sys.platform == "win32":
        def _platform_specific(self):
            import glob
            import _winapi

            if os.path.lexists(self.real) and not os.path.exists(self.real):
                # App symlink appears to not exist, but we want the
                # real executable here anyway
                self.real = _winapi.GetModuleFileName(0)

            dll = _winapi.GetModuleFileName(sys.dllhandle)
            src_dir = os.path.dirname(dll)
            dest_dir = os.path.dirname(self.link)
            self._also_link.append((
                dll,
                os.path.join(dest_dir, os.path.basename(dll))
            ))
            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
                self._also_link.append((
                    runtime,
                    os.path.join(dest_dir, os.path.basename(runtime))
                ))

            self._env = {k.upper(): os.getenv(k) for k in os.environ}
            self._env["PYTHONHOME"] = os.path.dirname(self.real)
            if sysconfig.is_python_build(True):
                self._env["PYTHONPATH"] = STDLIB_DIR

    def __enter__(self):
        os.symlink(self.real, self.link)
        self._linked.append(self.link)
        for real, link in self._also_link:
            os.symlink(real, link)
            self._linked.append(link)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Best-effort cleanup: a link that cannot be removed is only
        # reported in verbose mode.
        for link in self._linked:
            try:
                os.remove(link)
            except IOError as ex:
                if verbose:
                    print("failed to clean up {}: {}".format(link, ex))

    def _call(self, python, args, env, returncode):
        """Run [python, *args] with 'env' and return (stdout, stderr).

        Raise RuntimeError if the exit code differs from 'returncode'.
        """
        import subprocess
        cmd = [python, *args]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, env=env)
        r = p.communicate()
        if p.returncode != returncode:
            if verbose:
                print(repr(r[0]))
                print(repr(r[1]), file=sys.stderr)
            raise RuntimeError(
                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
        return r

    def call_real(self, *args, returncode=0):
        """Run the real executable with 'args' (see _call)."""
        return self._call(self.real, args, None, returncode)

    def call_link(self, *args, returncode=0):
        """Run the symlinked executable with 'args' (see _call)."""
        return self._call(self.link, args, self._env, returncode)


def skip_if_pgo_task(test):
    """Skip decorator for tests not run in (non-extended) PGO task"""
    ok = not PGO or PGO_EXTENDED
    msg = "Not run for (non-extended) PGO task"
    return test if ok else unittest.skip(msg)(test)


def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Returns the set of items in ref_api not in other_api, except for a
    defined list of items to be ignored in this check.

    By default this skips private attributes beginning with '_' but
    includes all magic methods, i.e. those starting and ending in '__'.
    """
    missing_items = set(dir(ref_api)) - set(dir(other_api))
    if ignore:
        missing_items -= set(ignore)
    missing_items = set(m for m in missing_items
                        if not m.startswith('_') or m.endswith('__'))
    return missing_items


def check__all__(test_case, module, name_of_module=None, extra=(),
                 not_exported=()):
    """Assert that the __all__ variable of 'module' contains all public names.

    The module's public names (its API) are detected automatically based on
    whether they match the public name convention and were defined in
    'module'.

    The 'name_of_module' argument can specify (as a string or tuple thereof)
    what module(s) an API could be defined in in order to be detected as a
    public API. One case for this is when 'module' imports part of its public
    API from other modules, possibly a C backend (like 'csv' and its '_csv').

    The 'extra' argument can be a set of names that wouldn't otherwise be
    automatically detected as "public", like objects without a proper
    '__module__' attribute. If provided, it will be added to the
    automatically detected ones.

    The 'not_exported' argument can be a set of names that must not be treated
    as part of the public API even though their names indicate otherwise.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                not_exported = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, not_exported=not_exported)

    """

    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    expected = set(extra)

    for name in dir(module):
        if name.startswith('_') or name in not_exported:
            continue
        obj = getattr(module, name)
        # Public if defined in one of the accepted modules, or if it has no
        # __module__ at all and is not itself an imported module.
        if (getattr(obj, '__module__', None) in name_of_module or
                (not hasattr(obj, '__module__') and
                 not isinstance(obj, types.ModuleType))):
            expected.add(name)
    test_case.assertCountEqual(module.__all__, expected)


def suppress_msvcrt_asserts(verbose=False):
    """Configure msvcrt error and CRT report modes; do nothing if the
    msvcrt module cannot be imported.

    With 'verbose' true, CRT reports are sent to stderr; otherwise the
    report mode is set to 0.
    """
    try:
        import msvcrt
    except ImportError:
        return

    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
                        | msvcrt.SEM_NOGPFAULTERRORBOX
                        | msvcrt.SEM_NOOPENFILEERRORBOX)

    # CrtSetReportMode() is only available in debug build
    if hasattr(msvcrt, 'CrtSetReportMode'):
        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
            if verbose:
                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
            else:
                msvcrt.CrtSetReportMode(m, 0)


class SuppressCrashReport:
    """Try to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    disable the creation of coredump file.
    """
    # Previous error mode (Windows) or RLIMIT_CORE limits (elsewhere);
    # stays None when nothing was changed, which makes __exit__ a no-op.
    old_value = None
    # Windows debug builds only: {report_type: (old_mode, old_file)}.
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode() and CrtSetReportMode().

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            try:
                import msvcrt
            except ImportError:
                return

            self.old_value = msvcrt.GetErrorMode()

            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)

            # bpo-23314: Suppress assert dialogs in debug builds.
            # CrtSetReportMode() is only available in debug build.
            if hasattr(msvcrt, 'CrtSetReportMode'):
                self.old_modes = {}
                for report_type in [msvcrt.CRT_WARN,
                                    msvcrt.CRT_ERROR,
                                    msvcrt.CRT_ASSERT]:
                    old_mode = msvcrt.CrtSetReportMode(report_type,
                            msvcrt.CRTDBG_MODE_FILE)
                    old_file = msvcrt.CrtSetReportFile(report_type,
                            msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = old_mode, old_file

        else:
            try:
                import resource
                self.resource = resource
            except ImportError:
                self.resource = None
            if self.resource is not None:
                try:
                    self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
                    self.resource.setrlimit(self.resource.RLIMIT_CORE,
                                            (0, self.old_value[1]))
                except (ValueError, OSError):
                    pass

            if sys.platform == 'darwin':
                import subprocess
                # Check if the 'Crash Reporter' on OSX was configured
                # in 'Developer' mode and warn that it will get triggered
                # when it is.
                #
                # This assumes that this context manager is used in tests
                # that might trigger the next manager.
                cmd = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
                proc = subprocess.Popen(cmd,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                with proc:
                    stdout = proc.communicate()[0]
                if stdout.strip() == b'developer':
                    print("this test triggers the Crash Reporter, "
                          "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        if self.old_value is None:
            return

        if sys.platform.startswith('win'):
            import msvcrt
            msvcrt.SetErrorMode(self.old_value)

            if self.old_modes:
                for report_type, (old_mode, old_file) in self.old_modes.items():
                    msvcrt.CrtSetReportMode(report_type, old_mode)
                    msvcrt.CrtSetReportFile(report_type, old_file)
        else:
            if self.resource is not None:
                try:
                    self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
                except (ValueError, OSError):
                    pass


def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' to restore
    'object_to_patch' value for 'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch'.

    """
    # check that 'attr_name' is a real attribute for 'object_to_patch'
    # will raise AttributeError if it does not exist
    getattr(object_to_patch, attr_name)

    # keep a copy of the old value
    attr_is_local = False
    try:
        old_value = object_to_patch.__dict__[attr_name]
    except (AttributeError, KeyError):
        old_value = getattr(object_to_patch, attr_name, None)
    else:
        attr_is_local = True

    # restore the value when the test is done
    def cleanup():
        if attr_is_local:
            setattr(object_to_patch, attr_name, old_value)
        else:
            # The attribute was not stored on the object itself: removing
            # the override is enough.
            delattr(object_to_patch, attr_name)

    test_instance.addCleanup(cleanup)

    # actually override the attribute
    setattr(object_to_patch, attr_name, new_value)


def run_in_subinterp(code):
    """
    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
    module is enabled.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        pass
    else:
        if tracemalloc.is_tracing():
            raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                     "if tracemalloc module is tracing "
                                     "memory allocations")
    import _testcapi
    return _testcapi.run_in_subinterp(code)


def check_free_after_iterating(test, iter, cls, args=()):
    """Check that an instance of a subclass of 'cls' is finalized once the
    iterator returned by iter(instance) has been exhausted.

    The iterator is expected to be empty: the first next() call must raise
    StopIteration.
    """
    class A(cls):
        def __del__(self):
            nonlocal done
            done = True
            try:
                next(it)
            except StopIteration:
                pass

    done = False
    it = iter(A(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, it)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(done)


def missing_compiler_executable(cmd_names=[]):
    """Check if the compiler components used to build the interpreter exist.

    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    and return the first missing executable or None when none is found
    missing.

    """
    # TODO (PEP 632): alternate check without using distutils
    from distutils import ccompiler, sysconfig, spawn, errors
    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    if compiler.compiler_type == "msvc":
        # MSVC has no executables, so check whether initialization succeeds
        try:
            compiler.initialize()
        except errors.DistutilsPlatformError:
            return "msvc"
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            assert cmd is not None, \
                    "the '%s' executable is not configured" % name
        elif not cmd:
            continue
        if spawn.find_executable(cmd[0]) is None:
            return cmd[0]


_is_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(interval), raising very small intervals
    to a minimum when running on the Android emulator."""
    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    minimum_interval = 1e-5
    if is_android and interval < minimum_interval:
        global _is_android_emulator
        # The emulator check spawns a process, so cache its result.
        if _is_android_emulator is None:
            import subprocess
            _is_android_emulator = (subprocess.check_output(
                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
        if _is_android_emulator:
            interval = minimum_interval
    return sys.setswitchinterval(interval)


@contextlib.contextmanager
def disable_faulthandler():
    """Context manager disabling faulthandler for the duration of the block;
    it is re-enabled on exit only if it was enabled on entry."""
    import faulthandler

    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    # sys.stderr with a StringIO which has no file descriptor when a test
    # is run with -W/--verbose3.
    fd = sys.__stderr__.fileno()

    is_enabled = faulthandler.is_enabled()
    try:
        faulthandler.disable()
        yield
    finally:
        if is_enabled:
            faulthandler.enable(file=fd, all_threads=True)


class SaveSignals:
    """
    Save and restore signal handlers.

    This class is only able to save/restore signal handlers registered
    by the Python signal module: see bpo-13285 for "external" signal
    handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        self.signals = signal.valid_signals()
        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
        for signame in ('SIGKILL', 'SIGSTOP'):
            try:
                signum = getattr(signal, signame)
            except AttributeError:
                continue
            self.signals.remove(signum)
        self.handlers = {}

    def save(self):
        """Record the current handler of every tracked signal."""
        for signum in self.signals:
            handler = self.signal.getsignal(signum)
            if handler is None:
                # getsignal() returns None if a signal handler was not
                # registered by the Python signal module,
                # and the handler is not SIG_DFL nor SIG_IGN.
                #
                # Ignore the signal: we cannot restore the handler.
                continue
            self.handlers[signum] = handler

    def restore(self):
        """Reinstall every handler recorded by save()."""
        for signum, handler in self.handlers.items():
            self.signal.signal(signum, handler)


def with_pymalloc():
    """Return _testcapi.WITH_PYMALLOC."""
    import _testcapi
    return _testcapi.WITH_PYMALLOC


class _ALWAYS_EQ:
    """
    Object that is equal to anything.
    """
    def __eq__(self, other):
        return True
    def __ne__(self, other):
        return False

ALWAYS_EQ = _ALWAYS_EQ()

class _NEVER_EQ:
    """
    Object that is not equal to anything.
    """
    def __eq__(self, other):
        return False
    def __ne__(self, other):
        return True
    def __hash__(self):
        return 1

NEVER_EQ = _NEVER_EQ()

@functools.total_ordering
class _LARGEST:
    """
    Object that is greater than anything (except itself).
    """
    def __eq__(self, other):
        return isinstance(other, _LARGEST)
    def __lt__(self, other):
        return False

LARGEST = _LARGEST()

@functools.total_ordering
class _SMALLEST:
    """
    Object that is less than anything (except itself).
    """
    def __eq__(self, other):
        return isinstance(other, _SMALLEST)
    def __gt__(self, other):
        return False

SMALLEST = _SMALLEST()

def maybe_get_event_loop_policy():
    """Return the global event loop policy if one is set, else return None."""
    import asyncio.events
    return asyncio.events._event_loop_policy

# Helpers for testing hashing.
NHASHBITS = sys.hash_info.width # number of bits in hash() result
assert NHASHBITS in (32, 64)

# Return mean and sdev of number of collisions when tossing nballs balls
# uniformly at random into nbins bins.  By definition, the number of
# collisions is the number of balls minus the number of occupied bins at
# the end.
def collision_stats(nbins, nballs):
    """Return (mean, standard deviation), as floats, of the number of
    collisions when tossing 'nballs' balls into 'nbins' bins."""
    n, k = nbins, nballs
    # prob a bin empty after k trials = (1 - 1/n)**k
    # mean # empty is then n * (1 - 1/n)**k
    # so mean # occupied is n - n * (1 - 1/n)**k
    # so collisions = k - (n - n*(1 - 1/n)**k)
    #
    # For the variance:
    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    # error-prone) efforts to rework the naive formulas to avoid those,
    # we use the `decimal` module to get plenty of extra precision.
    #
    # Note:  the exact values are straightforward to compute with
    # rationals, but in context that's unbearably slow, requiring
    # multi-million bit arithmetic.
    import decimal
    with decimal.localcontext() as ctx:
        bits = n.bit_length() * 2  # bits in n**2
        # At least that many bits will likely cancel out.
        # Use that many decimal digits instead.
        ctx.prec = max(bits, 30)
        dn = decimal.Decimal(n)
        p1empty = ((dn - 1) / dn) ** k
        meanempty = n * p1empty
        occupied = n - meanempty
        collisions = k - occupied
        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
        return float(collisions), float(var.sqrt())


class catch_unraisable_exception:
    """
    Context manager catching unraisable exception using sys.unraisablehook.

    Storing the exception value (cm.unraisable.exc_value) creates a reference
    cycle. The reference cycle is broken explicitly when the context manager
    exits.

    Storing the object (cm.unraisable.object) can resurrect it if it is set to
    an object which is being finalized. Exiting the context manager clears the
    stored object.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        self.unraisable = None
        self._old_hook = None

    def _hook(self, unraisable):
        # Storing unraisable.object can resurrect an object which is being
        # finalized. Storing unraisable.exc_value creates a reference cycle.
        self.unraisable = unraisable

    def __enter__(self):
        self._old_hook = sys.unraisablehook
        sys.unraisablehook = self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        del self.unraisable


def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait until process pid completes and check that the process exit code is
    exitcode.

    Raise an AssertionError if the process exit code is not equal to exitcode.

    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
    kill the process (if signal.SIGKILL is available) and raise an
    AssertionError. The timeout feature is not available on Windows.
    """
    if os.name != "nt":
        import signal

        if timeout is None:
            timeout = SHORT_TIMEOUT
        t0 = time.monotonic()
        # Poll with an exponentially growing delay, capped at max_sleep.
        sleep = 0.001
        max_sleep = 0.1
        while True:
            pid2, status = os.waitpid(pid, os.WNOHANG)
            if pid2 != 0:
                break
            # process is still running

            dt = time.monotonic() - t0
            # Compare against the effective timeout, not SHORT_TIMEOUT, so
            # that a caller-supplied value is honored.
            if dt > timeout:
                try:
                    os.kill(pid, signal.SIGKILL)
                    os.waitpid(pid, 0)
                except OSError:
                    # Ignore errors like ChildProcessError or PermissionError
                    pass

                raise AssertionError(f"process {pid} is still running "
                                     f"after {dt:.1f} seconds")

            sleep = min(sleep * 2, max_sleep)
            time.sleep(sleep)
    else:
        # Windows implementation
        pid2, status = os.waitpid(pid, 0)

    exitcode2 = os.waitstatus_to_exitcode(status)
    if exitcode2 != exitcode:
        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if pid2 != pid:
        raise AssertionError(f"pid {pid2} != pid {pid}")

def skip_if_broken_multiprocessing_synchronize():
    """
    Skip tests if the multiprocessing.synchronize module is missing, if there
    is no available semaphore implementation, or if creating a lock raises an
    OSError (on Linux only).
    """
    from .import_helper import import_module

    # Skip tests if the _multiprocessing extension is missing.
    import_module('_multiprocessing')

    # Skip tests if there is no available semaphore implementation:
    # multiprocessing.synchronize requires _multiprocessing.SemLock.
    synchronize = import_module('multiprocessing.synchronize')

    if sys.platform == "linux":
        try:
            # bpo-38377: On Linux, creating a semaphore fails with OSError
            # if the current user does not have the permission to create
            # a file in /dev/shm/ directory.
            synchronize.Lock(ctx=None)
        except OSError as exc:
            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")


def check_disallow_instantiation(testcase, tp, *args, **kwds):
    """
    Check that given type cannot be instantiated using *args and **kwds.

    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
    """
    mod = tp.__module__
    name = tp.__name__
    if mod != 'builtins':
        qualname = f"{mod}.{name}"
    else:
        qualname = f"{name}"
    msg = f"cannot create '{re.escape(qualname)}' instances"
    testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)

@contextlib.contextmanager
def infinite_recursion(max_depth=75):
    """Set a lower limit for tests that interact with infinite recursions
    (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some
    debug windows builds, due to not enough functions being inlined the
    stack size might not handle the default recursion limit (1000). See
    bpo-11105 for details."""

    original_depth = sys.getrecursionlimit()
    try:
        sys.setrecursionlimit(max_depth)
        yield
    finally:
        sys.setrecursionlimit(original_depth)

def ignore_deprecations_from(module: str, *, like: str) -> object:
    """Install an "ignore" filter for DeprecationWarning matching 'module'
    and the message pattern 'like'.

    Return a token to pass to clear_ignored_deprecations() to remove the
    filter again.
    """
    token = object()
    # Tag the message regex with a regex comment naming the token, so that
    # the filter can be recognized later.
    warnings.filterwarnings(
        "ignore",
        category=DeprecationWarning,
        module=module,
        message=like + fr"(?#support{id(token)})",
    )
    return token

def clear_ignored_deprecations(*tokens: object) -> None:
    """Remove the warning filters installed by ignore_deprecations_from()
    for the given tokens.

    Raise ValueError if no token is given.
    """
    if not tokens:
        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")

    new_filters = []
    endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
    for action, message, category, module, lineno in warnings.filters:
        if action == "ignore" and category is DeprecationWarning:
            if isinstance(message, re.Pattern):
                msg = message.pattern
            else:
                msg = message or ""
            if msg.endswith(endswith):
                continue
        new_filters.append((action, message, category, module, lineno))
    if warnings.filters != new_filters:
        warnings.filters[:] = new_filters
        warnings._filters_mutated()