1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import asyncio.events 7import collections.abc 8import contextlib 9import errno 10import faulthandler 11import fnmatch 12import functools 13import gc 14import glob 15import hashlib 16import importlib 17import importlib.util 18import locale 19import logging.handlers 20import nntplib 21import os 22import platform 23import re 24import shutil 25import socket 26import stat 27import struct 28import subprocess 29import sys 30import sysconfig 31import tempfile 32import _thread 33import threading 34import time 35import types 36import unittest 37import urllib.error 38import warnings 39 40from .testresult import get_test_runner 41 42try: 43 import multiprocessing.process 44except ImportError: 45 multiprocessing = None 46 47try: 48 import zlib 49except ImportError: 50 zlib = None 51 52try: 53 import gzip 54except ImportError: 55 gzip = None 56 57try: 58 import bz2 59except ImportError: 60 bz2 = None 61 62try: 63 import lzma 64except ImportError: 65 lzma = None 66 67try: 68 import resource 69except ImportError: 70 resource = None 71 72try: 73 import _hashlib 74except ImportError: 75 _hashlib = None 76 77__all__ = [ 78 # globals 79 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 80 # exceptions 81 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 82 # imports 83 "import_module", "import_fresh_module", "CleanImport", 84 # modules 85 "unload", "forget", 86 # io 87 "record_original_stdout", "get_original_stdout", "captured_stdout", 88 "captured_stdin", "captured_stderr", 89 # filesystem 90 "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile", 91 "create_empty_file", "can_symlink", "fs_is_case_insensitive", 92 # unittest 93 "is_resource_enabled", "requires", "requires_freebsd_version", 94 "requires_linux_version", "requires_mac_ver", "requires_hashdigest", 95 "check_syntax_error", "check_syntax_warning", 96 "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset", 97 "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest", 98 "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma", 99 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 100 "requires_IEEE_754", "skip_unless_xattr", "requires_zlib", 101 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 102 "check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime", 103 "ignore_warnings", 104 # sys 105 "is_jython", "is_android", "check_impl_detail", "unix_shell", 106 "setswitchinterval", 107 # network 108 "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", 109 "bind_unix_socket", 110 # processes 111 'temp_umask', "reap_children", 112 # logging 113 "TestHandler", 114 # threads 115 "threading_setup", "threading_cleanup", "reap_threads", "start_threads", 116 # miscellaneous 117 "check_warnings", "check_no_resource_warning", "check_no_warnings", 118 "EnvironmentVarGuard", 119 "run_with_locale", "swap_item", 120 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 121 "run_with_tz", "PGO", "missing_compiler_executable", "fd_count", 122 "ALWAYS_EQ", "LARGEST", "SMALLEST" 123 ] 124 125class Error(Exception): 126 """Base class for regression test exceptions.""" 127 128class TestFailed(Error): 129 """Test failed.""" 130 131class TestDidNotRun(Error): 132 """Test did not run any subtests.""" 133 134class ResourceDenied(unittest.SkipTest): 135 """Test skipped because it requested a disallowed resource. 136 137 This is raised when a test calls requires() for a resource that 138 has not be enabled. It is used to distinguish between expected 139 and unexpected skips. 140 """ 141 142@contextlib.contextmanager 143def _ignore_deprecated_imports(ignore=True): 144 """Context manager to suppress package and module deprecation 145 warnings when importing them. 146 147 If ignore is False, this context manager has no effect. 148 """ 149 if ignore: 150 with warnings.catch_warnings(): 151 warnings.filterwarnings("ignore", ".+ (module|package)", 152 DeprecationWarning) 153 yield 154 else: 155 yield 156 157 158def ignore_warnings(*, category): 159 """Decorator to suppress deprecation warnings. 160 161 Use of context managers to hide warnings make diffs 162 more noisy and tools like 'git blame' less useful. 163 """ 164 def decorator(test): 165 @functools.wraps(test) 166 def wrapper(self, *args, **kwargs): 167 with warnings.catch_warnings(): 168 warnings.simplefilter('ignore', category=category) 169 return test(self, *args, **kwargs) 170 return wrapper 171 return decorator 172 173 174def import_module(name, deprecated=False, *, required_on=()): 175 """Import and return the module to be tested, raising SkipTest if 176 it is not available. 177 178 If deprecated is True, any module or package deprecation messages 179 will be suppressed. If a module is required on a platform but optional for 180 others, set required_on to an iterable of platform prefixes which will be 181 compared against sys.platform. 182 """ 183 with _ignore_deprecated_imports(deprecated): 184 try: 185 return importlib.import_module(name) 186 except ImportError as msg: 187 if sys.platform.startswith(tuple(required_on)): 188 raise 189 raise unittest.SkipTest(str(msg)) 190 191 192def _save_and_remove_module(name, orig_modules): 193 """Helper function to save and remove a module from sys.modules 194 195 Raise ImportError if the module can't be imported. 196 """ 197 # try to import the module and raise an error if it can't be imported 198 if name not in sys.modules: 199 __import__(name) 200 del sys.modules[name] 201 for modname in list(sys.modules): 202 if modname == name or modname.startswith(name + '.'): 203 orig_modules[modname] = sys.modules[modname] 204 del sys.modules[modname] 205 206def _save_and_block_module(name, orig_modules): 207 """Helper function to save and block a module in sys.modules 208 209 Return True if the module was in sys.modules, False otherwise. 210 """ 211 saved = True 212 try: 213 orig_modules[name] = sys.modules[name] 214 except KeyError: 215 saved = False 216 sys.modules[name] = None 217 return saved 218 219 220def anticipate_failure(condition): 221 """Decorator to mark a test that is known to be broken in some cases 222 223 Any use of this decorator should have a comment identifying the 224 associated tracker issue. 225 """ 226 if condition: 227 return unittest.expectedFailure 228 return lambda f: f 229 230def load_package_tests(pkg_dir, loader, standard_tests, pattern): 231 """Generic load_tests implementation for simple test packages. 232 233 Most packages can implement load_tests using this function as follows: 234 235 def load_tests(*args): 236 return load_package_tests(os.path.dirname(__file__), *args) 237 """ 238 if pattern is None: 239 pattern = "test*" 240 top_dir = os.path.dirname( # Lib 241 os.path.dirname( # test 242 os.path.dirname(__file__))) # support 243 package_tests = loader.discover(start_dir=pkg_dir, 244 top_level_dir=top_dir, 245 pattern=pattern) 246 standard_tests.addTests(package_tests) 247 return standard_tests 248 249 250def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): 251 """Import and return a module, deliberately bypassing sys.modules. 252 253 This function imports and returns a fresh copy of the named Python module 254 by removing the named module from sys.modules before doing the import. 255 Note that unlike reload, the original module is not affected by 256 this operation. 257 258 *fresh* is an iterable of additional module names that are also removed 259 from the sys.modules cache before doing the import. 260 261 *blocked* is an iterable of module names that are replaced with None 262 in the module cache during the import to ensure that attempts to import 263 them raise ImportError. 264 265 The named module and any modules named in the *fresh* and *blocked* 266 parameters are saved before starting the import and then reinserted into 267 sys.modules when the fresh import is complete. 268 269 Module and package deprecation messages are suppressed during this import 270 if *deprecated* is True. 271 272 This function will raise ImportError if the named module cannot be 273 imported. 274 """ 275 # NOTE: test_heapq, test_json and test_warnings include extra sanity checks 276 # to make sure that this utility function is working as expected 277 with _ignore_deprecated_imports(deprecated): 278 # Keep track of modules saved for later restoration as well 279 # as those which just need a blocking entry removed 280 orig_modules = {} 281 names_to_remove = [] 282 _save_and_remove_module(name, orig_modules) 283 try: 284 for fresh_name in fresh: 285 _save_and_remove_module(fresh_name, orig_modules) 286 for blocked_name in blocked: 287 if not _save_and_block_module(blocked_name, orig_modules): 288 names_to_remove.append(blocked_name) 289 fresh_module = importlib.import_module(name) 290 except ImportError: 291 fresh_module = None 292 finally: 293 for orig_name, module in orig_modules.items(): 294 sys.modules[orig_name] = module 295 for name_to_remove in names_to_remove: 296 del sys.modules[name_to_remove] 297 return fresh_module 298 299 300def get_attribute(obj, name): 301 """Get an attribute, raising SkipTest if AttributeError is raised.""" 302 try: 303 attribute = getattr(obj, name) 304 except AttributeError: 305 raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) 306 else: 307 return attribute 308 309verbose = 1 # Flag set to 0 by regrtest.py 310use_resources = None # Flag set to [] by regrtest.py 311max_memuse = 0 # Disable bigmem tests (they will still be run with 312 # small sizes, to make sure they work.) 313real_max_memuse = 0 314junit_xml_list = None # list of testsuite XML elements 315failfast = False 316 317# _original_stdout is meant to hold stdout at the time regrtest began. 318# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 319# The point is to have some flavor of stdout the user can actually see. 320_original_stdout = None 321def record_original_stdout(stdout): 322 global _original_stdout 323 _original_stdout = stdout 324 325def get_original_stdout(): 326 return _original_stdout or sys.stdout 327 328def unload(name): 329 try: 330 del sys.modules[name] 331 except KeyError: 332 pass 333 334def _force_run(path, func, *args): 335 try: 336 return func(*args) 337 except OSError as err: 338 if verbose >= 2: 339 print('%s: %s' % (err.__class__.__name__, err)) 340 print('re-run %s%r' % (func.__name__, args)) 341 os.chmod(path, stat.S_IRWXU) 342 return func(*args) 343 344if sys.platform.startswith("win"): 345 def _waitfor(func, pathname, waitall=False): 346 # Perform the operation 347 func(pathname) 348 # Now setup the wait loop 349 if waitall: 350 dirname = pathname 351 else: 352 dirname, name = os.path.split(pathname) 353 dirname = dirname or '.' 354 # Check for `pathname` to be removed from the filesystem. 355 # The exponential backoff of the timeout amounts to a total 356 # of ~1 second after which the deletion is probably an error 357 # anyway. 358 # Testing on an i7@4.3GHz shows that usually only 1 iteration is 359 # required when contention occurs. 360 timeout = 0.001 361 while timeout < 1.0: 362 # Note we are only testing for the existence of the file(s) in 363 # the contents of the directory regardless of any security or 364 # access rights. If we have made it this far, we have sufficient 365 # permissions to do that much using Python's equivalent of the 366 # Windows API FindFirstFile. 367 # Other Windows APIs can fail or give incorrect results when 368 # dealing with files that are pending deletion. 369 L = os.listdir(dirname) 370 if not (L if waitall else name in L): 371 return 372 # Increase the timeout and try again 373 time.sleep(timeout) 374 timeout *= 2 375 warnings.warn('tests may fail, delete still pending for ' + pathname, 376 RuntimeWarning, stacklevel=4) 377 378 def _unlink(filename): 379 _waitfor(os.unlink, filename) 380 381 def _rmdir(dirname): 382 _waitfor(os.rmdir, dirname) 383 384 def _rmtree(path): 385 def _rmtree_inner(path): 386 for name in _force_run(path, os.listdir, path): 387 fullname = os.path.join(path, name) 388 try: 389 mode = os.lstat(fullname).st_mode 390 except OSError as exc: 391 print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), 392 file=sys.__stderr__) 393 mode = 0 394 if stat.S_ISDIR(mode): 395 _waitfor(_rmtree_inner, fullname, waitall=True) 396 _force_run(fullname, os.rmdir, fullname) 397 else: 398 _force_run(fullname, os.unlink, fullname) 399 _waitfor(_rmtree_inner, path, waitall=True) 400 _waitfor(lambda p: _force_run(p, os.rmdir, p), path) 401 402 def _longpath(path): 403 try: 404 import ctypes 405 except ImportError: 406 # No ctypes means we can't expands paths. 407 pass 408 else: 409 buffer = ctypes.create_unicode_buffer(len(path) * 2) 410 length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, 411 len(buffer)) 412 if length: 413 return buffer[:length] 414 return path 415else: 416 _unlink = os.unlink 417 _rmdir = os.rmdir 418 419 def _rmtree(path): 420 try: 421 shutil.rmtree(path) 422 return 423 except OSError: 424 pass 425 426 def _rmtree_inner(path): 427 for name in _force_run(path, os.listdir, path): 428 fullname = os.path.join(path, name) 429 try: 430 mode = os.lstat(fullname).st_mode 431 except OSError: 432 mode = 0 433 if stat.S_ISDIR(mode): 434 _rmtree_inner(fullname) 435 _force_run(path, os.rmdir, fullname) 436 else: 437 _force_run(path, os.unlink, fullname) 438 _rmtree_inner(path) 439 os.rmdir(path) 440 441 def _longpath(path): 442 return path 443 444def unlink(filename): 445 try: 446 _unlink(filename) 447 except (FileNotFoundError, NotADirectoryError): 448 pass 449 450def rmdir(dirname): 451 try: 452 _rmdir(dirname) 453 except FileNotFoundError: 454 pass 455 456def rmtree(path): 457 try: 458 _rmtree(path) 459 except FileNotFoundError: 460 pass 461 462def make_legacy_pyc(source): 463 """Move a PEP 3147/488 pyc file to its legacy pyc location. 464 465 :param source: The file system path to the source file. The source file 466 does not need to exist, however the PEP 3147/488 pyc file must exist. 467 :return: The file system path to the legacy pyc file. 468 """ 469 pyc_file = importlib.util.cache_from_source(source) 470 up_one = os.path.dirname(os.path.abspath(source)) 471 legacy_pyc = os.path.join(up_one, source + 'c') 472 os.rename(pyc_file, legacy_pyc) 473 return legacy_pyc 474 475def forget(modname): 476 """'Forget' a module was ever imported. 477 478 This removes the module from sys.modules and deletes any PEP 3147/488 or 479 legacy .pyc files. 480 """ 481 unload(modname) 482 for dirname in sys.path: 483 source = os.path.join(dirname, modname + '.py') 484 # It doesn't matter if they exist or not, unlink all possible 485 # combinations of PEP 3147/488 and legacy pyc files. 486 unlink(source + 'c') 487 for opt in ('', 1, 2): 488 unlink(importlib.util.cache_from_source(source, optimization=opt)) 489 490# Check whether a gui is actually available 491def _is_gui_available(): 492 if hasattr(_is_gui_available, 'result'): 493 return _is_gui_available.result 494 reason = None 495 if sys.platform.startswith('win'): 496 # if Python is running as a service (such as the buildbot service), 497 # gui interaction may be disallowed 498 import ctypes 499 import ctypes.wintypes 500 UOI_FLAGS = 1 501 WSF_VISIBLE = 0x0001 502 class USEROBJECTFLAGS(ctypes.Structure): 503 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 504 ("fReserved", ctypes.wintypes.BOOL), 505 ("dwFlags", ctypes.wintypes.DWORD)] 506 dll = ctypes.windll.user32 507 h = dll.GetProcessWindowStation() 508 if not h: 509 raise ctypes.WinError() 510 uof = USEROBJECTFLAGS() 511 needed = ctypes.wintypes.DWORD() 512 res = dll.GetUserObjectInformationW(h, 513 UOI_FLAGS, 514 ctypes.byref(uof), 515 ctypes.sizeof(uof), 516 ctypes.byref(needed)) 517 if not res: 518 raise ctypes.WinError() 519 if not bool(uof.dwFlags & WSF_VISIBLE): 520 reason = "gui not available (WSF_VISIBLE flag not set)" 521 elif sys.platform == 'darwin': 522 # The Aqua Tk implementations on OS X can abort the process if 523 # being called in an environment where a window server connection 524 # cannot be made, for instance when invoked by a buildbot or ssh 525 # process not running under the same user id as the current console 526 # user. To avoid that, raise an exception if the window manager 527 # connection is not available. 528 from ctypes import cdll, c_int, pointer, Structure 529 from ctypes.util import find_library 530 531 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 532 533 if app_services.CGMainDisplayID() == 0: 534 reason = "gui tests cannot run without OS X window manager" 535 else: 536 class ProcessSerialNumber(Structure): 537 _fields_ = [("highLongOfPSN", c_int), 538 ("lowLongOfPSN", c_int)] 539 psn = ProcessSerialNumber() 540 psn_p = pointer(psn) 541 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 542 (app_services.SetFrontProcess(psn_p) < 0) ): 543 reason = "cannot run without OS X gui process" 544 545 # check on every platform whether tkinter can actually do anything 546 if not reason: 547 try: 548 from tkinter import Tk 549 root = Tk() 550 root.withdraw() 551 root.update() 552 root.destroy() 553 except Exception as e: 554 err_string = str(e) 555 if len(err_string) > 50: 556 err_string = err_string[:50] + ' [...]' 557 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 558 err_string) 559 560 _is_gui_available.reason = reason 561 _is_gui_available.result = not reason 562 563 return _is_gui_available.result 564 565def is_resource_enabled(resource): 566 """Test whether a resource is enabled. 567 568 Known resources are set by regrtest.py. If not running under regrtest.py, 569 all resources are assumed enabled unless use_resources has been set. 570 """ 571 return use_resources is None or resource in use_resources 572 573def requires(resource, msg=None): 574 """Raise ResourceDenied if the specified resource is not available.""" 575 if not is_resource_enabled(resource): 576 if msg is None: 577 msg = "Use of the %r resource not enabled" % resource 578 raise ResourceDenied(msg) 579 if resource == 'gui' and not _is_gui_available(): 580 raise ResourceDenied(_is_gui_available.reason) 581 582def _requires_unix_version(sysname, min_version): 583 """Decorator raising SkipTest if the OS is `sysname` and the version is less 584 than `min_version`. 585 586 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 587 the FreeBSD version is less than 7.2. 588 """ 589 import platform 590 min_version_txt = '.'.join(map(str, min_version)) 591 version_txt = platform.release().split('-', 1)[0] 592 if platform.system() == sysname: 593 try: 594 version = tuple(map(int, version_txt.split('.'))) 595 except ValueError: 596 skip = False 597 else: 598 skip = version < min_version 599 else: 600 skip = False 601 602 return unittest.skipIf( 603 skip, 604 f"{sysname} version {min_version_txt} or higher required, not " 605 f"{version_txt}" 606 ) 607 608 609def requires_freebsd_version(*min_version): 610 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 611 less than `min_version`. 612 613 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 614 version is less than 7.2. 615 """ 616 return _requires_unix_version('FreeBSD', min_version) 617 618def requires_linux_version(*min_version): 619 """Decorator raising SkipTest if the OS is Linux and the Linux version is 620 less than `min_version`. 621 622 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 623 version is less than 2.6.32. 624 """ 625 return _requires_unix_version('Linux', min_version) 626 627def requires_mac_ver(*min_version): 628 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 629 version if less than min_version. 630 631 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 632 is lesser than 10.5. 633 """ 634 def decorator(func): 635 @functools.wraps(func) 636 def wrapper(*args, **kw): 637 if sys.platform == 'darwin': 638 version_txt = platform.mac_ver()[0] 639 try: 640 version = tuple(map(int, version_txt.split('.'))) 641 except ValueError: 642 pass 643 else: 644 if version < min_version: 645 min_version_txt = '.'.join(map(str, min_version)) 646 raise unittest.SkipTest( 647 "Mac OS X %s or higher required, not %s" 648 % (min_version_txt, version_txt)) 649 return func(*args, **kw) 650 wrapper.min_version = min_version 651 return wrapper 652 return decorator 653 654 655def requires_hashdigest(digestname, openssl=None): 656 """Decorator raising SkipTest if a hashing algorithm is not available 657 658 The hashing algorithm could be missing or blocked by a strict crypto 659 policy. 660 661 If 'openssl' is True, then the decorator checks that OpenSSL provides 662 the algorithm. Otherwise the check falls back to built-in 663 implementations. 664 665 ValueError: [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS 666 ValueError: unsupported hash type md4 667 """ 668 def decorator(func): 669 @functools.wraps(func) 670 def wrapper(*args, **kwargs): 671 try: 672 if openssl and _hashlib is not None: 673 _hashlib.new(digestname) 674 else: 675 hashlib.new(digestname) 676 except ValueError: 677 raise unittest.SkipTest( 678 f"hash digest '{digestname}' is not available." 679 ) 680 return func(*args, **kwargs) 681 return wrapper 682 return decorator 683 684 685HOST = "localhost" 686HOSTv4 = "127.0.0.1" 687HOSTv6 = "::1" 688 689 690def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 691 """Returns an unused port that should be suitable for binding. This is 692 achieved by creating a temporary socket with the same family and type as 693 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 694 the specified host address (defaults to 0.0.0.0) with the port set to 0, 695 eliciting an unused ephemeral port from the OS. The temporary socket is 696 then closed and deleted, and the ephemeral port is returned. 697 698 Either this method or bind_port() should be used for any tests where a 699 server socket needs to be bound to a particular port for the duration of 700 the test. Which one to use depends on whether the calling code is creating 701 a python socket, or if an unused port needs to be provided in a constructor 702 or passed to an external program (i.e. the -accept argument to openssl's 703 s_server mode). Always prefer bind_port() over find_unused_port() where 704 possible. Hard coded ports should *NEVER* be used. As soon as a server 705 socket is bound to a hard coded port, the ability to run multiple instances 706 of the test simultaneously on the same host is compromised, which makes the 707 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 708 may simply manifest as a failed test, which can be recovered from without 709 intervention in most cases, but on Windows, the entire python process can 710 completely and utterly wedge, requiring someone to log in to the buildbot 711 and manually kill the affected process. 712 713 (This is easy to reproduce on Windows, unfortunately, and can be traced to 714 the SO_REUSEADDR socket option having different semantics on Windows versus 715 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 716 listen and then accept connections on identical host/ports. An EADDRINUSE 717 OSError will be raised at some point (depending on the platform and 718 the order bind and listen were called on each socket). 719 720 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 721 will ever be raised when attempting to bind two identical host/ports. When 722 accept() is called on each socket, the second caller's process will steal 723 the port from the first caller, leaving them both in an awkwardly wedged 724 state where they'll no longer respond to any signals or graceful kills, and 725 must be forcibly killed via OpenProcess()/TerminateProcess(). 726 727 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 728 instead of SO_REUSEADDR, which effectively affords the same semantics as 729 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 730 Source world compared to Windows ones, this is a common mistake. A quick 731 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 732 openssl.exe is called with the 's_server' option, for example. See 733 http://bugs.python.org/issue2550 for more info. The following site also 734 has a very thorough description about the implications of both REUSEADDR 735 and EXCLUSIVEADDRUSE on Windows: 736 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 737 738 XXX: although this approach is a vast improvement on previous attempts to 739 elicit unused ports, it rests heavily on the assumption that the ephemeral 740 port returned to us by the OS won't immediately be dished back out to some 741 other process when we close and delete our temporary socket but before our 742 calling code has a chance to bind the returned port. We can deal with this 743 issue if/when we come across it. 744 """ 745 746 with socket.socket(family, socktype) as tempsock: 747 port = bind_port(tempsock) 748 del tempsock 749 return port 750 751def bind_port(sock, host=HOST): 752 """Bind the socket to a free port and return the port number. Relies on 753 ephemeral ports in order to ensure we are using an unbound port. This is 754 important as many tests may be running simultaneously, especially in a 755 buildbot environment. This method raises an exception if the sock.family 756 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 757 or SO_REUSEPORT set on it. Tests should *never* set these socket options 758 for TCP/IP sockets. The only case for setting these options is testing 759 multicasting via multiple UDP sockets. 760 761 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 762 on Windows), it will be set on the socket. This will prevent anyone else 763 from bind()'ing to our host/port for the duration of the test. 764 """ 765 766 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 767 if hasattr(socket, 'SO_REUSEADDR'): 768 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 769 raise TestFailed("tests should never set the SO_REUSEADDR " \ 770 "socket option on TCP/IP sockets!") 771 if hasattr(socket, 'SO_REUSEPORT'): 772 try: 773 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 774 raise TestFailed("tests should never set the SO_REUSEPORT " \ 775 "socket option on TCP/IP sockets!") 776 except OSError: 777 # Python's socket module was compiled using modern headers 778 # thus defining SO_REUSEPORT but this process is running 779 # under an older kernel that does not support SO_REUSEPORT. 780 pass 781 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 782 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 783 784 sock.bind((host, 0)) 785 port = sock.getsockname()[1] 786 return port 787 788def bind_unix_socket(sock, addr): 789 """Bind a unix socket, raising SkipTest if PermissionError is raised.""" 790 assert sock.family == socket.AF_UNIX 791 try: 792 sock.bind(addr) 793 except PermissionError: 794 sock.close() 795 raise unittest.SkipTest('cannot bind AF_UNIX sockets') 796 797def _is_ipv6_enabled(): 798 """Check whether IPv6 is enabled on this host.""" 799 if socket.has_ipv6: 800 sock = None 801 try: 802 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 803 sock.bind((HOSTv6, 0)) 804 return True 805 except OSError: 806 pass 807 finally: 808 if sock: 809 sock.close() 810 return False 811 812IPV6_ENABLED = _is_ipv6_enabled() 813 814def system_must_validate_cert(f): 815 """Skip the test on TLS certificate validation failures.""" 816 @functools.wraps(f) 817 def dec(*args, **kwargs): 818 try: 819 f(*args, **kwargs) 820 except OSError as e: 821 if "CERTIFICATE_VERIFY_FAILED" in str(e): 822 raise unittest.SkipTest("system does not contain " 823 "necessary certificates") 824 raise 825 return dec 826 827# A constant likely larger than the underlying OS pipe buffer size, to 828# make writes blocking. 829# Windows limit seems to be around 512 B, and many Unix kernels have a 830# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 831# (see issue #17835 for a discussion of this number). 832PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 833 834# A constant likely larger than the underlying OS socket buffer size, to make 835# writes blocking. 836# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 837# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 838# for a discussion of this number). 839SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 840 841# decorator for skipping tests on non-IEEE 754 platforms 842requires_IEEE_754 = unittest.skipUnless( 843 float.__getformat__("double").startswith("IEEE"), 844 "test requires IEEE 754 doubles") 845 846requires_zlib = unittest.skipUnless(zlib, 'requires zlib') 847 848requires_gzip = unittest.skipUnless(gzip, 'requires gzip') 849 850requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') 851 852requires_lzma = unittest.skipUnless(lzma, 'requires lzma') 853 854is_jython = sys.platform.startswith('java') 855 856is_android = hasattr(sys, 'getandroidapilevel') 857 858if sys.platform != 'win32': 859 unix_shell = '/system/bin/sh' if is_android else '/bin/sh' 860else: 861 unix_shell = None 862 863# Filename used for testing 864if os.name == 'java': 865 # Jython disallows @ in module names 866 TESTFN = '$test' 867else: 868 TESTFN = '@test' 869 870# Disambiguate TESTFN for parallel testing, while letting it remain a valid 871# module name. 872TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 873 874# Define the URL of a dedicated HTTP server for the network tests. 875# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. 876TEST_HTTP_URL = "http://www.pythontest.net" 877 878# FS_NONASCII: non-ASCII character encodable by os.fsencode(), 879# or None if there is no such character. 880FS_NONASCII = None 881for character in ( 882 # First try printable and common characters to have a readable filename. 883 # For each character, the encoding list are just example of encodings able 884 # to encode the character (the list is not exhaustive). 885 886 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 887 '\u00E6', 888 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 889 '\u0130', 890 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 891 '\u0141', 892 # U+03C6 (Greek Small Letter Phi): cp1253 893 '\u03C6', 894 # U+041A (Cyrillic Capital Letter Ka): cp1251 895 '\u041A', 896 # U+05D0 (Hebrew Letter Alef): Encodable to cp424 897 '\u05D0', 898 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic 899 '\u060C', 900 # U+062A (Arabic Letter Teh): cp720 901 '\u062A', 902 # U+0E01 (Thai Character Ko Kai): cp874 903 '\u0E01', 904 905 # Then try more "special" characters. "special" because they may be 906 # interpreted or displayed differently depending on the exact locale 907 # encoding and the font. 908 909 # U+00A0 (No-Break Space) 910 '\u00A0', 911 # U+20AC (Euro Sign) 912 '\u20AC', 913): 914 try: 915 # If Python is set up to use the legacy 'mbcs' in Windows, 916 # 'replace' error mode is used, and encode() returns b'?' 917 # for characters missing in the ANSI codepage 918 if os.fsdecode(os.fsencode(character)) != character: 919 raise UnicodeError 920 except UnicodeError: 921 pass 922 else: 923 FS_NONASCII = character 924 break 925 926# TESTFN_UNICODE is a non-ascii filename 927TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" 928if sys.platform == 'darwin': 929 # In Mac OS X's VFS API file names are, by definition, canonically 930 # decomposed Unicode, encoded using UTF-8. See QA1173: 931 # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html 932 import unicodedata 933 TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) 934TESTFN_ENCODING = sys.getfilesystemencoding() 935 936# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be 937# encoded by the filesystem encoding (in strict mode). It can be None if we 938# cannot generate such filename. 939TESTFN_UNENCODABLE = None 940if os.name == 'nt': 941 # skip win32s (0) or Windows 9x/ME (1) 942 if sys.getwindowsversion().platform >= 2: 943 # Different kinds of characters from various languages to minimize the 944 # probability that the whole name is encodable to MBCS (issue #9819) 945 TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" 946 try: 947 TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) 948 except UnicodeEncodeError: 949 pass 950 else: 951 print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' 952 'Unicode filename tests may not be effective' 953 % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) 954 TESTFN_UNENCODABLE = None 955# Mac OS X denies unencodable filenames (invalid utf-8) 956elif sys.platform != 'darwin': 957 try: 958 # ascii and utf-8 cannot encode the byte 0xff 959 b'\xff'.decode(TESTFN_ENCODING) 960 except UnicodeDecodeError: 961 # 0xff will be encoded using the surrogate character u+DCFF 962 TESTFN_UNENCODABLE = TESTFN \ 963 + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') 964 else: 965 # File system encoding (eg. ISO-8859-* encodings) can encode 966 # the byte 0xff. Skip some unicode filename tests. 967 pass 968 969# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be 970# decoded from the filesystem encoding (in strict mode). It can be None if we 971# cannot generate such filename (ex: the latin1 encoding can decode any byte 972# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks 973# to the surrogateescape error handler (PEP 383), but not from the filesystem 974# encoding in strict mode. 975TESTFN_UNDECODABLE = None 976for name in ( 977 # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows 978 # accepts it to create a file or a directory, or don't accept to enter to 979 # such directory (when the bytes name is used). So test b'\xe7' first: it is 980 # not decodable from cp932. 981 b'\xe7w\xf0', 982 # undecodable from ASCII, UTF-8 983 b'\xff', 984 # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 985 # and cp857 986 b'\xae\xd5' 987 # undecodable from UTF-8 (UNIX and Mac OS X) 988 b'\xed\xb2\x80', b'\xed\xb4\x80', 989 # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, 990 # cp1253, cp1254, cp1255, cp1257, cp1258 991 b'\x81\x98', 992): 993 try: 994 name.decode(TESTFN_ENCODING) 995 except UnicodeDecodeError: 996 TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name 997 break 998 999if FS_NONASCII: 1000 TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII 1001else: 1002 TESTFN_NONASCII = None 1003 1004# Save the initial cwd 1005SAVEDCWD = os.getcwd() 1006 1007# Set by libregrtest/main.py so we can skip tests that are not 1008# useful for PGO 1009PGO = False 1010 1011# Set by libregrtest/main.py if we are running the extended (time consuming) 1012# PGO task. If this is True, PGO is also True. 1013PGO_EXTENDED = False 1014 1015@contextlib.contextmanager 1016def temp_dir(path=None, quiet=False): 1017 """Return a context manager that creates a temporary directory. 1018 1019 Arguments: 1020 1021 path: the directory to create temporarily. If omitted or None, 1022 defaults to creating a temporary directory using tempfile.mkdtemp. 1023 1024 quiet: if False (the default), the context manager raises an exception 1025 on error. Otherwise, if the path is specified and cannot be 1026 created, only a warning is issued. 1027 1028 """ 1029 dir_created = False 1030 if path is None: 1031 path = tempfile.mkdtemp() 1032 dir_created = True 1033 path = os.path.realpath(path) 1034 else: 1035 try: 1036 os.mkdir(path) 1037 dir_created = True 1038 except OSError as exc: 1039 if not quiet: 1040 raise 1041 warnings.warn(f'tests may fail, unable to create ' 1042 f'temporary directory {path!r}: {exc}', 1043 RuntimeWarning, stacklevel=3) 1044 if dir_created: 1045 pid = os.getpid() 1046 try: 1047 yield path 1048 finally: 1049 # In case the process forks, let only the parent remove the 1050 # directory. The child has a different process id. (bpo-30028) 1051 if dir_created and pid == os.getpid(): 1052 rmtree(path) 1053 1054@contextlib.contextmanager 1055def change_cwd(path, quiet=False): 1056 """Return a context manager that changes the current working directory. 1057 1058 Arguments: 1059 1060 path: the directory to use as the temporary current working directory. 1061 1062 quiet: if False (the default), the context manager raises an exception 1063 on error. Otherwise, it issues only a warning and keeps the current 1064 working directory the same. 1065 1066 """ 1067 saved_dir = os.getcwd() 1068 try: 1069 os.chdir(os.path.realpath(path)) 1070 except OSError as exc: 1071 if not quiet: 1072 raise 1073 warnings.warn(f'tests may fail, unable to change the current working ' 1074 f'directory to {path!r}: {exc}', 1075 RuntimeWarning, stacklevel=3) 1076 try: 1077 yield os.getcwd() 1078 finally: 1079 os.chdir(saved_dir) 1080 1081 1082@contextlib.contextmanager 1083def temp_cwd(name='tempcwd', quiet=False): 1084 """ 1085 Context manager that temporarily creates and changes the CWD. 1086 1087 The function temporarily changes the current working directory 1088 after creating a temporary directory in the current directory with 1089 name *name*. If *name* is None, the temporary directory is 1090 created using tempfile.mkdtemp. 1091 1092 If *quiet* is False (default) and it is not possible to 1093 create or change the CWD, an error is raised. If *quiet* is True, 1094 only a warning is raised and the original CWD is used. 1095 1096 """ 1097 with temp_dir(path=name, quiet=quiet) as temp_path: 1098 with change_cwd(temp_path, quiet=quiet) as cwd_dir: 1099 yield cwd_dir 1100 1101if hasattr(os, "umask"): 1102 @contextlib.contextmanager 1103 def temp_umask(umask): 1104 """Context manager that temporarily sets the process umask.""" 1105 oldmask = os.umask(umask) 1106 try: 1107 yield 1108 finally: 1109 os.umask(oldmask) 1110 1111# TEST_HOME_DIR refers to the top level directory of the "test" package 1112# that contains Python's regression test suite 1113TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) 1114TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) 1115 1116# TEST_DATA_DIR is used as a target download location for remote resources 1117TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") 1118 1119def findfile(filename, subdir=None): 1120 """Try to find a file on sys.path or in the test directory. If it is not 1121 found the argument passed to the function is returned (this does not 1122 necessarily signal failure; could still be the legitimate path). 1123 1124 Setting *subdir* indicates a relative path to use to find the file 1125 rather than looking directly in the path directories. 1126 """ 1127 if os.path.isabs(filename): 1128 return filename 1129 if subdir is not None: 1130 filename = os.path.join(subdir, filename) 1131 path = [TEST_HOME_DIR] + sys.path 1132 for dn in path: 1133 fn = os.path.join(dn, filename) 1134 if os.path.exists(fn): return fn 1135 return filename 1136 1137def create_empty_file(filename): 1138 """Create an empty file. If the file already exists, truncate it.""" 1139 fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) 1140 os.close(fd) 1141 1142def sortdict(dict): 1143 "Like repr(dict), but in sorted order." 1144 items = sorted(dict.items()) 1145 reprpairs = ["%r: %r" % pair for pair in items] 1146 withcommas = ", ".join(reprpairs) 1147 return "{%s}" % withcommas 1148 1149def make_bad_fd(): 1150 """ 1151 Create an invalid file descriptor by opening and closing a file and return 1152 its fd. 1153 """ 1154 file = open(TESTFN, "wb") 1155 try: 1156 return file.fileno() 1157 finally: 1158 file.close() 1159 unlink(TESTFN) 1160 1161 1162def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): 1163 with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: 1164 compile(statement, '<test string>', 'exec') 1165 err = cm.exception 1166 testcase.assertIsNotNone(err.lineno) 1167 if lineno is not None: 1168 testcase.assertEqual(err.lineno, lineno) 1169 testcase.assertIsNotNone(err.offset) 1170 if offset is not None: 1171 testcase.assertEqual(err.offset, offset) 1172 1173def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None): 1174 # Test also that a warning is emitted only once. 1175 with warnings.catch_warnings(record=True) as warns: 1176 warnings.simplefilter('always', SyntaxWarning) 1177 compile(statement, '<testcase>', 'exec') 1178 testcase.assertEqual(len(warns), 1, warns) 1179 1180 warn, = warns 1181 testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category) 1182 if errtext: 1183 testcase.assertRegex(str(warn.message), errtext) 1184 testcase.assertEqual(warn.filename, '<testcase>') 1185 testcase.assertIsNotNone(warn.lineno) 1186 if lineno is not None: 1187 testcase.assertEqual(warn.lineno, lineno) 1188 1189 # SyntaxWarning should be converted to SyntaxError when raised, 1190 # since the latter contains more information and provides better 1191 # error report. 1192 with warnings.catch_warnings(record=True) as warns: 1193 warnings.simplefilter('error', SyntaxWarning) 1194 check_syntax_error(testcase, statement, errtext, 1195 lineno=lineno, offset=offset) 1196 # No warnings are leaked when a SyntaxError is raised. 1197 testcase.assertEqual(warns, []) 1198 1199 1200def open_urlresource(url, *args, **kw): 1201 import urllib.request, urllib.parse 1202 1203 check = kw.pop('check', None) 1204 1205 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 1206 1207 fn = os.path.join(TEST_DATA_DIR, filename) 1208 1209 def check_valid_file(fn): 1210 f = open(fn, *args, **kw) 1211 if check is None: 1212 return f 1213 elif check(f): 1214 f.seek(0) 1215 return f 1216 f.close() 1217 1218 if os.path.exists(fn): 1219 f = check_valid_file(fn) 1220 if f is not None: 1221 return f 1222 unlink(fn) 1223 1224 # Verify the requirement before downloading the file 1225 requires('urlfetch') 1226 1227 if verbose: 1228 print('\tfetching %s ...' % url, file=get_original_stdout()) 1229 opener = urllib.request.build_opener() 1230 if gzip: 1231 opener.addheaders.append(('Accept-Encoding', 'gzip')) 1232 f = opener.open(url, timeout=15) 1233 if gzip and f.headers.get('Content-Encoding') == 'gzip': 1234 f = gzip.GzipFile(fileobj=f) 1235 try: 1236 with open(fn, "wb") as out: 1237 s = f.read() 1238 while s: 1239 out.write(s) 1240 s = f.read() 1241 finally: 1242 f.close() 1243 1244 f = check_valid_file(fn) 1245 if f is not None: 1246 return f 1247 raise TestFailed('invalid resource %r' % fn) 1248 1249 1250class WarningsRecorder(object): 1251 """Convenience wrapper for the warnings list returned on 1252 entry to the warnings.catch_warnings() context manager. 1253 """ 1254 def __init__(self, warnings_list): 1255 self._warnings = warnings_list 1256 self._last = 0 1257 1258 def __getattr__(self, attr): 1259 if len(self._warnings) > self._last: 1260 return getattr(self._warnings[-1], attr) 1261 elif attr in warnings.WarningMessage._WARNING_DETAILS: 1262 return None 1263 raise AttributeError("%r has no attribute %r" % (self, attr)) 1264 1265 @property 1266 def warnings(self): 1267 return self._warnings[self._last:] 1268 1269 def reset(self): 1270 self._last = len(self._warnings) 1271 1272 1273def _filterwarnings(filters, quiet=False): 1274 """Catch the warnings, then check if all the expected 1275 warnings have been raised and re-raise unexpected warnings. 1276 If 'quiet' is True, only re-raise the unexpected warnings. 1277 """ 1278 # Clear the warning registry of the calling module 1279 # in order to re-raise the warnings. 1280 frame = sys._getframe(2) 1281 registry = frame.f_globals.get('__warningregistry__') 1282 if registry: 1283 registry.clear() 1284 with warnings.catch_warnings(record=True) as w: 1285 # Set filter "always" to record all warnings. Because 1286 # test_warnings swap the module, we need to look up in 1287 # the sys.modules dictionary. 1288 sys.modules['warnings'].simplefilter("always") 1289 yield WarningsRecorder(w) 1290 # Filter the recorded warnings 1291 reraise = list(w) 1292 missing = [] 1293 for msg, cat in filters: 1294 seen = False 1295 for w in reraise[:]: 1296 warning = w.message 1297 # Filter out the matching messages 1298 if (re.match(msg, str(warning), re.I) and 1299 issubclass(warning.__class__, cat)): 1300 seen = True 1301 reraise.remove(w) 1302 if not seen and not quiet: 1303 # This filter caught nothing 1304 missing.append((msg, cat.__name__)) 1305 if reraise: 1306 raise AssertionError("unhandled warning %s" % reraise[0]) 1307 if missing: 1308 raise AssertionError("filter (%r, %s) did not catch any warning" % 1309 missing[0]) 1310 1311 1312@contextlib.contextmanager 1313def check_warnings(*filters, **kwargs): 1314 """Context manager to silence warnings. 1315 1316 Accept 2-tuples as positional arguments: 1317 ("message regexp", WarningCategory) 1318 1319 Optional argument: 1320 - if 'quiet' is True, it does not fail if a filter catches nothing 1321 (default True without argument, 1322 default False if some filters are defined) 1323 1324 Without argument, it defaults to: 1325 check_warnings(("", Warning), quiet=True) 1326 """ 1327 quiet = kwargs.get('quiet') 1328 if not filters: 1329 filters = (("", Warning),) 1330 # Preserve backward compatibility 1331 if quiet is None: 1332 quiet = True 1333 return _filterwarnings(filters, quiet) 1334 1335 1336@contextlib.contextmanager 1337def check_no_warnings(testcase, message='', category=Warning, force_gc=False): 1338 """Context manager to check that no warnings are emitted. 1339 1340 This context manager enables a given warning within its scope 1341 and checks that no warnings are emitted even with that warning 1342 enabled. 1343 1344 If force_gc is True, a garbage collection is attempted before checking 1345 for warnings. This may help to catch warnings emitted when objects 1346 are deleted, such as ResourceWarning. 1347 1348 Other keyword arguments are passed to warnings.filterwarnings(). 1349 """ 1350 with warnings.catch_warnings(record=True) as warns: 1351 warnings.filterwarnings('always', 1352 message=message, 1353 category=category) 1354 yield 1355 if force_gc: 1356 gc_collect() 1357 testcase.assertEqual(warns, []) 1358 1359 1360@contextlib.contextmanager 1361def check_no_resource_warning(testcase): 1362 """Context manager to check that no ResourceWarning is emitted. 1363 1364 Usage: 1365 1366 with check_no_resource_warning(self): 1367 f = open(...) 1368 ... 1369 del f 1370 1371 You must remove the object which may emit ResourceWarning before 1372 the end of the context manager. 1373 """ 1374 with check_no_warnings(testcase, category=ResourceWarning, force_gc=True): 1375 yield 1376 1377 1378class CleanImport(object): 1379 """Context manager to force import to return a new module reference. 1380 1381 This is useful for testing module-level behaviours, such as 1382 the emission of a DeprecationWarning on import. 1383 1384 Use like this: 1385 1386 with CleanImport("foo"): 1387 importlib.import_module("foo") # new reference 1388 """ 1389 1390 def __init__(self, *module_names): 1391 self.original_modules = sys.modules.copy() 1392 for module_name in module_names: 1393 if module_name in sys.modules: 1394 module = sys.modules[module_name] 1395 # It is possible that module_name is just an alias for 1396 # another module (e.g. stub for modules renamed in 3.x). 1397 # In that case, we also need delete the real module to clear 1398 # the import cache. 1399 if module.__name__ != module_name: 1400 del sys.modules[module.__name__] 1401 del sys.modules[module_name] 1402 1403 def __enter__(self): 1404 return self 1405 1406 def __exit__(self, *ignore_exc): 1407 sys.modules.update(self.original_modules) 1408 1409 1410class EnvironmentVarGuard(collections.abc.MutableMapping): 1411 1412 """Class to help protect the environment variable properly. Can be used as 1413 a context manager.""" 1414 1415 def __init__(self): 1416 self._environ = os.environ 1417 self._changed = {} 1418 1419 def __getitem__(self, envvar): 1420 return self._environ[envvar] 1421 1422 def __setitem__(self, envvar, value): 1423 # Remember the initial value on the first access 1424 if envvar not in self._changed: 1425 self._changed[envvar] = self._environ.get(envvar) 1426 self._environ[envvar] = value 1427 1428 def __delitem__(self, envvar): 1429 # Remember the initial value on the first access 1430 if envvar not in self._changed: 1431 self._changed[envvar] = self._environ.get(envvar) 1432 if envvar in self._environ: 1433 del self._environ[envvar] 1434 1435 def keys(self): 1436 return self._environ.keys() 1437 1438 def __iter__(self): 1439 return iter(self._environ) 1440 1441 def __len__(self): 1442 return len(self._environ) 1443 1444 def set(self, envvar, value): 1445 self[envvar] = value 1446 1447 def unset(self, envvar): 1448 del self[envvar] 1449 1450 def __enter__(self): 1451 return self 1452 1453 def __exit__(self, *ignore_exc): 1454 for (k, v) in self._changed.items(): 1455 if v is None: 1456 if k in self._environ: 1457 del self._environ[k] 1458 else: 1459 self._environ[k] = v 1460 os.environ = self._environ 1461 1462 1463class DirsOnSysPath(object): 1464 """Context manager to temporarily add directories to sys.path. 1465 1466 This makes a copy of sys.path, appends any directories given 1467 as positional arguments, then reverts sys.path to the copied 1468 settings when the context ends. 1469 1470 Note that *all* sys.path modifications in the body of the 1471 context manager, including replacement of the object, 1472 will be reverted at the end of the block. 1473 """ 1474 1475 def __init__(self, *paths): 1476 self.original_value = sys.path[:] 1477 self.original_object = sys.path 1478 sys.path.extend(paths) 1479 1480 def __enter__(self): 1481 return self 1482 1483 def __exit__(self, *ignore_exc): 1484 sys.path = self.original_object 1485 sys.path[:] = self.original_value 1486 1487 1488class TransientResource(object): 1489 1490 """Raise ResourceDenied if an exception is raised while the context manager 1491 is in effect that matches the specified exception and attributes.""" 1492 1493 def __init__(self, exc, **kwargs): 1494 self.exc = exc 1495 self.attrs = kwargs 1496 1497 def __enter__(self): 1498 return self 1499 1500 def __exit__(self, type_=None, value=None, traceback=None): 1501 """If type_ is a subclass of self.exc and value has attributes matching 1502 self.attrs, raise ResourceDenied. Otherwise let the exception 1503 propagate (if any).""" 1504 if type_ is not None and issubclass(self.exc, type_): 1505 for attr, attr_value in self.attrs.items(): 1506 if not hasattr(value, attr): 1507 break 1508 if getattr(value, attr) != attr_value: 1509 break 1510 else: 1511 raise ResourceDenied("an optional resource is not available") 1512 1513# Context managers that raise ResourceDenied when various issues 1514# with the Internet connection manifest themselves as exceptions. 1515# XXX deprecate these and use transient_internet() instead 1516time_out = TransientResource(OSError, errno=errno.ETIMEDOUT) 1517socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1518ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1519 1520 1521def get_socket_conn_refused_errs(): 1522 """ 1523 Get the different socket error numbers ('errno') which can be received 1524 when a connection is refused. 1525 """ 1526 errors = [errno.ECONNREFUSED] 1527 if hasattr(errno, 'ENETUNREACH'): 1528 # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED 1529 errors.append(errno.ENETUNREACH) 1530 if hasattr(errno, 'EADDRNOTAVAIL'): 1531 # bpo-31910: socket.create_connection() fails randomly 1532 # with EADDRNOTAVAIL on Travis CI 1533 errors.append(errno.EADDRNOTAVAIL) 1534 if hasattr(errno, 'EHOSTUNREACH'): 1535 # bpo-37583: The destination host cannot be reached 1536 errors.append(errno.EHOSTUNREACH) 1537 if not IPV6_ENABLED: 1538 errors.append(errno.EAFNOSUPPORT) 1539 return errors 1540 1541 1542@contextlib.contextmanager 1543def transient_internet(resource_name, *, timeout=30.0, errnos=()): 1544 """Return a context manager that raises ResourceDenied when various issues 1545 with the Internet connection manifest themselves as exceptions.""" 1546 default_errnos = [ 1547 ('ECONNREFUSED', 111), 1548 ('ECONNRESET', 104), 1549 ('EHOSTUNREACH', 113), 1550 ('ENETUNREACH', 101), 1551 ('ETIMEDOUT', 110), 1552 # socket.create_connection() fails randomly with 1553 # EADDRNOTAVAIL on Travis CI. 1554 ('EADDRNOTAVAIL', 99), 1555 ] 1556 default_gai_errnos = [ 1557 ('EAI_AGAIN', -3), 1558 ('EAI_FAIL', -4), 1559 ('EAI_NONAME', -2), 1560 ('EAI_NODATA', -5), 1561 # Encountered when trying to resolve IPv6-only hostnames 1562 ('WSANO_DATA', 11004), 1563 ] 1564 1565 denied = ResourceDenied("Resource %r is not available" % resource_name) 1566 captured_errnos = errnos 1567 gai_errnos = [] 1568 if not captured_errnos: 1569 captured_errnos = [getattr(errno, name, num) 1570 for (name, num) in default_errnos] 1571 gai_errnos = [getattr(socket, name, num) 1572 for (name, num) in default_gai_errnos] 1573 1574 def filter_error(err): 1575 n = getattr(err, 'errno', None) 1576 if (isinstance(err, socket.timeout) or 1577 (isinstance(err, socket.gaierror) and n in gai_errnos) or 1578 (isinstance(err, urllib.error.HTTPError) and 1579 500 <= err.code <= 599) or 1580 (isinstance(err, urllib.error.URLError) and 1581 (("ConnectionRefusedError" in err.reason) or 1582 ("TimeoutError" in err.reason) or 1583 ("EOFError" in err.reason))) or 1584 n in captured_errnos): 1585 if not verbose: 1586 sys.stderr.write(denied.args[0] + "\n") 1587 raise denied from err 1588 1589 old_timeout = socket.getdefaulttimeout() 1590 try: 1591 if timeout is not None: 1592 socket.setdefaulttimeout(timeout) 1593 yield 1594 except nntplib.NNTPTemporaryError as err: 1595 if verbose: 1596 sys.stderr.write(denied.args[0] + "\n") 1597 raise denied from err 1598 except OSError as err: 1599 # urllib can wrap original socket errors multiple times (!), we must 1600 # unwrap to get at the original error. 1601 while True: 1602 a = err.args 1603 if len(a) >= 1 and isinstance(a[0], OSError): 1604 err = a[0] 1605 # The error can also be wrapped as args[1]: 1606 # except socket.error as msg: 1607 # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2]) 1608 elif len(a) >= 2 and isinstance(a[1], OSError): 1609 err = a[1] 1610 else: 1611 break 1612 filter_error(err) 1613 raise 1614 # XXX should we catch generic exceptions and look for their 1615 # __cause__ or __context__? 1616 finally: 1617 socket.setdefaulttimeout(old_timeout) 1618 1619 1620@contextlib.contextmanager 1621def captured_output(stream_name): 1622 """Return a context manager used by captured_stdout/stdin/stderr 1623 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 1624 import io 1625 orig_stdout = getattr(sys, stream_name) 1626 setattr(sys, stream_name, io.StringIO()) 1627 try: 1628 yield getattr(sys, stream_name) 1629 finally: 1630 setattr(sys, stream_name, orig_stdout) 1631 1632def captured_stdout(): 1633 """Capture the output of sys.stdout: 1634 1635 with captured_stdout() as stdout: 1636 print("hello") 1637 self.assertEqual(stdout.getvalue(), "hello\\n") 1638 """ 1639 return captured_output("stdout") 1640 1641def captured_stderr(): 1642 """Capture the output of sys.stderr: 1643 1644 with captured_stderr() as stderr: 1645 print("hello", file=sys.stderr) 1646 self.assertEqual(stderr.getvalue(), "hello\\n") 1647 """ 1648 return captured_output("stderr") 1649 1650def captured_stdin(): 1651 """Capture the input to sys.stdin: 1652 1653 with captured_stdin() as stdin: 1654 stdin.write('hello\\n') 1655 stdin.seek(0) 1656 # call test code that consumes from sys.stdin 1657 captured = input() 1658 self.assertEqual(captured, "hello") 1659 """ 1660 return captured_output("stdin") 1661 1662 1663def gc_collect(): 1664 """Force as many objects as possible to be collected. 1665 1666 In non-CPython implementations of Python, this is needed because timely 1667 deallocation is not guaranteed by the garbage collector. (Even in CPython 1668 this can be the case in case of reference cycles.) This means that __del__ 1669 methods may be called later than expected and weakrefs may remain alive for 1670 longer than expected. This function tries its best to force all garbage 1671 objects to disappear. 1672 """ 1673 gc.collect() 1674 if is_jython: 1675 time.sleep(0.1) 1676 gc.collect() 1677 gc.collect() 1678 1679@contextlib.contextmanager 1680def disable_gc(): 1681 have_gc = gc.isenabled() 1682 gc.disable() 1683 try: 1684 yield 1685 finally: 1686 if have_gc: 1687 gc.enable() 1688 1689 1690def python_is_optimized(): 1691 """Find if Python was built with optimizations.""" 1692 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 1693 final_opt = "" 1694 for opt in cflags.split(): 1695 if opt.startswith('-O'): 1696 final_opt = opt 1697 return final_opt not in ('', '-O0', '-Og') 1698 1699 1700_header = 'nP' 1701_align = '0n' 1702if hasattr(sys, "getobjects"): 1703 _header = '2P' + _header 1704 _align = '0P' 1705_vheader = _header + 'n' 1706 1707def calcobjsize(fmt): 1708 return struct.calcsize(_header + fmt + _align) 1709 1710def calcvobjsize(fmt): 1711 return struct.calcsize(_vheader + fmt + _align) 1712 1713 1714_TPFLAGS_HAVE_GC = 1<<14 1715_TPFLAGS_HEAPTYPE = 1<<9 1716 1717def check_sizeof(test, o, size): 1718 import _testcapi 1719 result = sys.getsizeof(o) 1720 # add GC header size 1721 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1722 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1723 size += _testcapi.SIZEOF_PYGC_HEAD 1724 msg = 'wrong size for %s: got %d, expected %d' \ 1725 % (type(o), result, size) 1726 test.assertEqual(result, size, msg) 1727 1728#======================================================================= 1729# Decorator for running a function in a different locale, correctly resetting 1730# it afterwards. 1731 1732def run_with_locale(catstr, *locales): 1733 def decorator(func): 1734 def inner(*args, **kwds): 1735 try: 1736 import locale 1737 category = getattr(locale, catstr) 1738 orig_locale = locale.setlocale(category) 1739 except AttributeError: 1740 # if the test author gives us an invalid category string 1741 raise 1742 except: 1743 # cannot retrieve original locale, so do nothing 1744 locale = orig_locale = None 1745 else: 1746 for loc in locales: 1747 try: 1748 locale.setlocale(category, loc) 1749 break 1750 except: 1751 pass 1752 1753 # now run the function, resetting the locale on exceptions 1754 try: 1755 return func(*args, **kwds) 1756 finally: 1757 if locale and orig_locale: 1758 locale.setlocale(category, orig_locale) 1759 inner.__name__ = func.__name__ 1760 inner.__doc__ = func.__doc__ 1761 return inner 1762 return decorator 1763 1764#======================================================================= 1765# Decorator for running a function in a specific timezone, correctly 1766# resetting it afterwards. 1767 1768def run_with_tz(tz): 1769 def decorator(func): 1770 def inner(*args, **kwds): 1771 try: 1772 tzset = time.tzset 1773 except AttributeError: 1774 raise unittest.SkipTest("tzset required") 1775 if 'TZ' in os.environ: 1776 orig_tz = os.environ['TZ'] 1777 else: 1778 orig_tz = None 1779 os.environ['TZ'] = tz 1780 tzset() 1781 1782 # now run the function, resetting the tz on exceptions 1783 try: 1784 return func(*args, **kwds) 1785 finally: 1786 if orig_tz is None: 1787 del os.environ['TZ'] 1788 else: 1789 os.environ['TZ'] = orig_tz 1790 time.tzset() 1791 1792 inner.__name__ = func.__name__ 1793 inner.__doc__ = func.__doc__ 1794 return inner 1795 return decorator 1796 1797#======================================================================= 1798# Big-memory-test support. Separate from 'resources' because memory use 1799# should be configurable. 1800 1801# Some handy shorthands. Note that these are used for byte-limits as well 1802# as size-limits, in the various bigmem tests 1803_1M = 1024*1024 1804_1G = 1024 * _1M 1805_2G = 2 * _1G 1806_4G = 4 * _1G 1807 1808MAX_Py_ssize_t = sys.maxsize 1809 1810def set_memlimit(limit): 1811 global max_memuse 1812 global real_max_memuse 1813 sizes = { 1814 'k': 1024, 1815 'm': _1M, 1816 'g': _1G, 1817 't': 1024*_1G, 1818 } 1819 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 1820 re.IGNORECASE | re.VERBOSE) 1821 if m is None: 1822 raise ValueError('Invalid memory limit %r' % (limit,)) 1823 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 1824 real_max_memuse = memlimit 1825 if memlimit > MAX_Py_ssize_t: 1826 memlimit = MAX_Py_ssize_t 1827 if memlimit < _2G - 1: 1828 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 1829 max_memuse = memlimit 1830 1831class _MemoryWatchdog: 1832 """An object which periodically watches the process' memory consumption 1833 and prints it out. 1834 """ 1835 1836 def __init__(self): 1837 self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) 1838 self.started = False 1839 1840 def start(self): 1841 try: 1842 f = open(self.procfile, 'r') 1843 except OSError as e: 1844 warnings.warn('/proc not available for stats: {}'.format(e), 1845 RuntimeWarning) 1846 sys.stderr.flush() 1847 return 1848 1849 with f: 1850 watchdog_script = findfile("memory_watchdog.py") 1851 self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], 1852 stdin=f, 1853 stderr=subprocess.DEVNULL) 1854 self.started = True 1855 1856 def stop(self): 1857 if self.started: 1858 self.mem_watchdog.terminate() 1859 self.mem_watchdog.wait() 1860 1861 1862def bigmemtest(size, memuse, dry_run=True): 1863 """Decorator for bigmem tests. 1864 1865 'size' is a requested size for the test (in arbitrary, test-interpreted 1866 units.) 'memuse' is the number of bytes per unit for the test, or a good 1867 estimate of it. For example, a test that needs two byte buffers, of 4 GiB 1868 each, could be decorated with @bigmemtest(size=_4G, memuse=2). 1869 1870 The 'size' argument is normally passed to the decorated test method as an 1871 extra argument. If 'dry_run' is true, the value passed to the test method 1872 may be less than the requested value. If 'dry_run' is false, it means the 1873 test doesn't support dummy runs when -M is not specified. 1874 """ 1875 def decorator(f): 1876 def wrapper(self): 1877 size = wrapper.size 1878 memuse = wrapper.memuse 1879 if not real_max_memuse: 1880 maxsize = 5147 1881 else: 1882 maxsize = size 1883 1884 if ((real_max_memuse or not dry_run) 1885 and real_max_memuse < maxsize * memuse): 1886 raise unittest.SkipTest( 1887 "not enough memory: %.1fG minimum needed" 1888 % (size * memuse / (1024 ** 3))) 1889 1890 if real_max_memuse and verbose: 1891 print() 1892 print(" ... expected peak memory use: {peak:.1f}G" 1893 .format(peak=size * memuse / (1024 ** 3))) 1894 watchdog = _MemoryWatchdog() 1895 watchdog.start() 1896 else: 1897 watchdog = None 1898 1899 try: 1900 return f(self, maxsize) 1901 finally: 1902 if watchdog: 1903 watchdog.stop() 1904 1905 wrapper.size = size 1906 wrapper.memuse = memuse 1907 return wrapper 1908 return decorator 1909 1910def bigaddrspacetest(f): 1911 """Decorator for tests that fill the address space.""" 1912 def wrapper(self): 1913 if max_memuse < MAX_Py_ssize_t: 1914 if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: 1915 raise unittest.SkipTest( 1916 "not enough memory: try a 32-bit build instead") 1917 else: 1918 raise unittest.SkipTest( 1919 "not enough memory: %.1fG minimum needed" 1920 % (MAX_Py_ssize_t / (1024 ** 3))) 1921 else: 1922 return f(self) 1923 return wrapper 1924 1925#======================================================================= 1926# unittest integration. 1927 1928class BasicTestRunner: 1929 def run(self, test): 1930 result = unittest.TestResult() 1931 test(result) 1932 return result 1933 1934def _id(obj): 1935 return obj 1936 1937def requires_resource(resource): 1938 if resource == 'gui' and not _is_gui_available(): 1939 return unittest.skip(_is_gui_available.reason) 1940 if is_resource_enabled(resource): 1941 return _id 1942 else: 1943 return unittest.skip("resource {0!r} is not enabled".format(resource)) 1944 1945def cpython_only(test): 1946 """ 1947 Decorator for tests only applicable on CPython. 1948 """ 1949 return impl_detail(cpython=True)(test) 1950 1951def impl_detail(msg=None, **guards): 1952 if check_impl_detail(**guards): 1953 return _id 1954 if msg is None: 1955 guardnames, default = _parse_guards(guards) 1956 if default: 1957 msg = "implementation detail not available on {0}" 1958 else: 1959 msg = "implementation detail specific to {0}" 1960 guardnames = sorted(guardnames.keys()) 1961 msg = msg.format(' or '.join(guardnames)) 1962 return unittest.skip(msg) 1963 1964def _parse_guards(guards): 1965 # Returns a tuple ({platform_name: run_me}, default_value) 1966 if not guards: 1967 return ({'cpython': True}, False) 1968 is_true = list(guards.values())[0] 1969 assert list(guards.values()) == [is_true] * len(guards) # all True or all False 1970 return (guards, not is_true) 1971 1972# Use the following check to guard CPython's implementation-specific tests -- 1973# or to run them only on the implementation(s) guarded by the arguments. 1974def check_impl_detail(**guards): 1975 """This function returns True or False depending on the host platform. 1976 Examples: 1977 if check_impl_detail(): # only on CPython (default) 1978 if check_impl_detail(jython=True): # only on Jython 1979 if check_impl_detail(cpython=False): # everywhere except on CPython 1980 """ 1981 guards, default = _parse_guards(guards) 1982 return guards.get(platform.python_implementation().lower(), default) 1983 1984 1985def no_tracing(func): 1986 """Decorator to temporarily turn off tracing for the duration of a test.""" 1987 if not hasattr(sys, 'gettrace'): 1988 return func 1989 else: 1990 @functools.wraps(func) 1991 def wrapper(*args, **kwargs): 1992 original_trace = sys.gettrace() 1993 try: 1994 sys.settrace(None) 1995 return func(*args, **kwargs) 1996 finally: 1997 sys.settrace(original_trace) 1998 return wrapper 1999 2000 2001def refcount_test(test): 2002 """Decorator for tests which involve reference counting. 2003 2004 To start, the decorator does not run the test if is not run by CPython. 2005 After that, any trace function is unset during the test to prevent 2006 unexpected refcounts caused by the trace function. 2007 2008 """ 2009 return no_tracing(cpython_only(test)) 2010 2011 2012def _filter_suite(suite, pred): 2013 """Recursively filter test cases in a suite based on a predicate.""" 2014 newtests = [] 2015 for test in suite._tests: 2016 if isinstance(test, unittest.TestSuite): 2017 _filter_suite(test, pred) 2018 newtests.append(test) 2019 else: 2020 if pred(test): 2021 newtests.append(test) 2022 suite._tests = newtests 2023 2024def _run_suite(suite): 2025 """Run tests from a unittest.TestSuite-derived class.""" 2026 runner = get_test_runner(sys.stdout, 2027 verbosity=verbose, 2028 capture_output=(junit_xml_list is not None)) 2029 2030 result = runner.run(suite) 2031 2032 if junit_xml_list is not None: 2033 junit_xml_list.append(result.get_xml_element()) 2034 2035 if not result.testsRun and not result.skipped: 2036 raise TestDidNotRun 2037 if not result.wasSuccessful(): 2038 if len(result.errors) == 1 and not result.failures: 2039 err = result.errors[0][1] 2040 elif len(result.failures) == 1 and not result.errors: 2041 err = result.failures[0][1] 2042 else: 2043 err = "multiple errors occurred" 2044 if not verbose: err += "; run in verbose mode for details" 2045 raise TestFailed(err) 2046 2047 2048# By default, don't filter tests 2049_match_test_func = None 2050 2051_accept_test_patterns = None 2052_ignore_test_patterns = None 2053 2054 2055def match_test(test): 2056 # Function used by support.run_unittest() and regrtest --list-cases 2057 if _match_test_func is None: 2058 return True 2059 else: 2060 return _match_test_func(test.id()) 2061 2062 2063def _is_full_match_test(pattern): 2064 # If a pattern contains at least one dot, it's considered 2065 # as a full test identifier. 2066 # Example: 'test.test_os.FileTests.test_access'. 2067 # 2068 # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' 2069 # or '[!...]'. For example, ignore 'test_access*'. 2070 return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) 2071 2072 2073def set_match_tests(accept_patterns=None, ignore_patterns=None): 2074 global _match_test_func, _accept_test_patterns, _ignore_test_patterns 2075 2076 2077 if accept_patterns is None: 2078 accept_patterns = () 2079 if ignore_patterns is None: 2080 ignore_patterns = () 2081 2082 accept_func = ignore_func = None 2083 2084 if accept_patterns != _accept_test_patterns: 2085 accept_patterns, accept_func = _compile_match_function(accept_patterns) 2086 if ignore_patterns != _ignore_test_patterns: 2087 ignore_patterns, ignore_func = _compile_match_function(ignore_patterns) 2088 2089 # Create a copy since patterns can be mutable and so modified later 2090 _accept_test_patterns = tuple(accept_patterns) 2091 _ignore_test_patterns = tuple(ignore_patterns) 2092 2093 if accept_func is not None or ignore_func is not None: 2094 def match_function(test_id): 2095 accept = True 2096 ignore = False 2097 if accept_func: 2098 accept = accept_func(test_id) 2099 if ignore_func: 2100 ignore = ignore_func(test_id) 2101 return accept and not ignore 2102 2103 _match_test_func = match_function 2104 2105 2106def _compile_match_function(patterns): 2107 if not patterns: 2108 func = None 2109 # set_match_tests(None) behaves as set_match_tests(()) 2110 patterns = () 2111 elif all(map(_is_full_match_test, patterns)): 2112 # Simple case: all patterns are full test identifier. 2113 # The test.bisect_cmd utility only uses such full test identifiers. 2114 func = set(patterns).__contains__ 2115 else: 2116 regex = '|'.join(map(fnmatch.translate, patterns)) 2117 # The search *is* case sensitive on purpose: 2118 # don't use flags=re.IGNORECASE 2119 regex_match = re.compile(regex).match 2120 2121 def match_test_regex(test_id): 2122 if regex_match(test_id): 2123 # The regex matches the whole identifier, for example 2124 # 'test.test_os.FileTests.test_access'. 2125 return True 2126 else: 2127 # Try to match parts of the test identifier. 2128 # For example, split 'test.test_os.FileTests.test_access' 2129 # into: 'test', 'test_os', 'FileTests' and 'test_access'. 2130 return any(map(regex_match, test_id.split("."))) 2131 2132 func = match_test_regex 2133 2134 return patterns, func 2135 2136 2137def run_unittest(*classes): 2138 """Run tests from unittest.TestCase-derived classes.""" 2139 valid_types = (unittest.TestSuite, unittest.TestCase) 2140 suite = unittest.TestSuite() 2141 for cls in classes: 2142 if isinstance(cls, str): 2143 if cls in sys.modules: 2144 suite.addTest(unittest.findTestCases(sys.modules[cls])) 2145 else: 2146 raise ValueError("str arguments must be keys in sys.modules") 2147 elif isinstance(cls, valid_types): 2148 suite.addTest(cls) 2149 else: 2150 suite.addTest(unittest.makeSuite(cls)) 2151 _filter_suite(suite, match_test) 2152 _run_suite(suite) 2153 2154#======================================================================= 2155# Check for the presence of docstrings. 2156 2157# Rather than trying to enumerate all the cases where docstrings may be 2158# disabled, we just check for that directly 2159 2160def _check_docstrings(): 2161 """Just used to check if docstrings are enabled""" 2162 2163MISSING_C_DOCSTRINGS = (check_impl_detail() and 2164 sys.platform != 'win32' and 2165 not sysconfig.get_config_var('WITH_DOC_STRINGS')) 2166 2167HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and 2168 not MISSING_C_DOCSTRINGS) 2169 2170requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 2171 "test requires docstrings") 2172 2173 2174#======================================================================= 2175# doctest driver. 2176 2177def run_doctest(module, verbosity=None, optionflags=0): 2178 """Run doctest on the given module. Return (#failures, #tests). 2179 2180 If optional argument verbosity is not specified (or is None), pass 2181 support's belief about verbosity on to doctest. Else doctest's 2182 usual behavior is used (it searches sys.argv for -v). 2183 """ 2184 2185 import doctest 2186 2187 if verbosity is None: 2188 verbosity = verbose 2189 else: 2190 verbosity = None 2191 2192 f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) 2193 if f: 2194 raise TestFailed("%d of %d doctests failed" % (f, t)) 2195 if verbose: 2196 print('doctest (%s) ... %d tests with zero failures' % 2197 (module.__name__, t)) 2198 return f, t 2199 2200 2201#======================================================================= 2202# Support for saving and restoring the imported modules. 2203 2204def print_warning(msg): 2205 # bpo-39983: Print into sys.__stderr__ to display the warning even 2206 # when sys.stderr is captured temporarily by a test 2207 for line in msg.splitlines(): 2208 print(f"Warning -- {line}", file=sys.__stderr__, flush=True) 2209 2210def modules_setup(): 2211 return sys.modules.copy(), 2212 2213def modules_cleanup(oldmodules): 2214 # Encoders/decoders are registered permanently within the internal 2215 # codec cache. If we destroy the corresponding modules their 2216 # globals will be set to None which will trip up the cached functions. 2217 encodings = [(k, v) for k, v in sys.modules.items() 2218 if k.startswith('encodings.')] 2219 sys.modules.clear() 2220 sys.modules.update(encodings) 2221 # XXX: This kind of problem can affect more than just encodings. In particular 2222 # extension modules (such as _ssl) don't cope with reloading properly. 2223 # Really, test modules should be cleaning out the test specific modules they 2224 # know they added (ala test_runpy) rather than relying on this function (as 2225 # test_importhooks and test_pkg do currently). 2226 # Implicitly imported *real* modules should be left alone (see issue 10556). 2227 sys.modules.update(oldmodules) 2228 2229#======================================================================= 2230# Threading support to prevent reporting refleaks when running regrtest.py -R 2231 2232# Flag used by saved_test_environment of test.libregrtest.save_env, 2233# to check if a test modified the environment. The flag should be set to False 2234# before running a new test. 2235# 2236# For example, threading_cleanup() sets the flag is the function fails 2237# to cleanup threads. 2238environment_altered = False 2239 2240# NOTE: we use thread._count() rather than threading.enumerate() (or the 2241# moral equivalent thereof) because a threading.Thread object is still alive 2242# until its __bootstrap() method has returned, even after it has been 2243# unregistered from the threading module. 2244# thread._count(), on the other hand, only gets decremented *after* the 2245# __bootstrap() method has returned, which gives us reliable reference counts 2246# at the end of a test run. 2247 2248def threading_setup(): 2249 return _thread._count(), threading._dangling.copy() 2250 2251def threading_cleanup(*original_values): 2252 global environment_altered 2253 2254 _MAX_COUNT = 100 2255 2256 for count in range(_MAX_COUNT): 2257 values = _thread._count(), threading._dangling 2258 if values == original_values: 2259 break 2260 2261 if not count: 2262 # Display a warning at the first iteration 2263 environment_altered = True 2264 dangling_threads = values[1] 2265 print_warning(f"threading_cleanup() failed to cleanup " 2266 f"{values[0] - original_values[0]} threads " 2267 f"(count: {values[0]}, " 2268 f"dangling: {len(dangling_threads)})") 2269 for thread in dangling_threads: 2270 print_warning(f"Dangling thread: {thread!r}") 2271 2272 # Don't hold references to threads 2273 dangling_threads = None 2274 values = None 2275 2276 time.sleep(0.01) 2277 gc_collect() 2278 2279 2280def reap_threads(func): 2281 """Use this function when threads are being used. This will 2282 ensure that the threads are cleaned up even when the test fails. 2283 """ 2284 @functools.wraps(func) 2285 def decorator(*args): 2286 key = threading_setup() 2287 try: 2288 return func(*args) 2289 finally: 2290 threading_cleanup(*key) 2291 return decorator 2292 2293 2294@contextlib.contextmanager 2295def wait_threads_exit(timeout=60.0): 2296 """ 2297 bpo-31234: Context manager to wait until all threads created in the with 2298 statement exit. 2299 2300 Use _thread.count() to check if threads exited. Indirectly, wait until 2301 threads exit the internal t_bootstrap() C function of the _thread module. 2302 2303 threading_setup() and threading_cleanup() are designed to emit a warning 2304 if a test leaves running threads in the background. This context manager 2305 is designed to cleanup threads started by the _thread.start_new_thread() 2306 which doesn't allow to wait for thread exit, whereas thread.Thread has a 2307 join() method. 2308 """ 2309 old_count = _thread._count() 2310 try: 2311 yield 2312 finally: 2313 start_time = time.monotonic() 2314 deadline = start_time + timeout 2315 while True: 2316 count = _thread._count() 2317 if count <= old_count: 2318 break 2319 if time.monotonic() > deadline: 2320 dt = time.monotonic() - start_time 2321 msg = (f"wait_threads() failed to cleanup {count - old_count} " 2322 f"threads after {dt:.1f} seconds " 2323 f"(count: {count}, old count: {old_count})") 2324 raise AssertionError(msg) 2325 time.sleep(0.010) 2326 gc_collect() 2327 2328 2329def join_thread(thread, timeout=30.0): 2330 """Join a thread. Raise an AssertionError if the thread is still alive 2331 after timeout seconds. 2332 """ 2333 thread.join(timeout) 2334 if thread.is_alive(): 2335 msg = f"failed to join the thread in {timeout:.1f} seconds" 2336 raise AssertionError(msg) 2337 2338 2339def reap_children(): 2340 """Use this function at the end of test_main() whenever sub-processes 2341 are started. This will help ensure that no extra children (zombies) 2342 stick around to hog resources and create problems when looking 2343 for refleaks. 2344 """ 2345 global environment_altered 2346 2347 # Need os.waitpid(-1, os.WNOHANG): Windows is not supported 2348 if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): 2349 return 2350 2351 # Reap all our dead child processes so we don't leave zombies around. 2352 # These hog resources and might be causing some of the buildbots to die. 2353 while True: 2354 try: 2355 # Read the exit status of any child process which already completed 2356 pid, status = os.waitpid(-1, os.WNOHANG) 2357 except OSError: 2358 break 2359 2360 if pid == 0: 2361 break 2362 2363 print_warning(f"reap_children() reaped child process {pid}") 2364 environment_altered = True 2365 2366 2367@contextlib.contextmanager 2368def start_threads(threads, unlock=None): 2369 threads = list(threads) 2370 started = [] 2371 try: 2372 try: 2373 for t in threads: 2374 t.start() 2375 started.append(t) 2376 except: 2377 if verbose: 2378 print("Can't start %d threads, only %d threads started" % 2379 (len(threads), len(started))) 2380 raise 2381 yield 2382 finally: 2383 try: 2384 if unlock: 2385 unlock() 2386 endtime = starttime = time.monotonic() 2387 for timeout in range(1, 16): 2388 endtime += 60 2389 for t in started: 2390 t.join(max(endtime - time.monotonic(), 0.01)) 2391 started = [t for t in started if t.is_alive()] 2392 if not started: 2393 break 2394 if verbose: 2395 print('Unable to join %d threads during a period of ' 2396 '%d minutes' % (len(started), timeout)) 2397 finally: 2398 started = [t for t in started if t.is_alive()] 2399 if started: 2400 faulthandler.dump_traceback(sys.stdout) 2401 raise AssertionError('Unable to join %d threads' % len(started)) 2402 2403@contextlib.contextmanager 2404def swap_attr(obj, attr, new_val): 2405 """Temporary swap out an attribute with a new object. 2406 2407 Usage: 2408 with swap_attr(obj, "attr", 5): 2409 ... 2410 2411 This will set obj.attr to 5 for the duration of the with: block, 2412 restoring the old value at the end of the block. If `attr` doesn't 2413 exist on `obj`, it will be created and then deleted at the end of the 2414 block. 2415 2416 The old value (or None if it doesn't exist) will be assigned to the 2417 target of the "as" clause, if there is one. 2418 """ 2419 if hasattr(obj, attr): 2420 real_val = getattr(obj, attr) 2421 setattr(obj, attr, new_val) 2422 try: 2423 yield real_val 2424 finally: 2425 setattr(obj, attr, real_val) 2426 else: 2427 setattr(obj, attr, new_val) 2428 try: 2429 yield 2430 finally: 2431 if hasattr(obj, attr): 2432 delattr(obj, attr) 2433 2434@contextlib.contextmanager 2435def swap_item(obj, item, new_val): 2436 """Temporary swap out an item with a new object. 2437 2438 Usage: 2439 with swap_item(obj, "item", 5): 2440 ... 2441 2442 This will set obj["item"] to 5 for the duration of the with: block, 2443 restoring the old value at the end of the block. If `item` doesn't 2444 exist on `obj`, it will be created and then deleted at the end of the 2445 block. 2446 2447 The old value (or None if it doesn't exist) will be assigned to the 2448 target of the "as" clause, if there is one. 2449 """ 2450 if item in obj: 2451 real_val = obj[item] 2452 obj[item] = new_val 2453 try: 2454 yield real_val 2455 finally: 2456 obj[item] = real_val 2457 else: 2458 obj[item] = new_val 2459 try: 2460 yield 2461 finally: 2462 if item in obj: 2463 del obj[item] 2464 2465def strip_python_stderr(stderr): 2466 """Strip the stderr of a Python process from potential debug output 2467 emitted by the interpreter. 2468 2469 This will typically be run on the result of the communicate() method 2470 of a subprocess.Popen object. 2471 """ 2472 stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip() 2473 return stderr 2474 2475requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'), 2476 'types are immortal if COUNT_ALLOCS is defined') 2477 2478def args_from_interpreter_flags(): 2479 """Return a list of command-line arguments reproducing the current 2480 settings in sys.flags and sys.warnoptions.""" 2481 return subprocess._args_from_interpreter_flags() 2482 2483def optim_args_from_interpreter_flags(): 2484 """Return a list of command-line arguments reproducing the current 2485 optimization settings in sys.flags.""" 2486 return subprocess._optim_args_from_interpreter_flags() 2487 2488#============================================================ 2489# Support for assertions about logging. 2490#============================================================ 2491 2492class TestHandler(logging.handlers.BufferingHandler): 2493 def __init__(self, matcher): 2494 # BufferingHandler takes a "capacity" argument 2495 # so as to know when to flush. As we're overriding 2496 # shouldFlush anyway, we can set a capacity of zero. 2497 # You can call flush() manually to clear out the 2498 # buffer. 2499 logging.handlers.BufferingHandler.__init__(self, 0) 2500 self.matcher = matcher 2501 2502 def shouldFlush(self): 2503 return False 2504 2505 def emit(self, record): 2506 self.format(record) 2507 self.buffer.append(record.__dict__) 2508 2509 def matches(self, **kwargs): 2510 """ 2511 Look for a saved dict whose keys/values match the supplied arguments. 2512 """ 2513 result = False 2514 for d in self.buffer: 2515 if self.matcher.matches(d, **kwargs): 2516 result = True 2517 break 2518 return result 2519 2520class Matcher(object): 2521 2522 _partial_matches = ('msg', 'message') 2523 2524 def matches(self, d, **kwargs): 2525 """ 2526 Try to match a single dict with the supplied arguments. 2527 2528 Keys whose values are strings and which are in self._partial_matches 2529 will be checked for partial (i.e. substring) matches. You can extend 2530 this scheme to (for example) do regular expression matching, etc. 2531 """ 2532 result = True 2533 for k in kwargs: 2534 v = kwargs[k] 2535 dv = d.get(k) 2536 if not self.match_value(k, dv, v): 2537 result = False 2538 break 2539 return result 2540 2541 def match_value(self, k, dv, v): 2542 """ 2543 Try to match a single stored value (dv) with a supplied value (v). 2544 """ 2545 if type(v) != type(dv): 2546 result = False 2547 elif type(dv) is not str or k not in self._partial_matches: 2548 result = (v == dv) 2549 else: 2550 result = dv.find(v) >= 0 2551 return result 2552 2553 2554_can_symlink = None 2555def can_symlink(): 2556 global _can_symlink 2557 if _can_symlink is not None: 2558 return _can_symlink 2559 symlink_path = TESTFN + "can_symlink" 2560 try: 2561 os.symlink(TESTFN, symlink_path) 2562 can = True 2563 except (OSError, NotImplementedError, AttributeError): 2564 can = False 2565 else: 2566 os.remove(symlink_path) 2567 _can_symlink = can 2568 return can 2569 2570def skip_unless_symlink(test): 2571 """Skip decorator for tests that require functional symlink""" 2572 ok = can_symlink() 2573 msg = "Requires functional symlink implementation" 2574 return test if ok else unittest.skip(msg)(test) 2575 2576_buggy_ucrt = None 2577def skip_if_buggy_ucrt_strfptime(test): 2578 """ 2579 Skip decorator for tests that use buggy strptime/strftime 2580 2581 If the UCRT bugs are present time.localtime().tm_zone will be 2582 an empty string, otherwise we assume the UCRT bugs are fixed 2583 2584 See bpo-37552 [Windows] strptime/strftime return invalid 2585 results with UCRT version 17763.615 2586 """ 2587 global _buggy_ucrt 2588 if _buggy_ucrt is None: 2589 if(sys.platform == 'win32' and 2590 locale.getdefaultlocale()[1] == 'cp65001' and 2591 time.localtime().tm_zone == ''): 2592 _buggy_ucrt = True 2593 else: 2594 _buggy_ucrt = False 2595 return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test 2596 2597class PythonSymlink: 2598 """Creates a symlink for the current Python executable""" 2599 def __init__(self, link=None): 2600 self.link = link or os.path.abspath(TESTFN) 2601 self._linked = [] 2602 self.real = os.path.realpath(sys.executable) 2603 self._also_link = [] 2604 2605 self._env = None 2606 2607 self._platform_specific() 2608 2609 def _platform_specific(self): 2610 pass 2611 2612 if sys.platform == "win32": 2613 def _platform_specific(self): 2614 import _winapi 2615 2616 if os.path.lexists(self.real) and not os.path.exists(self.real): 2617 # App symlink appears to not exist, but we want the 2618 # real executable here anyway 2619 self.real = _winapi.GetModuleFileName(0) 2620 2621 dll = _winapi.GetModuleFileName(sys.dllhandle) 2622 src_dir = os.path.dirname(dll) 2623 dest_dir = os.path.dirname(self.link) 2624 self._also_link.append(( 2625 dll, 2626 os.path.join(dest_dir, os.path.basename(dll)) 2627 )) 2628 for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")): 2629 self._also_link.append(( 2630 runtime, 2631 os.path.join(dest_dir, os.path.basename(runtime)) 2632 )) 2633 2634 self._env = {k.upper(): os.getenv(k) for k in os.environ} 2635 self._env["PYTHONHOME"] = os.path.dirname(self.real) 2636 if sysconfig.is_python_build(True): 2637 self._env["PYTHONPATH"] = os.path.dirname(os.__file__) 2638 2639 def __enter__(self): 2640 os.symlink(self.real, self.link) 2641 self._linked.append(self.link) 2642 for real, link in self._also_link: 2643 os.symlink(real, link) 2644 self._linked.append(link) 2645 return self 2646 2647 def __exit__(self, exc_type, exc_value, exc_tb): 2648 for link in self._linked: 2649 try: 2650 os.remove(link) 2651 except IOError as ex: 2652 if verbose: 2653 print("failed to clean up {}: {}".format(link, ex)) 2654 2655 def _call(self, python, args, env, returncode): 2656 cmd = [python, *args] 2657 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, 2658 stderr=subprocess.PIPE, env=env) 2659 r = p.communicate() 2660 if p.returncode != returncode: 2661 if verbose: 2662 print(repr(r[0])) 2663 print(repr(r[1]), file=sys.stderr) 2664 raise RuntimeError( 2665 'unexpected return code: {0} (0x{0:08X})'.format(p.returncode)) 2666 return r 2667 2668 def call_real(self, *args, returncode=0): 2669 return self._call(self.real, args, None, returncode) 2670 2671 def call_link(self, *args, returncode=0): 2672 return self._call(self.link, args, self._env, returncode) 2673 2674 2675_can_xattr = None 2676def can_xattr(): 2677 global _can_xattr 2678 if _can_xattr is not None: 2679 return _can_xattr 2680 if not hasattr(os, "setxattr"): 2681 can = False 2682 else: 2683 tmp_dir = tempfile.mkdtemp() 2684 tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir) 2685 try: 2686 with open(TESTFN, "wb") as fp: 2687 try: 2688 # TESTFN & tempfile may use different file systems with 2689 # different capabilities 2690 os.setxattr(tmp_fp, b"user.test", b"") 2691 os.setxattr(tmp_name, b"trusted.foo", b"42") 2692 os.setxattr(fp.fileno(), b"user.test", b"") 2693 # Kernels < 2.6.39 don't respect setxattr flags. 2694 kernel_version = platform.release() 2695 m = re.match(r"2.6.(\d{1,2})", kernel_version) 2696 can = m is None or int(m.group(1)) >= 39 2697 except OSError: 2698 can = False 2699 finally: 2700 unlink(TESTFN) 2701 unlink(tmp_name) 2702 rmdir(tmp_dir) 2703 _can_xattr = can 2704 return can 2705 2706def skip_unless_xattr(test): 2707 """Skip decorator for tests that require functional extended attributes""" 2708 ok = can_xattr() 2709 msg = "no non-broken extended attribute support" 2710 return test if ok else unittest.skip(msg)(test) 2711 2712def skip_if_pgo_task(test): 2713 """Skip decorator for tests not run in (non-extended) PGO task""" 2714 ok = not PGO or PGO_EXTENDED 2715 msg = "Not run for (non-extended) PGO task" 2716 return test if ok else unittest.skip(msg)(test) 2717 2718_bind_nix_socket_error = None 2719def skip_unless_bind_unix_socket(test): 2720 """Decorator for tests requiring a functional bind() for unix sockets.""" 2721 if not hasattr(socket, 'AF_UNIX'): 2722 return unittest.skip('No UNIX Sockets')(test) 2723 global _bind_nix_socket_error 2724 if _bind_nix_socket_error is None: 2725 path = TESTFN + "can_bind_unix_socket" 2726 with socket.socket(socket.AF_UNIX) as sock: 2727 try: 2728 sock.bind(path) 2729 _bind_nix_socket_error = False 2730 except OSError as e: 2731 _bind_nix_socket_error = e 2732 finally: 2733 unlink(path) 2734 if _bind_nix_socket_error: 2735 msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error 2736 return unittest.skip(msg)(test) 2737 else: 2738 return test 2739 2740 2741def fs_is_case_insensitive(directory): 2742 """Detects if the file system for the specified directory is case-insensitive.""" 2743 with tempfile.NamedTemporaryFile(dir=directory) as base: 2744 base_path = base.name 2745 case_path = base_path.upper() 2746 if case_path == base_path: 2747 case_path = base_path.lower() 2748 try: 2749 return os.path.samefile(base_path, case_path) 2750 except FileNotFoundError: 2751 return False 2752 2753 2754def detect_api_mismatch(ref_api, other_api, *, ignore=()): 2755 """Returns the set of items in ref_api not in other_api, except for a 2756 defined list of items to be ignored in this check. 2757 2758 By default this skips private attributes beginning with '_' but 2759 includes all magic methods, i.e. those starting and ending in '__'. 2760 """ 2761 missing_items = set(dir(ref_api)) - set(dir(other_api)) 2762 if ignore: 2763 missing_items -= set(ignore) 2764 missing_items = set(m for m in missing_items 2765 if not m.startswith('_') or m.endswith('__')) 2766 return missing_items 2767 2768 2769def check__all__(test_case, module, name_of_module=None, extra=(), 2770 blacklist=()): 2771 """Assert that the __all__ variable of 'module' contains all public names. 2772 2773 The module's public names (its API) are detected automatically based on 2774 whether they match the public name convention and were defined in 2775 'module'. 2776 2777 The 'name_of_module' argument can specify (as a string or tuple thereof) 2778 what module(s) an API could be defined in in order to be detected as a 2779 public API. One case for this is when 'module' imports part of its public 2780 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 2781 2782 The 'extra' argument can be a set of names that wouldn't otherwise be 2783 automatically detected as "public", like objects without a proper 2784 '__module__' attribute. If provided, it will be added to the 2785 automatically detected ones. 2786 2787 The 'blacklist' argument can be a set of names that must not be treated 2788 as part of the public API even though their names indicate otherwise. 2789 2790 Usage: 2791 import bar 2792 import foo 2793 import unittest 2794 from test import support 2795 2796 class MiscTestCase(unittest.TestCase): 2797 def test__all__(self): 2798 support.check__all__(self, foo) 2799 2800 class OtherTestCase(unittest.TestCase): 2801 def test__all__(self): 2802 extra = {'BAR_CONST', 'FOO_CONST'} 2803 blacklist = {'baz'} # Undocumented name. 2804 # bar imports part of its API from _bar. 2805 support.check__all__(self, bar, ('bar', '_bar'), 2806 extra=extra, blacklist=blacklist) 2807 2808 """ 2809 2810 if name_of_module is None: 2811 name_of_module = (module.__name__, ) 2812 elif isinstance(name_of_module, str): 2813 name_of_module = (name_of_module, ) 2814 2815 expected = set(extra) 2816 2817 for name in dir(module): 2818 if name.startswith('_') or name in blacklist: 2819 continue 2820 obj = getattr(module, name) 2821 if (getattr(obj, '__module__', None) in name_of_module or 2822 (not hasattr(obj, '__module__') and 2823 not isinstance(obj, types.ModuleType))): 2824 expected.add(name) 2825 test_case.assertCountEqual(module.__all__, expected) 2826 2827 2828def suppress_msvcrt_asserts(verbose=False): 2829 try: 2830 import msvcrt 2831 except ImportError: 2832 return 2833 2834 msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS 2835 | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT 2836 | msvcrt.SEM_NOGPFAULTERRORBOX 2837 | msvcrt.SEM_NOOPENFILEERRORBOX) 2838 2839 # CrtSetReportMode() is only available in debug build 2840 if hasattr(msvcrt, 'CrtSetReportMode'): 2841 for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: 2842 if verbose: 2843 msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) 2844 msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) 2845 else: 2846 msvcrt.CrtSetReportMode(m, 0) 2847 2848 2849class SuppressCrashReport: 2850 """Try to prevent a crash report from popping up. 2851 2852 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 2853 disable the creation of coredump file. 2854 """ 2855 old_value = None 2856 old_modes = None 2857 2858 def __enter__(self): 2859 """On Windows, disable Windows Error Reporting dialogs using 2860 SetErrorMode() and CrtSetReportMode(). 2861 2862 On UNIX, try to save the previous core file size limit, then set 2863 soft limit to 0. 2864 """ 2865 if sys.platform.startswith('win'): 2866 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 2867 # GetErrorMode is not available on Windows XP and Windows Server 2003, 2868 # but SetErrorMode returns the previous value, so we can use that 2869 try: 2870 import msvcrt 2871 except ImportError: 2872 return 2873 2874 self.old_value = msvcrt.SetErrorMode(msvcrt.SEM_NOGPFAULTERRORBOX) 2875 2876 msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX) 2877 2878 # bpo-23314: Suppress assert dialogs in debug builds. 2879 # CrtSetReportMode() is only available in debug build. 2880 if hasattr(msvcrt, 'CrtSetReportMode'): 2881 self.old_modes = {} 2882 for report_type in [msvcrt.CRT_WARN, 2883 msvcrt.CRT_ERROR, 2884 msvcrt.CRT_ASSERT]: 2885 old_mode = msvcrt.CrtSetReportMode(report_type, 2886 msvcrt.CRTDBG_MODE_FILE) 2887 old_file = msvcrt.CrtSetReportFile(report_type, 2888 msvcrt.CRTDBG_FILE_STDERR) 2889 self.old_modes[report_type] = old_mode, old_file 2890 2891 else: 2892 if resource is not None: 2893 try: 2894 self.old_value = resource.getrlimit(resource.RLIMIT_CORE) 2895 resource.setrlimit(resource.RLIMIT_CORE, 2896 (0, self.old_value[1])) 2897 except (ValueError, OSError): 2898 pass 2899 2900 if sys.platform == 'darwin': 2901 # Check if the 'Crash Reporter' on OSX was configured 2902 # in 'Developer' mode and warn that it will get triggered 2903 # when it is. 2904 # 2905 # This assumes that this context manager is used in tests 2906 # that might trigger the next manager. 2907 cmd = ['/usr/bin/defaults', 'read', 2908 'com.apple.CrashReporter', 'DialogType'] 2909 proc = subprocess.Popen(cmd, 2910 stdout=subprocess.PIPE, 2911 stderr=subprocess.PIPE) 2912 with proc: 2913 stdout = proc.communicate()[0] 2914 if stdout.strip() == b'developer': 2915 print("this test triggers the Crash Reporter, " 2916 "that is intentional", end='', flush=True) 2917 2918 return self 2919 2920 def __exit__(self, *ignore_exc): 2921 """Restore Windows ErrorMode or core file behavior to initial value.""" 2922 if self.old_value is None: 2923 return 2924 2925 if sys.platform.startswith('win'): 2926 import msvcrt 2927 msvcrt.SetErrorMode(self.old_value) 2928 2929 if self.old_modes: 2930 for report_type, (old_mode, old_file) in self.old_modes.items(): 2931 msvcrt.CrtSetReportMode(report_type, old_mode) 2932 msvcrt.CrtSetReportFile(report_type, old_file) 2933 else: 2934 if resource is not None: 2935 try: 2936 resource.setrlimit(resource.RLIMIT_CORE, self.old_value) 2937 except (ValueError, OSError): 2938 pass 2939 2940 2941def patch(test_instance, object_to_patch, attr_name, new_value): 2942 """Override 'object_to_patch'.'attr_name' with 'new_value'. 2943 2944 Also, add a cleanup procedure to 'test_instance' to restore 2945 'object_to_patch' value for 'attr_name'. 2946 The 'attr_name' should be a valid attribute for 'object_to_patch'. 2947 2948 """ 2949 # check that 'attr_name' is a real attribute for 'object_to_patch' 2950 # will raise AttributeError if it does not exist 2951 getattr(object_to_patch, attr_name) 2952 2953 # keep a copy of the old value 2954 attr_is_local = False 2955 try: 2956 old_value = object_to_patch.__dict__[attr_name] 2957 except (AttributeError, KeyError): 2958 old_value = getattr(object_to_patch, attr_name, None) 2959 else: 2960 attr_is_local = True 2961 2962 # restore the value when the test is done 2963 def cleanup(): 2964 if attr_is_local: 2965 setattr(object_to_patch, attr_name, old_value) 2966 else: 2967 delattr(object_to_patch, attr_name) 2968 2969 test_instance.addCleanup(cleanup) 2970 2971 # actually override the attribute 2972 setattr(object_to_patch, attr_name, new_value) 2973 2974 2975def run_in_subinterp(code): 2976 """ 2977 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 2978 module is enabled. 2979 """ 2980 # Issue #10915, #15751: PyGILState_*() functions don't work with 2981 # sub-interpreters, the tracemalloc module uses these functions internally 2982 try: 2983 import tracemalloc 2984 except ImportError: 2985 pass 2986 else: 2987 if tracemalloc.is_tracing(): 2988 raise unittest.SkipTest("run_in_subinterp() cannot be used " 2989 "if tracemalloc module is tracing " 2990 "memory allocations") 2991 import _testcapi 2992 return _testcapi.run_in_subinterp(code) 2993 2994 2995def check_free_after_iterating(test, iter, cls, args=()): 2996 class A(cls): 2997 def __del__(self): 2998 nonlocal done 2999 done = True 3000 try: 3001 next(it) 3002 except StopIteration: 3003 pass 3004 3005 done = False 3006 it = iter(A(*args)) 3007 # Issue 26494: Shouldn't crash 3008 test.assertRaises(StopIteration, next, it) 3009 # The sequence should be deallocated just after the end of iterating 3010 gc_collect() 3011 test.assertTrue(done) 3012 3013 3014def missing_compiler_executable(cmd_names=[]): 3015 """Check if the compiler components used to build the interpreter exist. 3016 3017 Check for the existence of the compiler executables whose names are listed 3018 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 3019 and return the first missing executable or None when none is found 3020 missing. 3021 3022 """ 3023 from distutils import ccompiler, sysconfig, spawn 3024 compiler = ccompiler.new_compiler() 3025 sysconfig.customize_compiler(compiler) 3026 for name in compiler.executables: 3027 if cmd_names and name not in cmd_names: 3028 continue 3029 cmd = getattr(compiler, name) 3030 if cmd_names: 3031 assert cmd is not None, \ 3032 "the '%s' executable is not configured" % name 3033 elif not cmd: 3034 continue 3035 if spawn.find_executable(cmd[0]) is None: 3036 return cmd[0] 3037 3038 3039_is_android_emulator = None 3040def setswitchinterval(interval): 3041 # Setting a very low gil interval on the Android emulator causes python 3042 # to hang (issue #26939). 3043 minimum_interval = 1e-5 3044 if is_android and interval < minimum_interval: 3045 global _is_android_emulator 3046 if _is_android_emulator is None: 3047 _is_android_emulator = (subprocess.check_output( 3048 ['getprop', 'ro.kernel.qemu']).strip() == b'1') 3049 if _is_android_emulator: 3050 interval = minimum_interval 3051 return sys.setswitchinterval(interval) 3052 3053 3054@contextlib.contextmanager 3055def disable_faulthandler(): 3056 # use sys.__stderr__ instead of sys.stderr, since regrtest replaces 3057 # sys.stderr with a StringIO which has no file descriptor when a test 3058 # is run with -W/--verbose3. 3059 fd = sys.__stderr__.fileno() 3060 3061 is_enabled = faulthandler.is_enabled() 3062 try: 3063 faulthandler.disable() 3064 yield 3065 finally: 3066 if is_enabled: 3067 faulthandler.enable(file=fd, all_threads=True) 3068 3069 3070def fd_count(): 3071 """Count the number of open file descriptors. 3072 """ 3073 if sys.platform.startswith(('linux', 'freebsd')): 3074 try: 3075 names = os.listdir("/proc/self/fd") 3076 # Subtract one because listdir() internally opens a file 3077 # descriptor to list the content of the /proc/self/fd/ directory. 3078 return len(names) - 1 3079 except FileNotFoundError: 3080 pass 3081 3082 MAXFD = 256 3083 if hasattr(os, 'sysconf'): 3084 try: 3085 MAXFD = os.sysconf("SC_OPEN_MAX") 3086 except OSError: 3087 pass 3088 3089 old_modes = None 3090 if sys.platform == 'win32': 3091 # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process 3092 # on invalid file descriptor if Python is compiled in debug mode 3093 try: 3094 import msvcrt 3095 msvcrt.CrtSetReportMode 3096 except (AttributeError, ImportError): 3097 # no msvcrt or a release build 3098 pass 3099 else: 3100 old_modes = {} 3101 for report_type in (msvcrt.CRT_WARN, 3102 msvcrt.CRT_ERROR, 3103 msvcrt.CRT_ASSERT): 3104 old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0) 3105 3106 try: 3107 count = 0 3108 for fd in range(MAXFD): 3109 try: 3110 # Prefer dup() over fstat(). fstat() can require input/output 3111 # whereas dup() doesn't. 3112 fd2 = os.dup(fd) 3113 except OSError as e: 3114 if e.errno != errno.EBADF: 3115 raise 3116 else: 3117 os.close(fd2) 3118 count += 1 3119 finally: 3120 if old_modes is not None: 3121 for report_type in (msvcrt.CRT_WARN, 3122 msvcrt.CRT_ERROR, 3123 msvcrt.CRT_ASSERT): 3124 msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) 3125 3126 return count 3127 3128 3129class SaveSignals: 3130 """ 3131 Save and restore signal handlers. 3132 3133 This class is only able to save/restore signal handlers registered 3134 by the Python signal module: see bpo-13285 for "external" signal 3135 handlers. 3136 """ 3137 3138 def __init__(self): 3139 import signal 3140 self.signal = signal 3141 self.signals = signal.valid_signals() 3142 # SIGKILL and SIGSTOP signals cannot be ignored nor caught 3143 for signame in ('SIGKILL', 'SIGSTOP'): 3144 try: 3145 signum = getattr(signal, signame) 3146 except AttributeError: 3147 continue 3148 self.signals.remove(signum) 3149 self.handlers = {} 3150 3151 def save(self): 3152 for signum in self.signals: 3153 handler = self.signal.getsignal(signum) 3154 if handler is None: 3155 # getsignal() returns None if a signal handler was not 3156 # registered by the Python signal module, 3157 # and the handler is not SIG_DFL nor SIG_IGN. 3158 # 3159 # Ignore the signal: we cannot restore the handler. 3160 continue 3161 self.handlers[signum] = handler 3162 3163 def restore(self): 3164 for signum, handler in self.handlers.items(): 3165 self.signal.signal(signum, handler) 3166 3167 3168def with_pymalloc(): 3169 import _testcapi 3170 return _testcapi.WITH_PYMALLOC 3171 3172 3173class FakePath: 3174 """Simple implementing of the path protocol. 3175 """ 3176 def __init__(self, path): 3177 self.path = path 3178 3179 def __repr__(self): 3180 return f'<FakePath {self.path!r}>' 3181 3182 def __fspath__(self): 3183 if (isinstance(self.path, BaseException) or 3184 isinstance(self.path, type) and 3185 issubclass(self.path, BaseException)): 3186 raise self.path 3187 else: 3188 return self.path 3189 3190 3191class _ALWAYS_EQ: 3192 """ 3193 Object that is equal to anything. 3194 """ 3195 def __eq__(self, other): 3196 return True 3197 def __ne__(self, other): 3198 return False 3199 3200ALWAYS_EQ = _ALWAYS_EQ() 3201 3202@functools.total_ordering 3203class _LARGEST: 3204 """ 3205 Object that is greater than anything (except itself). 3206 """ 3207 def __eq__(self, other): 3208 return isinstance(other, _LARGEST) 3209 def __lt__(self, other): 3210 return False 3211 3212LARGEST = _LARGEST() 3213 3214@functools.total_ordering 3215class _SMALLEST: 3216 """ 3217 Object that is less than anything (except itself). 3218 """ 3219 def __eq__(self, other): 3220 return isinstance(other, _SMALLEST) 3221 def __gt__(self, other): 3222 return False 3223 3224SMALLEST = _SMALLEST() 3225 3226def maybe_get_event_loop_policy(): 3227 """Return the global event loop policy if one is set, else return None.""" 3228 return asyncio.events._event_loop_policy 3229 3230# Helpers for testing hashing. 3231NHASHBITS = sys.hash_info.width # number of bits in hash() result 3232assert NHASHBITS in (32, 64) 3233 3234# Return mean and sdev of number of collisions when tossing nballs balls 3235# uniformly at random into nbins bins. By definition, the number of 3236# collisions is the number of balls minus the number of occupied bins at 3237# the end. 3238def collision_stats(nbins, nballs): 3239 n, k = nbins, nballs 3240 # prob a bin empty after k trials = (1 - 1/n)**k 3241 # mean # empty is then n * (1 - 1/n)**k 3242 # so mean # occupied is n - n * (1 - 1/n)**k 3243 # so collisions = k - (n - n*(1 - 1/n)**k) 3244 # 3245 # For the variance: 3246 # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = 3247 # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) 3248 # 3249 # Massive cancellation occurs, and, e.g., for a 64-bit hash code 3250 # 1-1/2**64 rounds uselessly to 1.0. Rather than make heroic (and 3251 # error-prone) efforts to rework the naive formulas to avoid those, 3252 # we use the `decimal` module to get plenty of extra precision. 3253 # 3254 # Note: the exact values are straightforward to compute with 3255 # rationals, but in context that's unbearably slow, requiring 3256 # multi-million bit arithmetic. 3257 import decimal 3258 with decimal.localcontext() as ctx: 3259 bits = n.bit_length() * 2 # bits in n**2 3260 # At least that many bits will likely cancel out. 3261 # Use that many decimal digits instead. 3262 ctx.prec = max(bits, 30) 3263 dn = decimal.Decimal(n) 3264 p1empty = ((dn - 1) / dn) ** k 3265 meanempty = n * p1empty 3266 occupied = n - meanempty 3267 collisions = k - occupied 3268 var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) 3269 return float(collisions), float(var.sqrt()) 3270 3271 3272class catch_unraisable_exception: 3273 """ 3274 Context manager catching unraisable exception using sys.unraisablehook. 3275 3276 Storing the exception value (cm.unraisable.exc_value) creates a reference 3277 cycle. The reference cycle is broken explicitly when the context manager 3278 exits. 3279 3280 Storing the object (cm.unraisable.object) can resurrect it if it is set to 3281 an object which is being finalized. Exiting the context manager clears the 3282 stored object. 3283 3284 Usage: 3285 3286 with support.catch_unraisable_exception() as cm: 3287 # code creating an "unraisable exception" 3288 ... 3289 3290 # check the unraisable exception: use cm.unraisable 3291 ... 3292 3293 # cm.unraisable attribute no longer exists at this point 3294 # (to break a reference cycle) 3295 """ 3296 3297 def __init__(self): 3298 self.unraisable = None 3299 self._old_hook = None 3300 3301 def _hook(self, unraisable): 3302 # Storing unraisable.object can resurrect an object which is being 3303 # finalized. Storing unraisable.exc_value creates a reference cycle. 3304 self.unraisable = unraisable 3305 3306 def __enter__(self): 3307 self._old_hook = sys.unraisablehook 3308 sys.unraisablehook = self._hook 3309 return self 3310 3311 def __exit__(self, *exc_info): 3312 sys.unraisablehook = self._old_hook 3313 del self.unraisable 3314 3315 3316class catch_threading_exception: 3317 """ 3318 Context manager catching threading.Thread exception using 3319 threading.excepthook. 3320 3321 Attributes set when an exception is catched: 3322 3323 * exc_type 3324 * exc_value 3325 * exc_traceback 3326 * thread 3327 3328 See threading.excepthook() documentation for these attributes. 3329 3330 These attributes are deleted at the context manager exit. 3331 3332 Usage: 3333 3334 with support.catch_threading_exception() as cm: 3335 # code spawning a thread which raises an exception 3336 ... 3337 3338 # check the thread exception, use cm attributes: 3339 # exc_type, exc_value, exc_traceback, thread 3340 ... 3341 3342 # exc_type, exc_value, exc_traceback, thread attributes of cm no longer 3343 # exists at this point 3344 # (to avoid reference cycles) 3345 """ 3346 3347 def __init__(self): 3348 self.exc_type = None 3349 self.exc_value = None 3350 self.exc_traceback = None 3351 self.thread = None 3352 self._old_hook = None 3353 3354 def _hook(self, args): 3355 self.exc_type = args.exc_type 3356 self.exc_value = args.exc_value 3357 self.exc_traceback = args.exc_traceback 3358 self.thread = args.thread 3359 3360 def __enter__(self): 3361 self._old_hook = threading.excepthook 3362 threading.excepthook = self._hook 3363 return self 3364 3365 def __exit__(self, *exc_info): 3366 threading.excepthook = self._old_hook 3367 del self.exc_type 3368 del self.exc_value 3369 del self.exc_traceback 3370 del self.thread 3371 3372 3373@contextlib.contextmanager 3374def save_restore_warnings_filters(): 3375 old_filters = warnings.filters[:] 3376 try: 3377 yield 3378 finally: 3379 warnings.filters[:] = old_filters 3380 3381 3382def skip_if_broken_multiprocessing_synchronize(): 3383 """ 3384 Skip tests if the multiprocessing.synchronize module is missing, if there 3385 is no available semaphore implementation, or if creating a lock raises an 3386 OSError (on Linux only). 3387 """ 3388 3389 # Skip tests if the _multiprocessing extension is missing. 3390 import_module('_multiprocessing') 3391 3392 # Skip tests if there is no available semaphore implementation: 3393 # multiprocessing.synchronize requires _multiprocessing.SemLock. 3394 synchronize = import_module('multiprocessing.synchronize') 3395 3396 if sys.platform == "linux": 3397 try: 3398 # bpo-38377: On Linux, creating a semaphore fails with OSError 3399 # if the current user does not have the permission to create 3400 # a file in /dev/shm/ directory. 3401 synchronize.Lock(ctx=None) 3402 except OSError as exc: 3403 raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}") 3404