1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import asyncio.events
7import collections.abc
8import contextlib
9import errno
10import faulthandler
11import fnmatch
12import functools
13import gc
14import glob
15import hashlib
16import importlib
17import importlib.util
18import locale
19import logging.handlers
20import nntplib
21import os
22import platform
23import re
24import shutil
25import socket
26import stat
27import struct
28import subprocess
29import sys
30import sysconfig
31import tempfile
32import _thread
33import threading
34import time
35import types
36import unittest
37import urllib.error
38import warnings
39
40from .testresult import get_test_runner
41
42try:
43    import multiprocessing.process
44except ImportError:
45    multiprocessing = None
46
47try:
48    import zlib
49except ImportError:
50    zlib = None
51
52try:
53    import gzip
54except ImportError:
55    gzip = None
56
57try:
58    import bz2
59except ImportError:
60    bz2 = None
61
62try:
63    import lzma
64except ImportError:
65    lzma = None
66
67try:
68    import resource
69except ImportError:
70    resource = None
71
72try:
73    import _hashlib
74except ImportError:
75    _hashlib = None
76
77__all__ = [
78    # globals
79    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
80    # exceptions
81    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
82    # imports
83    "import_module", "import_fresh_module", "CleanImport",
84    # modules
85    "unload", "forget",
86    # io
87    "record_original_stdout", "get_original_stdout", "captured_stdout",
88    "captured_stdin", "captured_stderr",
89    # filesystem
90    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
91    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
92    # unittest
93    "is_resource_enabled", "requires", "requires_freebsd_version",
94    "requires_linux_version", "requires_mac_ver", "requires_hashdigest",
95    "check_syntax_error", "check_syntax_warning",
96    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
97    "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
98    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
99    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
100    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
101    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
102    "check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime",
103    "ignore_warnings",
104    # sys
105    "is_jython", "is_android", "check_impl_detail", "unix_shell",
106    "setswitchinterval",
107    # network
108    "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
109    "bind_unix_socket",
110    # processes
111    'temp_umask', "reap_children",
112    # logging
113    "TestHandler",
114    # threads
115    "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
116    # miscellaneous
117    "check_warnings", "check_no_resource_warning", "check_no_warnings",
118    "EnvironmentVarGuard",
119    "run_with_locale", "swap_item",
120    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
121    "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
122    "ALWAYS_EQ", "LARGEST", "SMALLEST"
123    ]
124
125class Error(Exception):
126    """Base class for regression test exceptions."""
127
128class TestFailed(Error):
129    """Test failed."""
130
131class TestDidNotRun(Error):
132    """Test did not run any subtests."""
133
134class ResourceDenied(unittest.SkipTest):
135    """Test skipped because it requested a disallowed resource.
136
137    This is raised when a test calls requires() for a resource that
138    has not be enabled.  It is used to distinguish between expected
139    and unexpected skips.
140    """
141
142@contextlib.contextmanager
143def _ignore_deprecated_imports(ignore=True):
144    """Context manager to suppress package and module deprecation
145    warnings when importing them.
146
147    If ignore is False, this context manager has no effect.
148    """
149    if ignore:
150        with warnings.catch_warnings():
151            warnings.filterwarnings("ignore", ".+ (module|package)",
152                                    DeprecationWarning)
153            yield
154    else:
155        yield
156
157
158def ignore_warnings(*, category):
159    """Decorator to suppress deprecation warnings.
160
161    Use of context managers to hide warnings make diffs
162    more noisy and tools like 'git blame' less useful.
163    """
164    def decorator(test):
165        @functools.wraps(test)
166        def wrapper(self, *args, **kwargs):
167            with warnings.catch_warnings():
168                warnings.simplefilter('ignore', category=category)
169                return test(self, *args, **kwargs)
170        return wrapper
171    return decorator
172
173
174def import_module(name, deprecated=False, *, required_on=()):
175    """Import and return the module to be tested, raising SkipTest if
176    it is not available.
177
178    If deprecated is True, any module or package deprecation messages
179    will be suppressed. If a module is required on a platform but optional for
180    others, set required_on to an iterable of platform prefixes which will be
181    compared against sys.platform.
182    """
183    with _ignore_deprecated_imports(deprecated):
184        try:
185            return importlib.import_module(name)
186        except ImportError as msg:
187            if sys.platform.startswith(tuple(required_on)):
188                raise
189            raise unittest.SkipTest(str(msg))
190
191
192def _save_and_remove_module(name, orig_modules):
193    """Helper function to save and remove a module from sys.modules
194
195    Raise ImportError if the module can't be imported.
196    """
197    # try to import the module and raise an error if it can't be imported
198    if name not in sys.modules:
199        __import__(name)
200        del sys.modules[name]
201    for modname in list(sys.modules):
202        if modname == name or modname.startswith(name + '.'):
203            orig_modules[modname] = sys.modules[modname]
204            del sys.modules[modname]
205
206def _save_and_block_module(name, orig_modules):
207    """Helper function to save and block a module in sys.modules
208
209    Return True if the module was in sys.modules, False otherwise.
210    """
211    saved = True
212    try:
213        orig_modules[name] = sys.modules[name]
214    except KeyError:
215        saved = False
216    sys.modules[name] = None
217    return saved
218
219
220def anticipate_failure(condition):
221    """Decorator to mark a test that is known to be broken in some cases
222
223       Any use of this decorator should have a comment identifying the
224       associated tracker issue.
225    """
226    if condition:
227        return unittest.expectedFailure
228    return lambda f: f
229
230def load_package_tests(pkg_dir, loader, standard_tests, pattern):
231    """Generic load_tests implementation for simple test packages.
232
233    Most packages can implement load_tests using this function as follows:
234
235       def load_tests(*args):
236           return load_package_tests(os.path.dirname(__file__), *args)
237    """
238    if pattern is None:
239        pattern = "test*"
240    top_dir = os.path.dirname(              # Lib
241                  os.path.dirname(              # test
242                      os.path.dirname(__file__)))   # support
243    package_tests = loader.discover(start_dir=pkg_dir,
244                                    top_level_dir=top_dir,
245                                    pattern=pattern)
246    standard_tests.addTests(package_tests)
247    return standard_tests
248
249
250def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
251    """Import and return a module, deliberately bypassing sys.modules.
252
253    This function imports and returns a fresh copy of the named Python module
254    by removing the named module from sys.modules before doing the import.
255    Note that unlike reload, the original module is not affected by
256    this operation.
257
258    *fresh* is an iterable of additional module names that are also removed
259    from the sys.modules cache before doing the import.
260
261    *blocked* is an iterable of module names that are replaced with None
262    in the module cache during the import to ensure that attempts to import
263    them raise ImportError.
264
265    The named module and any modules named in the *fresh* and *blocked*
266    parameters are saved before starting the import and then reinserted into
267    sys.modules when the fresh import is complete.
268
269    Module and package deprecation messages are suppressed during this import
270    if *deprecated* is True.
271
272    This function will raise ImportError if the named module cannot be
273    imported.
274    """
275    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
276    # to make sure that this utility function is working as expected
277    with _ignore_deprecated_imports(deprecated):
278        # Keep track of modules saved for later restoration as well
279        # as those which just need a blocking entry removed
280        orig_modules = {}
281        names_to_remove = []
282        _save_and_remove_module(name, orig_modules)
283        try:
284            for fresh_name in fresh:
285                _save_and_remove_module(fresh_name, orig_modules)
286            for blocked_name in blocked:
287                if not _save_and_block_module(blocked_name, orig_modules):
288                    names_to_remove.append(blocked_name)
289            fresh_module = importlib.import_module(name)
290        except ImportError:
291            fresh_module = None
292        finally:
293            for orig_name, module in orig_modules.items():
294                sys.modules[orig_name] = module
295            for name_to_remove in names_to_remove:
296                del sys.modules[name_to_remove]
297        return fresh_module
298
299
300def get_attribute(obj, name):
301    """Get an attribute, raising SkipTest if AttributeError is raised."""
302    try:
303        attribute = getattr(obj, name)
304    except AttributeError:
305        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
306    else:
307        return attribute
308
309verbose = 1              # Flag set to 0 by regrtest.py
310use_resources = None     # Flag set to [] by regrtest.py
311max_memuse = 0           # Disable bigmem tests (they will still be run with
312                         # small sizes, to make sure they work.)
313real_max_memuse = 0
314junit_xml_list = None    # list of testsuite XML elements
315failfast = False
316
317# _original_stdout is meant to hold stdout at the time regrtest began.
318# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
319# The point is to have some flavor of stdout the user can actually see.
320_original_stdout = None
321def record_original_stdout(stdout):
322    global _original_stdout
323    _original_stdout = stdout
324
325def get_original_stdout():
326    return _original_stdout or sys.stdout
327
328def unload(name):
329    try:
330        del sys.modules[name]
331    except KeyError:
332        pass
333
334def _force_run(path, func, *args):
335    try:
336        return func(*args)
337    except OSError as err:
338        if verbose >= 2:
339            print('%s: %s' % (err.__class__.__name__, err))
340            print('re-run %s%r' % (func.__name__, args))
341        os.chmod(path, stat.S_IRWXU)
342        return func(*args)
343
344if sys.platform.startswith("win"):
345    def _waitfor(func, pathname, waitall=False):
346        # Perform the operation
347        func(pathname)
348        # Now setup the wait loop
349        if waitall:
350            dirname = pathname
351        else:
352            dirname, name = os.path.split(pathname)
353            dirname = dirname or '.'
354        # Check for `pathname` to be removed from the filesystem.
355        # The exponential backoff of the timeout amounts to a total
356        # of ~1 second after which the deletion is probably an error
357        # anyway.
358        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
359        # required when contention occurs.
360        timeout = 0.001
361        while timeout < 1.0:
362            # Note we are only testing for the existence of the file(s) in
363            # the contents of the directory regardless of any security or
364            # access rights.  If we have made it this far, we have sufficient
365            # permissions to do that much using Python's equivalent of the
366            # Windows API FindFirstFile.
367            # Other Windows APIs can fail or give incorrect results when
368            # dealing with files that are pending deletion.
369            L = os.listdir(dirname)
370            if not (L if waitall else name in L):
371                return
372            # Increase the timeout and try again
373            time.sleep(timeout)
374            timeout *= 2
375        warnings.warn('tests may fail, delete still pending for ' + pathname,
376                      RuntimeWarning, stacklevel=4)
377
378    def _unlink(filename):
379        _waitfor(os.unlink, filename)
380
381    def _rmdir(dirname):
382        _waitfor(os.rmdir, dirname)
383
384    def _rmtree(path):
385        def _rmtree_inner(path):
386            for name in _force_run(path, os.listdir, path):
387                fullname = os.path.join(path, name)
388                try:
389                    mode = os.lstat(fullname).st_mode
390                except OSError as exc:
391                    print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
392                          file=sys.__stderr__)
393                    mode = 0
394                if stat.S_ISDIR(mode):
395                    _waitfor(_rmtree_inner, fullname, waitall=True)
396                    _force_run(fullname, os.rmdir, fullname)
397                else:
398                    _force_run(fullname, os.unlink, fullname)
399        _waitfor(_rmtree_inner, path, waitall=True)
400        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
401
402    def _longpath(path):
403        try:
404            import ctypes
405        except ImportError:
406            # No ctypes means we can't expands paths.
407            pass
408        else:
409            buffer = ctypes.create_unicode_buffer(len(path) * 2)
410            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
411                                                             len(buffer))
412            if length:
413                return buffer[:length]
414        return path
415else:
416    _unlink = os.unlink
417    _rmdir = os.rmdir
418
419    def _rmtree(path):
420        try:
421            shutil.rmtree(path)
422            return
423        except OSError:
424            pass
425
426        def _rmtree_inner(path):
427            for name in _force_run(path, os.listdir, path):
428                fullname = os.path.join(path, name)
429                try:
430                    mode = os.lstat(fullname).st_mode
431                except OSError:
432                    mode = 0
433                if stat.S_ISDIR(mode):
434                    _rmtree_inner(fullname)
435                    _force_run(path, os.rmdir, fullname)
436                else:
437                    _force_run(path, os.unlink, fullname)
438        _rmtree_inner(path)
439        os.rmdir(path)
440
441    def _longpath(path):
442        return path
443
444def unlink(filename):
445    try:
446        _unlink(filename)
447    except (FileNotFoundError, NotADirectoryError):
448        pass
449
450def rmdir(dirname):
451    try:
452        _rmdir(dirname)
453    except FileNotFoundError:
454        pass
455
456def rmtree(path):
457    try:
458        _rmtree(path)
459    except FileNotFoundError:
460        pass
461
462def make_legacy_pyc(source):
463    """Move a PEP 3147/488 pyc file to its legacy pyc location.
464
465    :param source: The file system path to the source file.  The source file
466        does not need to exist, however the PEP 3147/488 pyc file must exist.
467    :return: The file system path to the legacy pyc file.
468    """
469    pyc_file = importlib.util.cache_from_source(source)
470    up_one = os.path.dirname(os.path.abspath(source))
471    legacy_pyc = os.path.join(up_one, source + 'c')
472    os.rename(pyc_file, legacy_pyc)
473    return legacy_pyc
474
475def forget(modname):
476    """'Forget' a module was ever imported.
477
478    This removes the module from sys.modules and deletes any PEP 3147/488 or
479    legacy .pyc files.
480    """
481    unload(modname)
482    for dirname in sys.path:
483        source = os.path.join(dirname, modname + '.py')
484        # It doesn't matter if they exist or not, unlink all possible
485        # combinations of PEP 3147/488 and legacy pyc files.
486        unlink(source + 'c')
487        for opt in ('', 1, 2):
488            unlink(importlib.util.cache_from_source(source, optimization=opt))
489
490# Check whether a gui is actually available
491def _is_gui_available():
492    if hasattr(_is_gui_available, 'result'):
493        return _is_gui_available.result
494    reason = None
495    if sys.platform.startswith('win'):
496        # if Python is running as a service (such as the buildbot service),
497        # gui interaction may be disallowed
498        import ctypes
499        import ctypes.wintypes
500        UOI_FLAGS = 1
501        WSF_VISIBLE = 0x0001
502        class USEROBJECTFLAGS(ctypes.Structure):
503            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
504                        ("fReserved", ctypes.wintypes.BOOL),
505                        ("dwFlags", ctypes.wintypes.DWORD)]
506        dll = ctypes.windll.user32
507        h = dll.GetProcessWindowStation()
508        if not h:
509            raise ctypes.WinError()
510        uof = USEROBJECTFLAGS()
511        needed = ctypes.wintypes.DWORD()
512        res = dll.GetUserObjectInformationW(h,
513            UOI_FLAGS,
514            ctypes.byref(uof),
515            ctypes.sizeof(uof),
516            ctypes.byref(needed))
517        if not res:
518            raise ctypes.WinError()
519        if not bool(uof.dwFlags & WSF_VISIBLE):
520            reason = "gui not available (WSF_VISIBLE flag not set)"
521    elif sys.platform == 'darwin':
522        # The Aqua Tk implementations on OS X can abort the process if
523        # being called in an environment where a window server connection
524        # cannot be made, for instance when invoked by a buildbot or ssh
525        # process not running under the same user id as the current console
526        # user.  To avoid that, raise an exception if the window manager
527        # connection is not available.
528        from ctypes import cdll, c_int, pointer, Structure
529        from ctypes.util import find_library
530
531        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
532
533        if app_services.CGMainDisplayID() == 0:
534            reason = "gui tests cannot run without OS X window manager"
535        else:
536            class ProcessSerialNumber(Structure):
537                _fields_ = [("highLongOfPSN", c_int),
538                            ("lowLongOfPSN", c_int)]
539            psn = ProcessSerialNumber()
540            psn_p = pointer(psn)
541            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
542                  (app_services.SetFrontProcess(psn_p) < 0) ):
543                reason = "cannot run without OS X gui process"
544
545    # check on every platform whether tkinter can actually do anything
546    if not reason:
547        try:
548            from tkinter import Tk
549            root = Tk()
550            root.withdraw()
551            root.update()
552            root.destroy()
553        except Exception as e:
554            err_string = str(e)
555            if len(err_string) > 50:
556                err_string = err_string[:50] + ' [...]'
557            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
558                                                           err_string)
559
560    _is_gui_available.reason = reason
561    _is_gui_available.result = not reason
562
563    return _is_gui_available.result
564
565def is_resource_enabled(resource):
566    """Test whether a resource is enabled.
567
568    Known resources are set by regrtest.py.  If not running under regrtest.py,
569    all resources are assumed enabled unless use_resources has been set.
570    """
571    return use_resources is None or resource in use_resources
572
573def requires(resource, msg=None):
574    """Raise ResourceDenied if the specified resource is not available."""
575    if not is_resource_enabled(resource):
576        if msg is None:
577            msg = "Use of the %r resource not enabled" % resource
578        raise ResourceDenied(msg)
579    if resource == 'gui' and not _is_gui_available():
580        raise ResourceDenied(_is_gui_available.reason)
581
582def _requires_unix_version(sysname, min_version):
583    """Decorator raising SkipTest if the OS is `sysname` and the version is less
584    than `min_version`.
585
586    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
587    the FreeBSD version is less than 7.2.
588    """
589    import platform
590    min_version_txt = '.'.join(map(str, min_version))
591    version_txt = platform.release().split('-', 1)[0]
592    if platform.system() == sysname:
593        try:
594            version = tuple(map(int, version_txt.split('.')))
595        except ValueError:
596            skip = False
597        else:
598            skip = version < min_version
599    else:
600        skip = False
601
602    return unittest.skipIf(
603        skip,
604        f"{sysname} version {min_version_txt} or higher required, not "
605        f"{version_txt}"
606    )
607
608
609def requires_freebsd_version(*min_version):
610    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
611    less than `min_version`.
612
613    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
614    version is less than 7.2.
615    """
616    return _requires_unix_version('FreeBSD', min_version)
617
618def requires_linux_version(*min_version):
619    """Decorator raising SkipTest if the OS is Linux and the Linux version is
620    less than `min_version`.
621
622    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
623    version is less than 2.6.32.
624    """
625    return _requires_unix_version('Linux', min_version)
626
627def requires_mac_ver(*min_version):
628    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
629    version if less than min_version.
630
631    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
632    is lesser than 10.5.
633    """
634    def decorator(func):
635        @functools.wraps(func)
636        def wrapper(*args, **kw):
637            if sys.platform == 'darwin':
638                version_txt = platform.mac_ver()[0]
639                try:
640                    version = tuple(map(int, version_txt.split('.')))
641                except ValueError:
642                    pass
643                else:
644                    if version < min_version:
645                        min_version_txt = '.'.join(map(str, min_version))
646                        raise unittest.SkipTest(
647                            "Mac OS X %s or higher required, not %s"
648                            % (min_version_txt, version_txt))
649            return func(*args, **kw)
650        wrapper.min_version = min_version
651        return wrapper
652    return decorator
653
654
655def requires_hashdigest(digestname, openssl=None):
656    """Decorator raising SkipTest if a hashing algorithm is not available
657
658    The hashing algorithm could be missing or blocked by a strict crypto
659    policy.
660
661    If 'openssl' is True, then the decorator checks that OpenSSL provides
662    the algorithm. Otherwise the check falls back to built-in
663    implementations.
664
665    ValueError: [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS
666    ValueError: unsupported hash type md4
667    """
668    def decorator(func):
669        @functools.wraps(func)
670        def wrapper(*args, **kwargs):
671            try:
672                if openssl and _hashlib is not None:
673                    _hashlib.new(digestname)
674                else:
675                    hashlib.new(digestname)
676            except ValueError:
677                raise unittest.SkipTest(
678                    f"hash digest '{digestname}' is not available."
679                )
680            return func(*args, **kwargs)
681        return wrapper
682    return decorator
683
684
685HOST = "localhost"
686HOSTv4 = "127.0.0.1"
687HOSTv6 = "::1"
688
689
690def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
691    """Returns an unused port that should be suitable for binding.  This is
692    achieved by creating a temporary socket with the same family and type as
693    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
694    the specified host address (defaults to 0.0.0.0) with the port set to 0,
695    eliciting an unused ephemeral port from the OS.  The temporary socket is
696    then closed and deleted, and the ephemeral port is returned.
697
698    Either this method or bind_port() should be used for any tests where a
699    server socket needs to be bound to a particular port for the duration of
700    the test.  Which one to use depends on whether the calling code is creating
701    a python socket, or if an unused port needs to be provided in a constructor
702    or passed to an external program (i.e. the -accept argument to openssl's
703    s_server mode).  Always prefer bind_port() over find_unused_port() where
704    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
705    socket is bound to a hard coded port, the ability to run multiple instances
706    of the test simultaneously on the same host is compromised, which makes the
707    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
708    may simply manifest as a failed test, which can be recovered from without
709    intervention in most cases, but on Windows, the entire python process can
710    completely and utterly wedge, requiring someone to log in to the buildbot
711    and manually kill the affected process.
712
713    (This is easy to reproduce on Windows, unfortunately, and can be traced to
714    the SO_REUSEADDR socket option having different semantics on Windows versus
715    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
716    listen and then accept connections on identical host/ports.  An EADDRINUSE
717    OSError will be raised at some point (depending on the platform and
718    the order bind and listen were called on each socket).
719
720    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
721    will ever be raised when attempting to bind two identical host/ports. When
722    accept() is called on each socket, the second caller's process will steal
723    the port from the first caller, leaving them both in an awkwardly wedged
724    state where they'll no longer respond to any signals or graceful kills, and
725    must be forcibly killed via OpenProcess()/TerminateProcess().
726
727    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
728    instead of SO_REUSEADDR, which effectively affords the same semantics as
729    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
730    Source world compared to Windows ones, this is a common mistake.  A quick
731    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
732    openssl.exe is called with the 's_server' option, for example. See
733    http://bugs.python.org/issue2550 for more info.  The following site also
734    has a very thorough description about the implications of both REUSEADDR
735    and EXCLUSIVEADDRUSE on Windows:
736    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
737
738    XXX: although this approach is a vast improvement on previous attempts to
739    elicit unused ports, it rests heavily on the assumption that the ephemeral
740    port returned to us by the OS won't immediately be dished back out to some
741    other process when we close and delete our temporary socket but before our
742    calling code has a chance to bind the returned port.  We can deal with this
743    issue if/when we come across it.
744    """
745
746    with socket.socket(family, socktype) as tempsock:
747        port = bind_port(tempsock)
748    del tempsock
749    return port
750
751def bind_port(sock, host=HOST):
752    """Bind the socket to a free port and return the port number.  Relies on
753    ephemeral ports in order to ensure we are using an unbound port.  This is
754    important as many tests may be running simultaneously, especially in a
755    buildbot environment.  This method raises an exception if the sock.family
756    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
757    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
758    for TCP/IP sockets.  The only case for setting these options is testing
759    multicasting via multiple UDP sockets.
760
761    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
762    on Windows), it will be set on the socket.  This will prevent anyone else
763    from bind()'ing to our host/port for the duration of the test.
764    """
765
766    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
767        if hasattr(socket, 'SO_REUSEADDR'):
768            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
769                raise TestFailed("tests should never set the SO_REUSEADDR "   \
770                                 "socket option on TCP/IP sockets!")
771        if hasattr(socket, 'SO_REUSEPORT'):
772            try:
773                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
774                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
775                                     "socket option on TCP/IP sockets!")
776            except OSError:
777                # Python's socket module was compiled using modern headers
778                # thus defining SO_REUSEPORT but this process is running
779                # under an older kernel that does not support SO_REUSEPORT.
780                pass
781        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
782            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
783
784    sock.bind((host, 0))
785    port = sock.getsockname()[1]
786    return port
787
788def bind_unix_socket(sock, addr):
789    """Bind a unix socket, raising SkipTest if PermissionError is raised."""
790    assert sock.family == socket.AF_UNIX
791    try:
792        sock.bind(addr)
793    except PermissionError:
794        sock.close()
795        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
796
797def _is_ipv6_enabled():
798    """Check whether IPv6 is enabled on this host."""
799    if socket.has_ipv6:
800        sock = None
801        try:
802            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
803            sock.bind((HOSTv6, 0))
804            return True
805        except OSError:
806            pass
807        finally:
808            if sock:
809                sock.close()
810    return False
811
812IPV6_ENABLED = _is_ipv6_enabled()
813
814def system_must_validate_cert(f):
815    """Skip the test on TLS certificate validation failures."""
816    @functools.wraps(f)
817    def dec(*args, **kwargs):
818        try:
819            f(*args, **kwargs)
820        except OSError as e:
821            if "CERTIFICATE_VERIFY_FAILED" in str(e):
822                raise unittest.SkipTest("system does not contain "
823                                        "necessary certificates")
824            raise
825    return dec
826
827# A constant likely larger than the underlying OS pipe buffer size, to
828# make writes blocking.
829# Windows limit seems to be around 512 B, and many Unix kernels have a
830# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
831# (see issue #17835 for a discussion of this number).
832PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
833
834# A constant likely larger than the underlying OS socket buffer size, to make
835# writes blocking.
836# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
837# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
838# for a discussion of this number).
839SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
840
841# decorator for skipping tests on non-IEEE 754 platforms
842requires_IEEE_754 = unittest.skipUnless(
843    float.__getformat__("double").startswith("IEEE"),
844    "test requires IEEE 754 doubles")
845
846requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
847
848requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
849
850requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
851
852requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
853
854is_jython = sys.platform.startswith('java')
855
856is_android = hasattr(sys, 'getandroidapilevel')
857
858if sys.platform != 'win32':
859    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
860else:
861    unix_shell = None
862
863# Filename used for testing
864if os.name == 'java':
865    # Jython disallows @ in module names
866    TESTFN = '$test'
867else:
868    TESTFN = '@test'
869
870# Disambiguate TESTFN for parallel testing, while letting it remain a valid
871# module name.
872TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
873
874# Define the URL of a dedicated HTTP server for the network tests.
875# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
876TEST_HTTP_URL = "http://www.pythontest.net"
877
878# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
879# or None if there is no such character.
880FS_NONASCII = None
881for character in (
882    # First try printable and common characters to have a readable filename.
883    # For each character, the encoding list are just example of encodings able
884    # to encode the character (the list is not exhaustive).
885
886    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
887    '\u00E6',
888    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
889    '\u0130',
890    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
891    '\u0141',
892    # U+03C6 (Greek Small Letter Phi): cp1253
893    '\u03C6',
894    # U+041A (Cyrillic Capital Letter Ka): cp1251
895    '\u041A',
896    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
897    '\u05D0',
898    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
899    '\u060C',
900    # U+062A (Arabic Letter Teh): cp720
901    '\u062A',
902    # U+0E01 (Thai Character Ko Kai): cp874
903    '\u0E01',
904
905    # Then try more "special" characters. "special" because they may be
906    # interpreted or displayed differently depending on the exact locale
907    # encoding and the font.
908
909    # U+00A0 (No-Break Space)
910    '\u00A0',
911    # U+20AC (Euro Sign)
912    '\u20AC',
913):
914    try:
915        # If Python is set up to use the legacy 'mbcs' in Windows,
916        # 'replace' error mode is used, and encode() returns b'?'
917        # for characters missing in the ANSI codepage
918        if os.fsdecode(os.fsencode(character)) != character:
919            raise UnicodeError
920    except UnicodeError:
921        pass
922    else:
923        FS_NONASCII = character
924        break
925
926# TESTFN_UNICODE is a non-ascii filename
927TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
928if sys.platform == 'darwin':
929    # In Mac OS X's VFS API file names are, by definition, canonically
930    # decomposed Unicode, encoded using UTF-8. See QA1173:
931    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
932    import unicodedata
933    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
934TESTFN_ENCODING = sys.getfilesystemencoding()
935
936# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
937# encoded by the filesystem encoding (in strict mode). It can be None if we
938# cannot generate such filename.
939TESTFN_UNENCODABLE = None
940if os.name == 'nt':
941    # skip win32s (0) or Windows 9x/ME (1)
942    if sys.getwindowsversion().platform >= 2:
943        # Different kinds of characters from various languages to minimize the
944        # probability that the whole name is encodable to MBCS (issue #9819)
945        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
946        try:
947            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
948        except UnicodeEncodeError:
949            pass
950        else:
951            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
952                  'Unicode filename tests may not be effective'
953                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
954            TESTFN_UNENCODABLE = None
955# Mac OS X denies unencodable filenames (invalid utf-8)
956elif sys.platform != 'darwin':
957    try:
958        # ascii and utf-8 cannot encode the byte 0xff
959        b'\xff'.decode(TESTFN_ENCODING)
960    except UnicodeDecodeError:
961        # 0xff will be encoded using the surrogate character u+DCFF
962        TESTFN_UNENCODABLE = TESTFN \
963            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
964    else:
965        # File system encoding (eg. ISO-8859-* encodings) can encode
966        # the byte 0xff. Skip some unicode filename tests.
967        pass
968
969# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
970# decoded from the filesystem encoding (in strict mode). It can be None if we
971# cannot generate such filename (ex: the latin1 encoding can decode any byte
972# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
973# to the surrogateescape error handler (PEP 383), but not from the filesystem
974# encoding in strict mode.
975TESTFN_UNDECODABLE = None
976for name in (
977    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
978    # accepts it to create a file or a directory, or don't accept to enter to
979    # such directory (when the bytes name is used). So test b'\xe7' first: it is
980    # not decodable from cp932.
981    b'\xe7w\xf0',
982    # undecodable from ASCII, UTF-8
983    b'\xff',
984    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
985    # and cp857
986    b'\xae\xd5'
987    # undecodable from UTF-8 (UNIX and Mac OS X)
988    b'\xed\xb2\x80', b'\xed\xb4\x80',
989    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
990    # cp1253, cp1254, cp1255, cp1257, cp1258
991    b'\x81\x98',
992):
993    try:
994        name.decode(TESTFN_ENCODING)
995    except UnicodeDecodeError:
996        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
997        break
998
999if FS_NONASCII:
1000    TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
1001else:
1002    TESTFN_NONASCII = None
1003
1004# Save the initial cwd
1005SAVEDCWD = os.getcwd()
1006
1007# Set by libregrtest/main.py so we can skip tests that are not
1008# useful for PGO
1009PGO = False
1010
1011# Set by libregrtest/main.py if we are running the extended (time consuming)
1012# PGO task.  If this is True, PGO is also True.
1013PGO_EXTENDED = False
1014
1015@contextlib.contextmanager
1016def temp_dir(path=None, quiet=False):
1017    """Return a context manager that creates a temporary directory.
1018
1019    Arguments:
1020
1021      path: the directory to create temporarily.  If omitted or None,
1022        defaults to creating a temporary directory using tempfile.mkdtemp.
1023
1024      quiet: if False (the default), the context manager raises an exception
1025        on error.  Otherwise, if the path is specified and cannot be
1026        created, only a warning is issued.
1027
1028    """
1029    dir_created = False
1030    if path is None:
1031        path = tempfile.mkdtemp()
1032        dir_created = True
1033        path = os.path.realpath(path)
1034    else:
1035        try:
1036            os.mkdir(path)
1037            dir_created = True
1038        except OSError as exc:
1039            if not quiet:
1040                raise
1041            warnings.warn(f'tests may fail, unable to create '
1042                          f'temporary directory {path!r}: {exc}',
1043                          RuntimeWarning, stacklevel=3)
1044    if dir_created:
1045        pid = os.getpid()
1046    try:
1047        yield path
1048    finally:
1049        # In case the process forks, let only the parent remove the
1050        # directory. The child has a different process id. (bpo-30028)
1051        if dir_created and pid == os.getpid():
1052            rmtree(path)
1053
1054@contextlib.contextmanager
1055def change_cwd(path, quiet=False):
1056    """Return a context manager that changes the current working directory.
1057
1058    Arguments:
1059
1060      path: the directory to use as the temporary current working directory.
1061
1062      quiet: if False (the default), the context manager raises an exception
1063        on error.  Otherwise, it issues only a warning and keeps the current
1064        working directory the same.
1065
1066    """
1067    saved_dir = os.getcwd()
1068    try:
1069        os.chdir(os.path.realpath(path))
1070    except OSError as exc:
1071        if not quiet:
1072            raise
1073        warnings.warn(f'tests may fail, unable to change the current working '
1074                      f'directory to {path!r}: {exc}',
1075                      RuntimeWarning, stacklevel=3)
1076    try:
1077        yield os.getcwd()
1078    finally:
1079        os.chdir(saved_dir)
1080
1081
1082@contextlib.contextmanager
1083def temp_cwd(name='tempcwd', quiet=False):
1084    """
1085    Context manager that temporarily creates and changes the CWD.
1086
1087    The function temporarily changes the current working directory
1088    after creating a temporary directory in the current directory with
1089    name *name*.  If *name* is None, the temporary directory is
1090    created using tempfile.mkdtemp.
1091
1092    If *quiet* is False (default) and it is not possible to
1093    create or change the CWD, an error is raised.  If *quiet* is True,
1094    only a warning is raised and the original CWD is used.
1095
1096    """
1097    with temp_dir(path=name, quiet=quiet) as temp_path:
1098        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
1099            yield cwd_dir
1100
1101if hasattr(os, "umask"):
1102    @contextlib.contextmanager
1103    def temp_umask(umask):
1104        """Context manager that temporarily sets the process umask."""
1105        oldmask = os.umask(umask)
1106        try:
1107            yield
1108        finally:
1109            os.umask(oldmask)
1110
1111# TEST_HOME_DIR refers to the top level directory of the "test" package
1112# that contains Python's regression test suite
1113TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
1114TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
1115
1116# TEST_DATA_DIR is used as a target download location for remote resources
1117TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
1118
1119def findfile(filename, subdir=None):
1120    """Try to find a file on sys.path or in the test directory.  If it is not
1121    found the argument passed to the function is returned (this does not
1122    necessarily signal failure; could still be the legitimate path).
1123
1124    Setting *subdir* indicates a relative path to use to find the file
1125    rather than looking directly in the path directories.
1126    """
1127    if os.path.isabs(filename):
1128        return filename
1129    if subdir is not None:
1130        filename = os.path.join(subdir, filename)
1131    path = [TEST_HOME_DIR] + sys.path
1132    for dn in path:
1133        fn = os.path.join(dn, filename)
1134        if os.path.exists(fn): return fn
1135    return filename
1136
1137def create_empty_file(filename):
1138    """Create an empty file. If the file already exists, truncate it."""
1139    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
1140    os.close(fd)
1141
1142def sortdict(dict):
1143    "Like repr(dict), but in sorted order."
1144    items = sorted(dict.items())
1145    reprpairs = ["%r: %r" % pair for pair in items]
1146    withcommas = ", ".join(reprpairs)
1147    return "{%s}" % withcommas
1148
1149def make_bad_fd():
1150    """
1151    Create an invalid file descriptor by opening and closing a file and return
1152    its fd.
1153    """
1154    file = open(TESTFN, "wb")
1155    try:
1156        return file.fileno()
1157    finally:
1158        file.close()
1159        unlink(TESTFN)
1160
1161
1162def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
1163    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
1164        compile(statement, '<test string>', 'exec')
1165    err = cm.exception
1166    testcase.assertIsNotNone(err.lineno)
1167    if lineno is not None:
1168        testcase.assertEqual(err.lineno, lineno)
1169    testcase.assertIsNotNone(err.offset)
1170    if offset is not None:
1171        testcase.assertEqual(err.offset, offset)
1172
1173def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None):
1174    # Test also that a warning is emitted only once.
1175    with warnings.catch_warnings(record=True) as warns:
1176        warnings.simplefilter('always', SyntaxWarning)
1177        compile(statement, '<testcase>', 'exec')
1178    testcase.assertEqual(len(warns), 1, warns)
1179
1180    warn, = warns
1181    testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category)
1182    if errtext:
1183        testcase.assertRegex(str(warn.message), errtext)
1184    testcase.assertEqual(warn.filename, '<testcase>')
1185    testcase.assertIsNotNone(warn.lineno)
1186    if lineno is not None:
1187        testcase.assertEqual(warn.lineno, lineno)
1188
1189    # SyntaxWarning should be converted to SyntaxError when raised,
1190    # since the latter contains more information and provides better
1191    # error report.
1192    with warnings.catch_warnings(record=True) as warns:
1193        warnings.simplefilter('error', SyntaxWarning)
1194        check_syntax_error(testcase, statement, errtext,
1195                           lineno=lineno, offset=offset)
1196    # No warnings are leaked when a SyntaxError is raised.
1197    testcase.assertEqual(warns, [])
1198
1199
1200def open_urlresource(url, *args, **kw):
1201    import urllib.request, urllib.parse
1202
1203    check = kw.pop('check', None)
1204
1205    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
1206
1207    fn = os.path.join(TEST_DATA_DIR, filename)
1208
1209    def check_valid_file(fn):
1210        f = open(fn, *args, **kw)
1211        if check is None:
1212            return f
1213        elif check(f):
1214            f.seek(0)
1215            return f
1216        f.close()
1217
1218    if os.path.exists(fn):
1219        f = check_valid_file(fn)
1220        if f is not None:
1221            return f
1222        unlink(fn)
1223
1224    # Verify the requirement before downloading the file
1225    requires('urlfetch')
1226
1227    if verbose:
1228        print('\tfetching %s ...' % url, file=get_original_stdout())
1229    opener = urllib.request.build_opener()
1230    if gzip:
1231        opener.addheaders.append(('Accept-Encoding', 'gzip'))
1232    f = opener.open(url, timeout=15)
1233    if gzip and f.headers.get('Content-Encoding') == 'gzip':
1234        f = gzip.GzipFile(fileobj=f)
1235    try:
1236        with open(fn, "wb") as out:
1237            s = f.read()
1238            while s:
1239                out.write(s)
1240                s = f.read()
1241    finally:
1242        f.close()
1243
1244    f = check_valid_file(fn)
1245    if f is not None:
1246        return f
1247    raise TestFailed('invalid resource %r' % fn)
1248
1249
1250class WarningsRecorder(object):
1251    """Convenience wrapper for the warnings list returned on
1252       entry to the warnings.catch_warnings() context manager.
1253    """
1254    def __init__(self, warnings_list):
1255        self._warnings = warnings_list
1256        self._last = 0
1257
1258    def __getattr__(self, attr):
1259        if len(self._warnings) > self._last:
1260            return getattr(self._warnings[-1], attr)
1261        elif attr in warnings.WarningMessage._WARNING_DETAILS:
1262            return None
1263        raise AttributeError("%r has no attribute %r" % (self, attr))
1264
1265    @property
1266    def warnings(self):
1267        return self._warnings[self._last:]
1268
1269    def reset(self):
1270        self._last = len(self._warnings)
1271
1272
1273def _filterwarnings(filters, quiet=False):
1274    """Catch the warnings, then check if all the expected
1275    warnings have been raised and re-raise unexpected warnings.
1276    If 'quiet' is True, only re-raise the unexpected warnings.
1277    """
1278    # Clear the warning registry of the calling module
1279    # in order to re-raise the warnings.
1280    frame = sys._getframe(2)
1281    registry = frame.f_globals.get('__warningregistry__')
1282    if registry:
1283        registry.clear()
1284    with warnings.catch_warnings(record=True) as w:
1285        # Set filter "always" to record all warnings.  Because
1286        # test_warnings swap the module, we need to look up in
1287        # the sys.modules dictionary.
1288        sys.modules['warnings'].simplefilter("always")
1289        yield WarningsRecorder(w)
1290    # Filter the recorded warnings
1291    reraise = list(w)
1292    missing = []
1293    for msg, cat in filters:
1294        seen = False
1295        for w in reraise[:]:
1296            warning = w.message
1297            # Filter out the matching messages
1298            if (re.match(msg, str(warning), re.I) and
1299                issubclass(warning.__class__, cat)):
1300                seen = True
1301                reraise.remove(w)
1302        if not seen and not quiet:
1303            # This filter caught nothing
1304            missing.append((msg, cat.__name__))
1305    if reraise:
1306        raise AssertionError("unhandled warning %s" % reraise[0])
1307    if missing:
1308        raise AssertionError("filter (%r, %s) did not catch any warning" %
1309                             missing[0])
1310
1311
1312@contextlib.contextmanager
1313def check_warnings(*filters, **kwargs):
1314    """Context manager to silence warnings.
1315
1316    Accept 2-tuples as positional arguments:
1317        ("message regexp", WarningCategory)
1318
1319    Optional argument:
1320     - if 'quiet' is True, it does not fail if a filter catches nothing
1321        (default True without argument,
1322         default False if some filters are defined)
1323
1324    Without argument, it defaults to:
1325        check_warnings(("", Warning), quiet=True)
1326    """
1327    quiet = kwargs.get('quiet')
1328    if not filters:
1329        filters = (("", Warning),)
1330        # Preserve backward compatibility
1331        if quiet is None:
1332            quiet = True
1333    return _filterwarnings(filters, quiet)
1334
1335
1336@contextlib.contextmanager
1337def check_no_warnings(testcase, message='', category=Warning, force_gc=False):
1338    """Context manager to check that no warnings are emitted.
1339
1340    This context manager enables a given warning within its scope
1341    and checks that no warnings are emitted even with that warning
1342    enabled.
1343
1344    If force_gc is True, a garbage collection is attempted before checking
1345    for warnings. This may help to catch warnings emitted when objects
1346    are deleted, such as ResourceWarning.
1347
1348    Other keyword arguments are passed to warnings.filterwarnings().
1349    """
1350    with warnings.catch_warnings(record=True) as warns:
1351        warnings.filterwarnings('always',
1352                                message=message,
1353                                category=category)
1354        yield
1355        if force_gc:
1356            gc_collect()
1357    testcase.assertEqual(warns, [])
1358
1359
1360@contextlib.contextmanager
1361def check_no_resource_warning(testcase):
1362    """Context manager to check that no ResourceWarning is emitted.
1363
1364    Usage:
1365
1366        with check_no_resource_warning(self):
1367            f = open(...)
1368            ...
1369            del f
1370
1371    You must remove the object which may emit ResourceWarning before
1372    the end of the context manager.
1373    """
1374    with check_no_warnings(testcase, category=ResourceWarning, force_gc=True):
1375        yield
1376
1377
1378class CleanImport(object):
1379    """Context manager to force import to return a new module reference.
1380
1381    This is useful for testing module-level behaviours, such as
1382    the emission of a DeprecationWarning on import.
1383
1384    Use like this:
1385
1386        with CleanImport("foo"):
1387            importlib.import_module("foo") # new reference
1388    """
1389
1390    def __init__(self, *module_names):
1391        self.original_modules = sys.modules.copy()
1392        for module_name in module_names:
1393            if module_name in sys.modules:
1394                module = sys.modules[module_name]
1395                # It is possible that module_name is just an alias for
1396                # another module (e.g. stub for modules renamed in 3.x).
1397                # In that case, we also need delete the real module to clear
1398                # the import cache.
1399                if module.__name__ != module_name:
1400                    del sys.modules[module.__name__]
1401                del sys.modules[module_name]
1402
1403    def __enter__(self):
1404        return self
1405
1406    def __exit__(self, *ignore_exc):
1407        sys.modules.update(self.original_modules)
1408
1409
1410class EnvironmentVarGuard(collections.abc.MutableMapping):
1411
1412    """Class to help protect the environment variable properly.  Can be used as
1413    a context manager."""
1414
1415    def __init__(self):
1416        self._environ = os.environ
1417        self._changed = {}
1418
1419    def __getitem__(self, envvar):
1420        return self._environ[envvar]
1421
1422    def __setitem__(self, envvar, value):
1423        # Remember the initial value on the first access
1424        if envvar not in self._changed:
1425            self._changed[envvar] = self._environ.get(envvar)
1426        self._environ[envvar] = value
1427
1428    def __delitem__(self, envvar):
1429        # Remember the initial value on the first access
1430        if envvar not in self._changed:
1431            self._changed[envvar] = self._environ.get(envvar)
1432        if envvar in self._environ:
1433            del self._environ[envvar]
1434
1435    def keys(self):
1436        return self._environ.keys()
1437
1438    def __iter__(self):
1439        return iter(self._environ)
1440
1441    def __len__(self):
1442        return len(self._environ)
1443
1444    def set(self, envvar, value):
1445        self[envvar] = value
1446
1447    def unset(self, envvar):
1448        del self[envvar]
1449
1450    def __enter__(self):
1451        return self
1452
1453    def __exit__(self, *ignore_exc):
1454        for (k, v) in self._changed.items():
1455            if v is None:
1456                if k in self._environ:
1457                    del self._environ[k]
1458            else:
1459                self._environ[k] = v
1460        os.environ = self._environ
1461
1462
1463class DirsOnSysPath(object):
1464    """Context manager to temporarily add directories to sys.path.
1465
1466    This makes a copy of sys.path, appends any directories given
1467    as positional arguments, then reverts sys.path to the copied
1468    settings when the context ends.
1469
1470    Note that *all* sys.path modifications in the body of the
1471    context manager, including replacement of the object,
1472    will be reverted at the end of the block.
1473    """
1474
1475    def __init__(self, *paths):
1476        self.original_value = sys.path[:]
1477        self.original_object = sys.path
1478        sys.path.extend(paths)
1479
1480    def __enter__(self):
1481        return self
1482
1483    def __exit__(self, *ignore_exc):
1484        sys.path = self.original_object
1485        sys.path[:] = self.original_value
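
# Example (illustrative sketch; the directory and module names are
# hypothetical, shown as a comment only): importing a helper module from a
# test data directory.
#
#     with DirsOnSysPath(os.path.join(os.path.dirname(__file__), 'data')):
#         import helper_module        # found via the temporary sys.path entry
#     # sys.path (both the object and its contents) is restored here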
1486
1487
1488class TransientResource(object):
1489
1490    """Raise ResourceDenied if an exception is raised while the context manager
1491    is in effect that matches the specified exception and attributes."""
1492
1493    def __init__(self, exc, **kwargs):
1494        self.exc = exc
1495        self.attrs = kwargs
1496
1497    def __enter__(self):
1498        return self
1499
1500    def __exit__(self, type_=None, value=None, traceback=None):
1501        """If type_ is a subclass of self.exc and value has attributes matching
1502        self.attrs, raise ResourceDenied.  Otherwise let the exception
1503        propagate (if any)."""
        if type_ is not None and issubclass(type_, self.exc):
1505            for attr, attr_value in self.attrs.items():
1506                if not hasattr(value, attr):
1507                    break
1508                if getattr(value, attr) != attr_value:
1509                    break
1510            else:
1511                raise ResourceDenied("an optional resource is not available")
1512
1513# Context managers that raise ResourceDenied when various issues
1514# with the Internet connection manifest themselves as exceptions.
1515# XXX deprecate these and use transient_internet() instead
1516time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
1517socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1518ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
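
# Example (illustrative sketch; the URL is hypothetical, shown as a comment
# only): wrapping a network call so that a reset connection raises
# ResourceDenied instead of failing the test.
#
#     with socket_peer_reset:
#         urllib.request.urlopen("http://www.example.com/")
#     # An OSError with errno == ECONNRESET becomes ResourceDenied here.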
1519
1520
1521def get_socket_conn_refused_errs():
1522    """
1523    Get the different socket error numbers ('errno') which can be received
1524    when a connection is refused.
1525    """
1526    errors = [errno.ECONNREFUSED]
1527    if hasattr(errno, 'ENETUNREACH'):
1528        # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
1529        errors.append(errno.ENETUNREACH)
1530    if hasattr(errno, 'EADDRNOTAVAIL'):
1531        # bpo-31910: socket.create_connection() fails randomly
1532        # with EADDRNOTAVAIL on Travis CI
1533        errors.append(errno.EADDRNOTAVAIL)
1534    if hasattr(errno, 'EHOSTUNREACH'):
1535        # bpo-37583: The destination host cannot be reached
1536        errors.append(errno.EHOSTUNREACH)
1537    if not IPV6_ENABLED:
1538        errors.append(errno.EAFNOSUPPORT)
1539    return errors
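
# Example (illustrative sketch; the address and port variable are
# hypothetical, shown as a comment only): a test expecting a refused
# connection can accept any of the platform-dependent error numbers above.
#
#     try:
#         socket.create_connection(("127.0.0.1", unused_port), timeout=1)
#     except OSError as exc:
#         if exc.errno not in get_socket_conn_refused_errs():
#             raise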
1540
1541
1542@contextlib.contextmanager
1543def transient_internet(resource_name, *, timeout=30.0, errnos=()):
1544    """Return a context manager that raises ResourceDenied when various issues
1545    with the Internet connection manifest themselves as exceptions."""
1546    default_errnos = [
1547        ('ECONNREFUSED', 111),
1548        ('ECONNRESET', 104),
1549        ('EHOSTUNREACH', 113),
1550        ('ENETUNREACH', 101),
1551        ('ETIMEDOUT', 110),
1552        # socket.create_connection() fails randomly with
1553        # EADDRNOTAVAIL on Travis CI.
1554        ('EADDRNOTAVAIL', 99),
1555    ]
1556    default_gai_errnos = [
1557        ('EAI_AGAIN', -3),
1558        ('EAI_FAIL', -4),
1559        ('EAI_NONAME', -2),
1560        ('EAI_NODATA', -5),
1561        # Encountered when trying to resolve IPv6-only hostnames
1562        ('WSANO_DATA', 11004),
1563    ]
1564
1565    denied = ResourceDenied("Resource %r is not available" % resource_name)
1566    captured_errnos = errnos
1567    gai_errnos = []
1568    if not captured_errnos:
1569        captured_errnos = [getattr(errno, name, num)
1570                           for (name, num) in default_errnos]
1571        gai_errnos = [getattr(socket, name, num)
1572                      for (name, num) in default_gai_errnos]
1573
1574    def filter_error(err):
1575        n = getattr(err, 'errno', None)
1576        if (isinstance(err, socket.timeout) or
1577            (isinstance(err, socket.gaierror) and n in gai_errnos) or
1578            (isinstance(err, urllib.error.HTTPError) and
1579             500 <= err.code <= 599) or
1580            (isinstance(err, urllib.error.URLError) and
1581                 (("ConnectionRefusedError" in err.reason) or
1582                  ("TimeoutError" in err.reason) or
1583                  ("EOFError" in err.reason))) or
1584            n in captured_errnos):
1585            if not verbose:
1586                sys.stderr.write(denied.args[0] + "\n")
1587            raise denied from err
1588
1589    old_timeout = socket.getdefaulttimeout()
1590    try:
1591        if timeout is not None:
1592            socket.setdefaulttimeout(timeout)
1593        yield
1594    except nntplib.NNTPTemporaryError as err:
1595        if verbose:
1596            sys.stderr.write(denied.args[0] + "\n")
1597        raise denied from err
1598    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), so we
        # must unwrap to get at the original error.
1601        while True:
1602            a = err.args
1603            if len(a) >= 1 and isinstance(a[0], OSError):
1604                err = a[0]
1605            # The error can also be wrapped as args[1]:
1606            #    except socket.error as msg:
1607            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
1608            elif len(a) >= 2 and isinstance(a[1], OSError):
1609                err = a[1]
1610            else:
1611                break
1612        filter_error(err)
1613        raise
1614    # XXX should we catch generic exceptions and look for their
1615    # __cause__ or __context__?
1616    finally:
1617        socket.setdefaulttimeout(old_timeout)
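
# Example (illustrative sketch; the host name and test method are
# hypothetical, shown as a comment only): network tests wrap their traffic in
# transient_internet() so flaky connectivity turns into a ResourceDenied skip.
#
#     def test_fetch(self):
#         with transient_internet('www.example.com'):
#             urllib.request.urlopen('http://www.example.com/', timeout=30)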
1618
1619
1620@contextlib.contextmanager
1621def captured_output(stream_name):
1622    """Return a context manager used by captured_stdout/stdin/stderr
1623    that temporarily replaces the sys stream *stream_name* with a StringIO."""
1624    import io
1625    orig_stdout = getattr(sys, stream_name)
1626    setattr(sys, stream_name, io.StringIO())
1627    try:
1628        yield getattr(sys, stream_name)
1629    finally:
1630        setattr(sys, stream_name, orig_stdout)
1631
1632def captured_stdout():
1633    """Capture the output of sys.stdout:
1634
1635       with captured_stdout() as stdout:
1636           print("hello")
1637       self.assertEqual(stdout.getvalue(), "hello\\n")
1638    """
1639    return captured_output("stdout")
1640
1641def captured_stderr():
1642    """Capture the output of sys.stderr:
1643
1644       with captured_stderr() as stderr:
1645           print("hello", file=sys.stderr)
1646       self.assertEqual(stderr.getvalue(), "hello\\n")
1647    """
1648    return captured_output("stderr")
1649
1650def captured_stdin():
1651    """Capture the input to sys.stdin:
1652
1653       with captured_stdin() as stdin:
1654           stdin.write('hello\\n')
1655           stdin.seek(0)
1656           # call test code that consumes from sys.stdin
1657           captured = input()
1658       self.assertEqual(captured, "hello")
1659    """
1660    return captured_output("stdin")
1661
1662
1663def gc_collect():
1664    """Force as many objects as possible to be collected.
1665
1666    In non-CPython implementations of Python, this is needed because timely
1667    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case when reference cycles are involved.)  This means that __del__
1669    methods may be called later than expected and weakrefs may remain alive for
1670    longer than expected.  This function tries its best to force all garbage
1671    objects to disappear.
1672    """
1673    gc.collect()
1674    if is_jython:
1675        time.sleep(0.1)
1676    gc.collect()
1677    gc.collect()
1678
1679@contextlib.contextmanager
1680def disable_gc():
1681    have_gc = gc.isenabled()
1682    gc.disable()
1683    try:
1684        yield
1685    finally:
1686        if have_gc:
1687            gc.enable()
1688
1689
1690def python_is_optimized():
1691    """Find if Python was built with optimizations."""
1692    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
1693    final_opt = ""
1694    for opt in cflags.split():
1695        if opt.startswith('-O'):
1696            final_opt = opt
1697    return final_opt not in ('', '-O0', '-Og')
1698
1699
1700_header = 'nP'
1701_align = '0n'
1702if hasattr(sys, "getobjects"):
1703    _header = '2P' + _header
1704    _align = '0P'
1705_vheader = _header + 'n'
1706
1707def calcobjsize(fmt):
1708    return struct.calcsize(_header + fmt + _align)
1709
1710def calcvobjsize(fmt):
1711    return struct.calcsize(_vheader + fmt + _align)
1712
1713
1714_TPFLAGS_HAVE_GC = 1<<14
1715_TPFLAGS_HEAPTYPE = 1<<9
1716
1717def check_sizeof(test, o, size):
1718    import _testcapi
1719    result = sys.getsizeof(o)
1720    # add GC header size
1721    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1722        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1723        size += _testcapi.SIZEOF_PYGC_HEAD
1724    msg = 'wrong size for %s: got %d, expected %d' \
1725            % (type(o), result, size)
1726    test.assertEqual(result, size, msg)
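
# Example (illustrative sketch; the test method and object are hypothetical,
# shown as a comment only): sizeof tests combine the struct-format helpers
# above with check_sizeof().
#
#     def test_node_size(self):
#         # one pointer-sized field ('P') on top of the fixed object header
#         check_sizeof(self, some_node, calcobjsize('P'))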
1727
1728#=======================================================================
1729# Decorator for running a function in a different locale, correctly resetting
1730# it afterwards.
1731
1732def run_with_locale(catstr, *locales):
1733    def decorator(func):
1734        def inner(*args, **kwds):
1735            try:
1736                import locale
1737                category = getattr(locale, catstr)
1738                orig_locale = locale.setlocale(category)
1739            except AttributeError:
1740                # if the test author gives us an invalid category string
1741                raise
1742            except:
1743                # cannot retrieve original locale, so do nothing
1744                locale = orig_locale = None
1745            else:
1746                for loc in locales:
1747                    try:
1748                        locale.setlocale(category, loc)
1749                        break
1750                    except:
1751                        pass
1752
1753            # now run the function, resetting the locale on exceptions
1754            try:
1755                return func(*args, **kwds)
1756            finally:
1757                if locale and orig_locale:
1758                    locale.setlocale(category, orig_locale)
1759        inner.__name__ = func.__name__
1760        inner.__doc__ = func.__doc__
1761        return inner
1762    return decorator
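
# Example (illustrative sketch; the test method is hypothetical, shown as a
# comment only): the decorator tries each candidate locale in turn and
# restores the original locale afterwards.
#
#     @run_with_locale('LC_NUMERIC', 'fr_FR.UTF-8', 'de_DE', '')
#     def test_float_formatting(self):
#         ...   # runs under the first locale that could be set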
1763
1764#=======================================================================
1765# Decorator for running a function in a specific timezone, correctly
1766# resetting it afterwards.
1767
1768def run_with_tz(tz):
1769    def decorator(func):
1770        def inner(*args, **kwds):
1771            try:
1772                tzset = time.tzset
1773            except AttributeError:
1774                raise unittest.SkipTest("tzset required")
1775            if 'TZ' in os.environ:
1776                orig_tz = os.environ['TZ']
1777            else:
1778                orig_tz = None
1779            os.environ['TZ'] = tz
1780            tzset()
1781
1782            # now run the function, resetting the tz on exceptions
1783            try:
1784                return func(*args, **kwds)
1785            finally:
1786                if orig_tz is None:
1787                    del os.environ['TZ']
1788                else:
1789                    os.environ['TZ'] = orig_tz
1790                time.tzset()
1791
1792        inner.__name__ = func.__name__
1793        inner.__doc__ = func.__doc__
1794        return inner
1795    return decorator
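
# Example (illustrative sketch; the test method is hypothetical, shown as a
# comment only): run a test under a fixed timezone; if time.tzset() is not
# available the test is skipped.
#
#     @run_with_tz('UTC')
#     def test_gmtime_roundtrip(self):
#         ...   # TZ is 'UTC' here and restored afterwards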
1796
1797#=======================================================================
1798# Big-memory-test support. Separate from 'resources' because memory use
1799# should be configurable.
1800
1801# Some handy shorthands. Note that these are used for byte-limits as well
1802# as size-limits, in the various bigmem tests
1803_1M = 1024*1024
1804_1G = 1024 * _1M
1805_2G = 2 * _1G
1806_4G = 4 * _1G
1807
1808MAX_Py_ssize_t = sys.maxsize
1809
1810def set_memlimit(limit):
1811    global max_memuse
1812    global real_max_memuse
1813    sizes = {
1814        'k': 1024,
1815        'm': _1M,
1816        'g': _1G,
1817        't': 1024*_1G,
1818    }
1819    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1820                 re.IGNORECASE | re.VERBOSE)
1821    if m is None:
1822        raise ValueError('Invalid memory limit %r' % (limit,))
1823    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1824    real_max_memuse = memlimit
1825    if memlimit > MAX_Py_ssize_t:
1826        memlimit = MAX_Py_ssize_t
1827    if memlimit < _2G - 1:
1828        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1829    max_memuse = memlimit
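
# Example (illustrative sketch, shown as a comment only): regrtest's -M option
# passes its argument to set_memlimit(); limits below 2 GiB are rejected as
# not useful for bigmem tests.
#
#     set_memlimit('4G')      # real_max_memuse and max_memuse become 4 GiB
#     set_memlimit('2.5Gb')   # fractional values and a trailing 'b' are accepted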
1830
1831class _MemoryWatchdog:
1832    """An object which periodically watches the process' memory consumption
1833    and prints it out.
1834    """
1835
1836    def __init__(self):
1837        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
1838        self.started = False
1839
1840    def start(self):
1841        try:
1842            f = open(self.procfile, 'r')
1843        except OSError as e:
1844            warnings.warn('/proc not available for stats: {}'.format(e),
1845                          RuntimeWarning)
1846            sys.stderr.flush()
1847            return
1848
1849        with f:
1850            watchdog_script = findfile("memory_watchdog.py")
1851            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
1852                                                 stdin=f,
1853                                                 stderr=subprocess.DEVNULL)
1854        self.started = True
1855
1856    def stop(self):
1857        if self.started:
1858            self.mem_watchdog.terminate()
1859            self.mem_watchdog.wait()
1860
1861
1862def bigmemtest(size, memuse, dry_run=True):
1863    """Decorator for bigmem tests.
1864
1865    'size' is a requested size for the test (in arbitrary, test-interpreted
1866    units.) 'memuse' is the number of bytes per unit for the test, or a good
1867    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
1868    each, could be decorated with @bigmemtest(size=_4G, memuse=2).
1869
1870    The 'size' argument is normally passed to the decorated test method as an
1871    extra argument. If 'dry_run' is true, the value passed to the test method
1872    may be less than the requested value. If 'dry_run' is false, it means the
1873    test doesn't support dummy runs when -M is not specified.
1874    """
1875    def decorator(f):
1876        def wrapper(self):
1877            size = wrapper.size
1878            memuse = wrapper.memuse
1879            if not real_max_memuse:
1880                maxsize = 5147
1881            else:
1882                maxsize = size
1883
1884            if ((real_max_memuse or not dry_run)
1885                and real_max_memuse < maxsize * memuse):
1886                raise unittest.SkipTest(
1887                    "not enough memory: %.1fG minimum needed"
1888                    % (size * memuse / (1024 ** 3)))
1889
1890            if real_max_memuse and verbose:
1891                print()
1892                print(" ... expected peak memory use: {peak:.1f}G"
1893                      .format(peak=size * memuse / (1024 ** 3)))
1894                watchdog = _MemoryWatchdog()
1895                watchdog.start()
1896            else:
1897                watchdog = None
1898
1899            try:
1900                return f(self, maxsize)
1901            finally:
1902                if watchdog:
1903                    watchdog.stop()
1904
1905        wrapper.size = size
1906        wrapper.memuse = memuse
1907        return wrapper
1908    return decorator
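
# Example (illustrative sketch; the test method is hypothetical, shown as a
# comment only): a bigmem test receives the (possibly scaled-down) size as an
# extra argument.
#
#     @bigmemtest(size=_2G, memuse=1)
#     def test_big_bytes(self, size):
#         data = b'x' * size        # roughly size * memuse bytes at the peak
#         self.assertEqual(len(data), size)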
1909
1910def bigaddrspacetest(f):
1911    """Decorator for tests that fill the address space."""
1912    def wrapper(self):
1913        if max_memuse < MAX_Py_ssize_t:
1914            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
1915                raise unittest.SkipTest(
1916                    "not enough memory: try a 32-bit build instead")
1917            else:
1918                raise unittest.SkipTest(
1919                    "not enough memory: %.1fG minimum needed"
1920                    % (MAX_Py_ssize_t / (1024 ** 3)))
1921        else:
1922            return f(self)
1923    return wrapper
1924
1925#=======================================================================
1926# unittest integration.
1927
1928class BasicTestRunner:
1929    def run(self, test):
1930        result = unittest.TestResult()
1931        test(result)
1932        return result
1933
1934def _id(obj):
1935    return obj
1936
1937def requires_resource(resource):
1938    if resource == 'gui' and not _is_gui_available():
1939        return unittest.skip(_is_gui_available.reason)
1940    if is_resource_enabled(resource):
1941        return _id
1942    else:
1943        return unittest.skip("resource {0!r} is not enabled".format(resource))
1944
1945def cpython_only(test):
1946    """
1947    Decorator for tests only applicable on CPython.
1948    """
1949    return impl_detail(cpython=True)(test)
1950
1951def impl_detail(msg=None, **guards):
1952    if check_impl_detail(**guards):
1953        return _id
1954    if msg is None:
1955        guardnames, default = _parse_guards(guards)
1956        if default:
1957            msg = "implementation detail not available on {0}"
1958        else:
1959            msg = "implementation detail specific to {0}"
1960        guardnames = sorted(guardnames.keys())
1961        msg = msg.format(' or '.join(guardnames))
1962    return unittest.skip(msg)
1963
1964def _parse_guards(guards):
1965    # Returns a tuple ({platform_name: run_me}, default_value)
1966    if not guards:
1967        return ({'cpython': True}, False)
1968    is_true = list(guards.values())[0]
1969    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
1970    return (guards, not is_true)
1971
1972# Use the following check to guard CPython's implementation-specific tests --
1973# or to run them only on the implementation(s) guarded by the arguments.
1974def check_impl_detail(**guards):
1975    """This function returns True or False depending on the host platform.
1976       Examples:
1977          if check_impl_detail():               # only on CPython (default)
1978          if check_impl_detail(jython=True):    # only on Jython
1979          if check_impl_detail(cpython=False):  # everywhere except on CPython
1980    """
1981    guards, default = _parse_guards(guards)
1982    return guards.get(platform.python_implementation().lower(), default)
1983
1984
1985def no_tracing(func):
1986    """Decorator to temporarily turn off tracing for the duration of a test."""
1987    if not hasattr(sys, 'gettrace'):
1988        return func
1989    else:
1990        @functools.wraps(func)
1991        def wrapper(*args, **kwargs):
1992            original_trace = sys.gettrace()
1993            try:
1994                sys.settrace(None)
1995                return func(*args, **kwargs)
1996            finally:
1997                sys.settrace(original_trace)
1998        return wrapper
1999
2000
2001def refcount_test(test):
2002    """Decorator for tests which involve reference counting.
2003
    To start, the decorator does not run the test if it is not run by CPython.
2005    After that, any trace function is unset during the test to prevent
2006    unexpected refcounts caused by the trace function.
2007
2008    """
2009    return no_tracing(cpython_only(test))
2010
2011
2012def _filter_suite(suite, pred):
2013    """Recursively filter test cases in a suite based on a predicate."""
2014    newtests = []
2015    for test in suite._tests:
2016        if isinstance(test, unittest.TestSuite):
2017            _filter_suite(test, pred)
2018            newtests.append(test)
2019        else:
2020            if pred(test):
2021                newtests.append(test)
2022    suite._tests = newtests
2023
2024def _run_suite(suite):
2025    """Run tests from a unittest.TestSuite-derived class."""
2026    runner = get_test_runner(sys.stdout,
2027                             verbosity=verbose,
2028                             capture_output=(junit_xml_list is not None))
2029
2030    result = runner.run(suite)
2031
2032    if junit_xml_list is not None:
2033        junit_xml_list.append(result.get_xml_element())
2034
2035    if not result.testsRun and not result.skipped:
2036        raise TestDidNotRun
2037    if not result.wasSuccessful():
2038        if len(result.errors) == 1 and not result.failures:
2039            err = result.errors[0][1]
2040        elif len(result.failures) == 1 and not result.errors:
2041            err = result.failures[0][1]
2042        else:
2043            err = "multiple errors occurred"
2044            if not verbose: err += "; run in verbose mode for details"
2045        raise TestFailed(err)
2046
2047
2048# By default, don't filter tests
2049_match_test_func = None
2050
2051_accept_test_patterns = None
2052_ignore_test_patterns = None
2053
2054
2055def match_test(test):
2056    # Function used by support.run_unittest() and regrtest --list-cases
2057    if _match_test_func is None:
2058        return True
2059    else:
2060        return _match_test_func(test.id())
2061
2062
2063def _is_full_match_test(pattern):
2064    # If a pattern contains at least one dot, it's considered
2065    # as a full test identifier.
2066    # Example: 'test.test_os.FileTests.test_access'.
2067    #
2068    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
2069    # or '[!...]'. For example, ignore 'test_access*'.
2070    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
2071
2072
2073def set_match_tests(accept_patterns=None, ignore_patterns=None):
2074    global _match_test_func, _accept_test_patterns, _ignore_test_patterns
2075
2076
2077    if accept_patterns is None:
2078        accept_patterns = ()
2079    if ignore_patterns is None:
2080        ignore_patterns = ()
2081
2082    accept_func = ignore_func = None
2083
2084    if accept_patterns != _accept_test_patterns:
2085        accept_patterns, accept_func = _compile_match_function(accept_patterns)
2086    if ignore_patterns != _ignore_test_patterns:
2087        ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
2088
2089    # Create a copy since patterns can be mutable and so modified later
2090    _accept_test_patterns = tuple(accept_patterns)
2091    _ignore_test_patterns = tuple(ignore_patterns)
2092
2093    if accept_func is not None or ignore_func is not None:
2094        def match_function(test_id):
2095            accept = True
2096            ignore = False
2097            if accept_func:
2098                accept = accept_func(test_id)
2099            if ignore_func:
2100                ignore = ignore_func(test_id)
2101            return accept and not ignore
2102
2103        _match_test_func = match_function
2104
2105
2106def _compile_match_function(patterns):
2107    if not patterns:
2108        func = None
2109        # set_match_tests(None) behaves as set_match_tests(())
2110        patterns = ()
2111    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifiers.
2113        # The test.bisect_cmd utility only uses such full test identifiers.
2114        func = set(patterns).__contains__
2115    else:
2116        regex = '|'.join(map(fnmatch.translate, patterns))
2117        # The search *is* case sensitive on purpose:
2118        # don't use flags=re.IGNORECASE
2119        regex_match = re.compile(regex).match
2120
2121        def match_test_regex(test_id):
2122            if regex_match(test_id):
2123                # The regex matches the whole identifier, for example
2124                # 'test.test_os.FileTests.test_access'.
2125                return True
2126            else:
2127                # Try to match parts of the test identifier.
2128                # For example, split 'test.test_os.FileTests.test_access'
2129                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
2130                return any(map(regex_match, test_id.split(".")))
2131
2132        func = match_test_regex
2133
2134    return patterns, func
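
# Example (illustrative sketch; the test identifiers are hypothetical, shown
# as a comment only): how the two kinds of patterns behave.
#
#     set_match_tests(accept_patterns=['test.test_os.FileTests.test_access'])
#     # -> exact match against the full dotted identifier (no fnmatch characters)
#
#     set_match_tests(accept_patterns=['test_access*'],
#                     ignore_patterns=['*Windows*'])
#     # -> fnmatch-style matching against the identifier and its dotted parts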
2135
2136
2137def run_unittest(*classes):
2138    """Run tests from unittest.TestCase-derived classes."""
2139    valid_types = (unittest.TestSuite, unittest.TestCase)
2140    suite = unittest.TestSuite()
2141    for cls in classes:
2142        if isinstance(cls, str):
2143            if cls in sys.modules:
2144                suite.addTest(unittest.findTestCases(sys.modules[cls]))
2145            else:
2146                raise ValueError("str arguments must be keys in sys.modules")
2147        elif isinstance(cls, valid_types):
2148            suite.addTest(cls)
2149        else:
2150            suite.addTest(unittest.makeSuite(cls))
2151    _filter_suite(suite, match_test)
2152    _run_suite(suite)
2153
2154#=======================================================================
2155# Check for the presence of docstrings.
2156
2157# Rather than trying to enumerate all the cases where docstrings may be
2158# disabled, we just check for that directly
2159
2160def _check_docstrings():
2161    """Just used to check if docstrings are enabled"""
2162
2163MISSING_C_DOCSTRINGS = (check_impl_detail() and
2164                        sys.platform != 'win32' and
2165                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))
2166
2167HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
2168                   not MISSING_C_DOCSTRINGS)
2169
2170requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
2171                                          "test requires docstrings")
2172
2173
2174#=======================================================================
2175# doctest driver.
2176
2177def run_doctest(module, verbosity=None, optionflags=0):
2178    """Run doctest on the given module.  Return (#failures, #tests).
2179
2180    If optional argument verbosity is not specified (or is None), pass
2181    support's belief about verbosity on to doctest.  Else doctest's
2182    usual behavior is used (it searches sys.argv for -v).
2183    """
2184
2185    import doctest
2186
2187    if verbosity is None:
2188        verbosity = verbose
2189    else:
2190        verbosity = None
2191
2192    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
2193    if f:
2194        raise TestFailed("%d of %d doctests failed" % (f, t))
2195    if verbose:
2196        print('doctest (%s) ... %d tests with zero failures' %
2197              (module.__name__, t))
2198    return f, t
2199
2200
2201#=======================================================================
2202# Support for saving and restoring the imported modules.
2203
2204def print_warning(msg):
2205    # bpo-39983: Print into sys.__stderr__ to display the warning even
2206    # when sys.stderr is captured temporarily by a test
2207    for line in msg.splitlines():
2208        print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
2209
2210def modules_setup():
2211    return sys.modules.copy(),
2212
2213def modules_cleanup(oldmodules):
2214    # Encoders/decoders are registered permanently within the internal
2215    # codec cache. If we destroy the corresponding modules their
2216    # globals will be set to None which will trip up the cached functions.
2217    encodings = [(k, v) for k, v in sys.modules.items()
2218                 if k.startswith('encodings.')]
2219    sys.modules.clear()
2220    sys.modules.update(encodings)
2221    # XXX: This kind of problem can affect more than just encodings. In particular
2222    # extension modules (such as _ssl) don't cope with reloading properly.
2223    # Really, test modules should be cleaning out the test specific modules they
2224    # know they added (ala test_runpy) rather than relying on this function (as
2225    # test_importhooks and test_pkg do currently).
2226    # Implicitly imported *real* modules should be left alone (see issue 10556).
2227    sys.modules.update(oldmodules)
2228
2229#=======================================================================
2230# Threading support to prevent reporting refleaks when running regrtest.py -R
2231
2232# Flag used by saved_test_environment of test.libregrtest.save_env,
2233# to check if a test modified the environment. The flag should be set to False
2234# before running a new test.
2235#
# For example, threading_cleanup() sets the flag if it fails
# to clean up threads.
2238environment_altered = False
2239
2240# NOTE: we use thread._count() rather than threading.enumerate() (or the
2241# moral equivalent thereof) because a threading.Thread object is still alive
2242# until its __bootstrap() method has returned, even after it has been
2243# unregistered from the threading module.
2244# thread._count(), on the other hand, only gets decremented *after* the
2245# __bootstrap() method has returned, which gives us reliable reference counts
2246# at the end of a test run.
2247
2248def threading_setup():
2249    return _thread._count(), threading._dangling.copy()
2250
2251def threading_cleanup(*original_values):
2252    global environment_altered
2253
2254    _MAX_COUNT = 100
2255
2256    for count in range(_MAX_COUNT):
2257        values = _thread._count(), threading._dangling
2258        if values == original_values:
2259            break
2260
2261        if not count:
2262            # Display a warning at the first iteration
2263            environment_altered = True
2264            dangling_threads = values[1]
2265            print_warning(f"threading_cleanup() failed to cleanup "
2266                          f"{values[0] - original_values[0]} threads "
2267                          f"(count: {values[0]}, "
2268                          f"dangling: {len(dangling_threads)})")
2269            for thread in dangling_threads:
2270                print_warning(f"Dangling thread: {thread!r}")
2271
2272            # Don't hold references to threads
2273            dangling_threads = None
2274        values = None
2275
2276        time.sleep(0.01)
2277        gc_collect()
2278
2279
2280def reap_threads(func):
2281    """Use this function when threads are being used.  This will
2282    ensure that the threads are cleaned up even when the test fails.
2283    """
2284    @functools.wraps(func)
2285    def decorator(*args):
2286        key = threading_setup()
2287        try:
2288            return func(*args)
2289        finally:
2290            threading_cleanup(*key)
2291    return decorator
2292
2293
2294@contextlib.contextmanager
2295def wait_threads_exit(timeout=60.0):
2296    """
2297    bpo-31234: Context manager to wait until all threads created in the with
2298    statement exit.
2299
    Use _thread._count() to check if threads exited. Indirectly, wait until
    threads exit the internal t_bootstrap() C function of the _thread module.

    threading_setup() and threading_cleanup() are designed to emit a warning
    if a test leaves running threads in the background. This context manager
    is designed to clean up threads started by _thread.start_new_thread(),
    which doesn't provide a way to wait for thread exit, whereas
    threading.Thread has a join() method.
2308    """
2309    old_count = _thread._count()
2310    try:
2311        yield
2312    finally:
2313        start_time = time.monotonic()
2314        deadline = start_time + timeout
2315        while True:
2316            count = _thread._count()
2317            if count <= old_count:
2318                break
2319            if time.monotonic() > deadline:
2320                dt = time.monotonic() - start_time
2321                msg = (f"wait_threads() failed to cleanup {count - old_count} "
2322                       f"threads after {dt:.1f} seconds "
2323                       f"(count: {count}, old count: {old_count})")
2324                raise AssertionError(msg)
2325            time.sleep(0.010)
2326            gc_collect()
2327
2328
2329def join_thread(thread, timeout=30.0):
2330    """Join a thread. Raise an AssertionError if the thread is still alive
2331    after timeout seconds.
2332    """
2333    thread.join(timeout)
2334    if thread.is_alive():
2335        msg = f"failed to join the thread in {timeout:.1f} seconds"
2336        raise AssertionError(msg)
2337
2338
2339def reap_children():
2340    """Use this function at the end of test_main() whenever sub-processes
2341    are started.  This will help ensure that no extra children (zombies)
2342    stick around to hog resources and create problems when looking
2343    for refleaks.
2344    """
2345    global environment_altered
2346
2347    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
2348    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
2349        return
2350
2351    # Reap all our dead child processes so we don't leave zombies around.
2352    # These hog resources and might be causing some of the buildbots to die.
2353    while True:
2354        try:
2355            # Read the exit status of any child process which already completed
2356            pid, status = os.waitpid(-1, os.WNOHANG)
2357        except OSError:
2358            break
2359
2360        if pid == 0:
2361            break
2362
2363        print_warning(f"reap_children() reaped child process {pid}")
2364        environment_altered = True
2365
2366
2367@contextlib.contextmanager
2368def start_threads(threads, unlock=None):
2369    threads = list(threads)
2370    started = []
2371    try:
2372        try:
2373            for t in threads:
2374                t.start()
2375                started.append(t)
2376        except:
2377            if verbose:
2378                print("Can't start %d threads, only %d threads started" %
2379                      (len(threads), len(started)))
2380            raise
2381        yield
2382    finally:
2383        try:
2384            if unlock:
2385                unlock()
2386            endtime = starttime = time.monotonic()
2387            for timeout in range(1, 16):
2388                endtime += 60
2389                for t in started:
2390                    t.join(max(endtime - time.monotonic(), 0.01))
2391                started = [t for t in started if t.is_alive()]
2392                if not started:
2393                    break
2394                if verbose:
2395                    print('Unable to join %d threads during a period of '
2396                          '%d minutes' % (len(started), timeout))
2397        finally:
2398            started = [t for t in started if t.is_alive()]
2399            if started:
2400                faulthandler.dump_traceback(sys.stdout)
2401                raise AssertionError('Unable to join %d threads' % len(started))
2402
2403@contextlib.contextmanager
2404def swap_attr(obj, attr, new_val):
2405    """Temporary swap out an attribute with a new object.
2406
2407    Usage:
2408        with swap_attr(obj, "attr", 5):
2409            ...
2410
2411        This will set obj.attr to 5 for the duration of the with: block,
2412        restoring the old value at the end of the block. If `attr` doesn't
2413        exist on `obj`, it will be created and then deleted at the end of the
2414        block.
2415
2416        The old value (or None if it doesn't exist) will be assigned to the
2417        target of the "as" clause, if there is one.
2418    """
2419    if hasattr(obj, attr):
2420        real_val = getattr(obj, attr)
2421        setattr(obj, attr, new_val)
2422        try:
2423            yield real_val
2424        finally:
2425            setattr(obj, attr, real_val)
2426    else:
2427        setattr(obj, attr, new_val)
2428        try:
2429            yield
2430        finally:
2431            if hasattr(obj, attr):
2432                delattr(obj, attr)
2433
2434@contextlib.contextmanager
2435def swap_item(obj, item, new_val):
2436    """Temporary swap out an item with a new object.
2437
2438    Usage:
2439        with swap_item(obj, "item", 5):
2440            ...
2441
2442        This will set obj["item"] to 5 for the duration of the with: block,
2443        restoring the old value at the end of the block. If `item` doesn't
        exist in `obj`, it will be created and then deleted at the end of the
2445        block.
2446
2447        The old value (or None if it doesn't exist) will be assigned to the
2448        target of the "as" clause, if there is one.
2449    """
2450    if item in obj:
2451        real_val = obj[item]
2452        obj[item] = new_val
2453        try:
2454            yield real_val
2455        finally:
2456            obj[item] = real_val
2457    else:
2458        obj[item] = new_val
2459        try:
2460            yield
2461        finally:
2462            if item in obj:
2463                del obj[item]
2464
2465def strip_python_stderr(stderr):
2466    """Strip the stderr of a Python process from potential debug output
2467    emitted by the interpreter.
2468
2469    This will typically be run on the result of the communicate() method
2470    of a subprocess.Popen object.
2471    """
2472    stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
2473    return stderr
2474
2475requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
2476                        'types are immortal if COUNT_ALLOCS is defined')
2477
2478def args_from_interpreter_flags():
2479    """Return a list of command-line arguments reproducing the current
2480    settings in sys.flags and sys.warnoptions."""
2481    return subprocess._args_from_interpreter_flags()
2482
2483def optim_args_from_interpreter_flags():
2484    """Return a list of command-line arguments reproducing the current
2485    optimization settings in sys.flags."""
2486    return subprocess._optim_args_from_interpreter_flags()
2487
2488#============================================================
2489# Support for assertions about logging.
2490#============================================================
2491
2492class TestHandler(logging.handlers.BufferingHandler):
2493    def __init__(self, matcher):
2494        # BufferingHandler takes a "capacity" argument
2495        # so as to know when to flush. As we're overriding
2496        # shouldFlush anyway, we can set a capacity of zero.
2497        # You can call flush() manually to clear out the
2498        # buffer.
2499        logging.handlers.BufferingHandler.__init__(self, 0)
2500        self.matcher = matcher
2501
2502    def shouldFlush(self):
2503        return False
2504
2505    def emit(self, record):
2506        self.format(record)
2507        self.buffer.append(record.__dict__)
2508
2509    def matches(self, **kwargs):
2510        """
2511        Look for a saved dict whose keys/values match the supplied arguments.
2512        """
2513        result = False
2514        for d in self.buffer:
2515            if self.matcher.matches(d, **kwargs):
2516                result = True
2517                break
2518        return result
2519
2520class Matcher(object):
2521
2522    _partial_matches = ('msg', 'message')
2523
2524    def matches(self, d, **kwargs):
2525        """
2526        Try to match a single dict with the supplied arguments.
2527
2528        Keys whose values are strings and which are in self._partial_matches
2529        will be checked for partial (i.e. substring) matches. You can extend
2530        this scheme to (for example) do regular expression matching, etc.
2531        """
2532        result = True
2533        for k in kwargs:
2534            v = kwargs[k]
2535            dv = d.get(k)
2536            if not self.match_value(k, dv, v):
2537                result = False
2538                break
2539        return result
2540
2541    def match_value(self, k, dv, v):
2542        """
2543        Try to match a single stored value (dv) with a supplied value (v).
2544        """
2545        if type(v) != type(dv):
2546            result = False
2547        elif type(dv) is not str or k not in self._partial_matches:
2548            result = (v == dv)
2549        else:
2550            result = dv.find(v) >= 0
2551        return result
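
# Example (illustrative sketch; the logger name and assertion context are
# hypothetical, shown as a comment only): TestHandler and Matcher are meant to
# be used together to assert on emitted log records.
#
#     handler = TestHandler(Matcher())
#     logger = logging.getLogger('hypothetical.module')
#     logger.addHandler(handler)
#     logger.warning('disk low on %s', '/tmp')
#     self.assertTrue(handler.matches(levelname='WARNING', message='disk low'))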
2552
2553
2554_can_symlink = None
2555def can_symlink():
2556    global _can_symlink
2557    if _can_symlink is not None:
2558        return _can_symlink
2559    symlink_path = TESTFN + "can_symlink"
2560    try:
2561        os.symlink(TESTFN, symlink_path)
2562        can = True
2563    except (OSError, NotImplementedError, AttributeError):
2564        can = False
2565    else:
2566        os.remove(symlink_path)
2567    _can_symlink = can
2568    return can
2569
2570def skip_unless_symlink(test):
2571    """Skip decorator for tests that require functional symlink"""
2572    ok = can_symlink()
2573    msg = "Requires functional symlink implementation"
2574    return test if ok else unittest.skip(msg)(test)
2575
2576_buggy_ucrt = None
2577def skip_if_buggy_ucrt_strfptime(test):
2578    """
2579    Skip decorator for tests that use buggy strptime/strftime
2580
    If the UCRT bugs are present, time.localtime().tm_zone will be
    an empty string; otherwise we assume the UCRT bugs are fixed.
2583
2584    See bpo-37552 [Windows] strptime/strftime return invalid
2585    results with UCRT version 17763.615
2586    """
2587    global _buggy_ucrt
2588    if _buggy_ucrt is None:
        if (sys.platform == 'win32' and
                locale.getdefaultlocale()[1] == 'cp65001' and
                time.localtime().tm_zone == ''):
2592            _buggy_ucrt = True
2593        else:
2594            _buggy_ucrt = False
2595    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
2596
2597class PythonSymlink:
2598    """Creates a symlink for the current Python executable"""
2599    def __init__(self, link=None):
2600        self.link = link or os.path.abspath(TESTFN)
2601        self._linked = []
2602        self.real = os.path.realpath(sys.executable)
2603        self._also_link = []
2604
2605        self._env = None
2606
2607        self._platform_specific()
2608
2609    def _platform_specific(self):
2610        pass
2611
2612    if sys.platform == "win32":
2613        def _platform_specific(self):
2614            import _winapi
2615
2616            if os.path.lexists(self.real) and not os.path.exists(self.real):
2617                # App symlink appears to not exist, but we want the
2618                # real executable here anyway
2619                self.real = _winapi.GetModuleFileName(0)
2620
2621            dll = _winapi.GetModuleFileName(sys.dllhandle)
2622            src_dir = os.path.dirname(dll)
2623            dest_dir = os.path.dirname(self.link)
2624            self._also_link.append((
2625                dll,
2626                os.path.join(dest_dir, os.path.basename(dll))
2627            ))
2628            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
2629                self._also_link.append((
2630                    runtime,
2631                    os.path.join(dest_dir, os.path.basename(runtime))
2632                ))
2633
2634            self._env = {k.upper(): os.getenv(k) for k in os.environ}
2635            self._env["PYTHONHOME"] = os.path.dirname(self.real)
2636            if sysconfig.is_python_build(True):
2637                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)
2638
2639    def __enter__(self):
2640        os.symlink(self.real, self.link)
2641        self._linked.append(self.link)
2642        for real, link in self._also_link:
2643            os.symlink(real, link)
2644            self._linked.append(link)
2645        return self
2646
2647    def __exit__(self, exc_type, exc_value, exc_tb):
2648        for link in self._linked:
2649            try:
2650                os.remove(link)
2651            except IOError as ex:
2652                if verbose:
2653                    print("failed to clean up {}: {}".format(link, ex))
2654
2655    def _call(self, python, args, env, returncode):
2656        cmd = [python, *args]
2657        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
2658                             stderr=subprocess.PIPE, env=env)
2659        r = p.communicate()
2660        if p.returncode != returncode:
2661            if verbose:
2662                print(repr(r[0]))
2663                print(repr(r[1]), file=sys.stderr)
2664            raise RuntimeError(
2665                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
2666        return r
2667
2668    def call_real(self, *args, returncode=0):
2669        return self._call(self.real, args, None, returncode)
2670
2671    def call_link(self, *args, returncode=0):
2672        return self._call(self.link, args, self._env, returncode)
2673
2674
2675_can_xattr = None
2676def can_xattr():
2677    global _can_xattr
2678    if _can_xattr is not None:
2679        return _can_xattr
2680    if not hasattr(os, "setxattr"):
2681        can = False
2682    else:
2683        tmp_dir = tempfile.mkdtemp()
2684        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
2685        try:
2686            with open(TESTFN, "wb") as fp:
2687                try:
2688                    # TESTFN & tempfile may use different file systems with
2689                    # different capabilities
2690                    os.setxattr(tmp_fp, b"user.test", b"")
2691                    os.setxattr(tmp_name, b"trusted.foo", b"42")
2692                    os.setxattr(fp.fileno(), b"user.test", b"")
2693                    # Kernels < 2.6.39 don't respect setxattr flags.
2694                    kernel_version = platform.release()
2695                    m = re.match(r"2.6.(\d{1,2})", kernel_version)
2696                    can = m is None or int(m.group(1)) >= 39
2697                except OSError:
2698                    can = False
2699        finally:
2700            unlink(TESTFN)
2701            unlink(tmp_name)
2702            rmdir(tmp_dir)
2703    _can_xattr = can
2704    return can
2705
2706def skip_unless_xattr(test):
2707    """Skip decorator for tests that require functional extended attributes"""
2708    ok = can_xattr()
2709    msg = "no non-broken extended attribute support"
2710    return test if ok else unittest.skip(msg)(test)
2711
2712def skip_if_pgo_task(test):
2713    """Skip decorator for tests not run in (non-extended) PGO task"""
2714    ok = not PGO or PGO_EXTENDED
2715    msg = "Not run for (non-extended) PGO task"
2716    return test if ok else unittest.skip(msg)(test)
2717
2718_bind_nix_socket_error = None
2719def skip_unless_bind_unix_socket(test):
2720    """Decorator for tests requiring a functional bind() for unix sockets."""
2721    if not hasattr(socket, 'AF_UNIX'):
2722        return unittest.skip('No UNIX Sockets')(test)
2723    global _bind_nix_socket_error
2724    if _bind_nix_socket_error is None:
2725        path = TESTFN + "can_bind_unix_socket"
2726        with socket.socket(socket.AF_UNIX) as sock:
2727            try:
2728                sock.bind(path)
2729                _bind_nix_socket_error = False
2730            except OSError as e:
2731                _bind_nix_socket_error = e
2732            finally:
2733                unlink(path)
2734    if _bind_nix_socket_error:
2735        msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
2736        return unittest.skip(msg)(test)
2737    else:
2738        return test
2739
2740
2741def fs_is_case_insensitive(directory):
2742    """Detects if the file system for the specified directory is case-insensitive."""
2743    with tempfile.NamedTemporaryFile(dir=directory) as base:
2744        base_path = base.name
2745        case_path = base_path.upper()
2746        if case_path == base_path:
2747            case_path = base_path.lower()
2748        try:
2749            return os.path.samefile(base_path, case_path)
2750        except FileNotFoundError:
2751            return False
2752
2753
2754def detect_api_mismatch(ref_api, other_api, *, ignore=()):
2755    """Returns the set of items in ref_api not in other_api, except for a
2756    defined list of items to be ignored in this check.
2757
2758    By default this skips private attributes beginning with '_' but
2759    includes all magic methods, i.e. those starting and ending in '__'.
2760    """
2761    missing_items = set(dir(ref_api)) - set(dir(other_api))
2762    if ignore:
2763        missing_items -= set(ignore)
2764    missing_items = set(m for m in missing_items
2765                        if not m.startswith('_') or m.endswith('__'))
2766    return missing_items
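
# Example (illustrative sketch; the module names are hypothetical, shown as a
# comment only): comparing a pure-Python implementation against its C
# accelerator.
#
#     missing = detect_api_mismatch(py_impl, c_impl, ignore={'_private_helper'})
#     self.assertEqual(missing, set())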
2767
2768
2769def check__all__(test_case, module, name_of_module=None, extra=(),
2770                 blacklist=()):
2771    """Assert that the __all__ variable of 'module' contains all public names.
2772
2773    The module's public names (its API) are detected automatically based on
2774    whether they match the public name convention and were defined in
2775    'module'.
2776
2777    The 'name_of_module' argument can specify (as a string or tuple thereof)
2778    what module(s) an API could be defined in in order to be detected as a
2779    public API. One case for this is when 'module' imports part of its public
2780    API from other modules, possibly a C backend (like 'csv' and its '_csv').
2781
2782    The 'extra' argument can be a set of names that wouldn't otherwise be
2783    automatically detected as "public", like objects without a proper
2784    '__module__' attribute. If provided, it will be added to the
2785    automatically detected ones.
2786
2787    The 'blacklist' argument can be a set of names that must not be treated
2788    as part of the public API even though their names indicate otherwise.
2789
2790    Usage:
2791        import bar
2792        import foo
2793        import unittest
2794        from test import support
2795
2796        class MiscTestCase(unittest.TestCase):
2797            def test__all__(self):
2798                support.check__all__(self, foo)
2799
2800        class OtherTestCase(unittest.TestCase):
2801            def test__all__(self):
2802                extra = {'BAR_CONST', 'FOO_CONST'}
2803                blacklist = {'baz'}  # Undocumented name.
2804                # bar imports part of its API from _bar.
2805                support.check__all__(self, bar, ('bar', '_bar'),
2806                                     extra=extra, blacklist=blacklist)
2807
2808    """
2809
2810    if name_of_module is None:
2811        name_of_module = (module.__name__, )
2812    elif isinstance(name_of_module, str):
2813        name_of_module = (name_of_module, )
2814
2815    expected = set(extra)
2816
2817    for name in dir(module):
2818        if name.startswith('_') or name in blacklist:
2819            continue
2820        obj = getattr(module, name)
2821        if (getattr(obj, '__module__', None) in name_of_module or
2822                (not hasattr(obj, '__module__') and
2823                 not isinstance(obj, types.ModuleType))):
2824            expected.add(name)
2825    test_case.assertCountEqual(module.__all__, expected)
2826
2827
2828def suppress_msvcrt_asserts(verbose=False):
2829    try:
2830        import msvcrt
2831    except ImportError:
2832        return
2833
2834    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
2835                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
2836                        | msvcrt.SEM_NOGPFAULTERRORBOX
2837                        | msvcrt.SEM_NOOPENFILEERRORBOX)
2838
2839    # CrtSetReportMode() is only available in debug build
2840    if hasattr(msvcrt, 'CrtSetReportMode'):
2841        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
2842            if verbose:
2843                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
2844                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
2845            else:
2846                msvcrt.CrtSetReportMode(m, 0)
2847
2848
2849class SuppressCrashReport:
2850    """Try to prevent a crash report from popping up.
2851
2852    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
2853    disable the creation of coredump file.
2854    """
2855    old_value = None
2856    old_modes = None
2857
2858    def __enter__(self):
2859        """On Windows, disable Windows Error Reporting dialogs using
2860        SetErrorMode() and CrtSetReportMode().
2861
2862        On UNIX, try to save the previous core file size limit, then set
2863        soft limit to 0.
2864        """
2865        if sys.platform.startswith('win'):
2866            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
2867            # GetErrorMode is not available on Windows XP and Windows Server 2003,
2868            # but SetErrorMode returns the previous value, so we can use that
2869            try:
2870                import msvcrt
2871            except ImportError:
2872                return
2873
2874            self.old_value = msvcrt.SetErrorMode(msvcrt.SEM_NOGPFAULTERRORBOX)
2875
2876            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)
2877
2878            # bpo-23314: Suppress assert dialogs in debug builds.
2879            # CrtSetReportMode() is only available in debug build.
2880            if hasattr(msvcrt, 'CrtSetReportMode'):
2881                self.old_modes = {}
2882                for report_type in [msvcrt.CRT_WARN,
2883                                    msvcrt.CRT_ERROR,
2884                                    msvcrt.CRT_ASSERT]:
2885                    old_mode = msvcrt.CrtSetReportMode(report_type,
2886                            msvcrt.CRTDBG_MODE_FILE)
2887                    old_file = msvcrt.CrtSetReportFile(report_type,
2888                            msvcrt.CRTDBG_FILE_STDERR)
2889                    self.old_modes[report_type] = old_mode, old_file
2890
2891        else:
2892            if resource is not None:
2893                try:
2894                    self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
2895                    resource.setrlimit(resource.RLIMIT_CORE,
2896                                       (0, self.old_value[1]))
2897                except (ValueError, OSError):
2898                    pass
2899
2900            if sys.platform == 'darwin':
2901                # Check if the 'Crash Reporter' on OSX was configured
2902                # in 'Developer' mode and warn that it will get triggered
2903                # when it is.
2904                #
                # This assumes that this context manager is used in tests
                # that might trigger a crash report.
2907                cmd = ['/usr/bin/defaults', 'read',
2908                       'com.apple.CrashReporter', 'DialogType']
2909                proc = subprocess.Popen(cmd,
2910                                        stdout=subprocess.PIPE,
2911                                        stderr=subprocess.PIPE)
2912                with proc:
2913                    stdout = proc.communicate()[0]
2914                if stdout.strip() == b'developer':
2915                    print("this test triggers the Crash Reporter, "
2916                          "that is intentional", end='', flush=True)
2917
2918        return self
2919
2920    def __exit__(self, *ignore_exc):
2921        """Restore Windows ErrorMode or core file behavior to initial value."""
2922        if self.old_value is None:
2923            return
2924
2925        if sys.platform.startswith('win'):
2926            import msvcrt
2927            msvcrt.SetErrorMode(self.old_value)
2928
2929            if self.old_modes:
2930                for report_type, (old_mode, old_file) in self.old_modes.items():
2931                    msvcrt.CrtSetReportMode(report_type, old_mode)
2932                    msvcrt.CrtSetReportFile(report_type, old_file)
2933        else:
2934            if resource is not None:
2935                try:
2936                    resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
2937                except (ValueError, OSError):
2938                    pass
2939
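# Illustrative sketch (not part of the module API): SuppressCrashReport is
# meant to wrap code that crashes on purpose, so that no core file is dumped
# and no crash-report dialog pops up.  The child script below uses the
# private-but-real faulthandler._sigsegv() helper to trigger a segfault.
def _example_suppress_crash_report():
    crasher = (
        'from test import support\n'
        'import faulthandler\n'
        'with support.SuppressCrashReport():\n'
        '    faulthandler._sigsegv()\n'
    )
    return subprocess.run([sys.executable, '-c', crasher],
                          stderr=subprocess.DEVNULL)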
2940
2941def patch(test_instance, object_to_patch, attr_name, new_value):
2942    """Override 'object_to_patch'.'attr_name' with 'new_value'.
2943
    Also add a cleanup procedure to 'test_instance' to restore the original
    value of 'attr_name' on 'object_to_patch'.
    'attr_name' must be an existing attribute of 'object_to_patch'.

    """
2949    # check that 'attr_name' is a real attribute for 'object_to_patch'
2950    # will raise AttributeError if it does not exist
2951    getattr(object_to_patch, attr_name)
2952
2953    # keep a copy of the old value
2954    attr_is_local = False
2955    try:
2956        old_value = object_to_patch.__dict__[attr_name]
2957    except (AttributeError, KeyError):
2958        old_value = getattr(object_to_patch, attr_name, None)
2959    else:
2960        attr_is_local = True
2961
2962    # restore the value when the test is done
2963    def cleanup():
2964        if attr_is_local:
2965            setattr(object_to_patch, attr_name, old_value)
2966        else:
2967            delattr(object_to_patch, attr_name)
2968
2969    test_instance.addCleanup(cleanup)
2970
2971    # actually override the attribute
2972    setattr(object_to_patch, attr_name, new_value)
2973
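# Illustrative sketch (hypothetical test class, not part of the module API):
# patch() is meant to be called from a test method; the original attribute is
# restored by the cleanup registered on the test instance.
class _ExamplePatchTest(unittest.TestCase):
    def test_patched_getcwd(self):
        patch(self, os, 'getcwd', lambda: '/hypothetical')
        self.assertEqual(os.getcwd(), '/hypothetical')
        # addCleanup() puts the real os.getcwd back when the test finishes.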
2974
2975def run_in_subinterp(code):
2976    """
    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
    module is tracing memory allocations.
2979    """
2980    # Issue #10915, #15751: PyGILState_*() functions don't work with
2981    # sub-interpreters, the tracemalloc module uses these functions internally
2982    try:
2983        import tracemalloc
2984    except ImportError:
2985        pass
2986    else:
2987        if tracemalloc.is_tracing():
2988            raise unittest.SkipTest("run_in_subinterp() cannot be used "
2989                                     "if tracemalloc module is tracing "
2990                                     "memory allocations")
2991    import _testcapi
2992    return _testcapi.run_in_subinterp(code)
2993
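# Illustrative sketch (not part of the module API): run_in_subinterp() takes
# the code as a string and executes it in a fresh sub-interpreter with its own
# set of imported modules; it returns the result code from _testcapi.
def _example_run_in_subinterp():
    code = 'import sys; sys.stdout.write("hello from a sub-interpreter\\n")'
    return run_in_subinterp(code)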
2994
2995def check_free_after_iterating(test, iter, cls, args=()):
2996    class A(cls):
2997        def __del__(self):
2998            nonlocal done
2999            done = True
3000            try:
3001                next(it)
3002            except StopIteration:
3003                pass
3004
3005    done = False
3006    it = iter(A(*args))
3007    # Issue 26494: Shouldn't crash
3008    test.assertRaises(StopIteration, next, it)
3009    # The sequence should be deallocated just after the end of iterating
3010    gc_collect()
3011    test.assertTrue(done)
3012
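# Illustrative sketch (hypothetical test class): check_free_after_iterating()
# receives the test instance, an iterator factory and the sequence class under
# test; here it checks that an exhausted list iterator lets its list be freed
# immediately.
class _ExampleFreeAfterIteratingTest(unittest.TestCase):
    def test_list_freed_after_iterating(self):
        check_free_after_iterating(self, iter, list)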
3013
3014def missing_compiler_executable(cmd_names=[]):
3015    """Check if the compiler components used to build the interpreter exist.
3016
    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names', or of all compiler executables when 'cmd_names' is empty.
    Return the first missing executable, or None if none is missing.

    """
3023    from distutils import ccompiler, sysconfig, spawn
3024    compiler = ccompiler.new_compiler()
3025    sysconfig.customize_compiler(compiler)
3026    for name in compiler.executables:
3027        if cmd_names and name not in cmd_names:
3028            continue
3029        cmd = getattr(compiler, name)
3030        if cmd_names:
3031            assert cmd is not None, \
3032                    "the '%s' executable is not configured" % name
3033        elif not cmd:
3034            continue
3035        if spawn.find_executable(cmd[0]) is None:
3036            return cmd[0]
3037
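# Illustrative sketch: distutils-based build tests usually call this helper
# first and skip themselves when a compiler component is not installed.
def _example_require_compiler():
    cmd = missing_compiler_executable()
    if cmd is not None:
        raise unittest.SkipTest('the %r command is not found' % cmd)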
3038
3039_is_android_emulator = None
3040def setswitchinterval(interval):
    # Setting a very low GIL switch interval on the Android emulator causes
    # Python to hang (issue #26939).
3043    minimum_interval = 1e-5
3044    if is_android and interval < minimum_interval:
3045        global _is_android_emulator
3046        if _is_android_emulator is None:
3047            _is_android_emulator = (subprocess.check_output(
3048                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
3049        if _is_android_emulator:
3050            interval = minimum_interval
3051    return sys.setswitchinterval(interval)
3052
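# Illustrative sketch: thread-switching stress tests should call the wrapper
# above rather than sys.setswitchinterval() directly, so the Android emulator
# clamp is applied.  'run_test' is a hypothetical callable.
def _example_stress_switching(run_test):
    old = sys.getswitchinterval()
    setswitchinterval(1e-6)
    try:
        run_test()
    finally:
        sys.setswitchinterval(old)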
3053
3054@contextlib.contextmanager
3055def disable_faulthandler():
3056    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
3057    # sys.stderr with a StringIO which has no file descriptor when a test
3058    # is run with -W/--verbose3.
3059    fd = sys.__stderr__.fileno()
3060
3061    is_enabled = faulthandler.is_enabled()
3062    try:
3063        faulthandler.disable()
3064        yield
3065    finally:
3066        if is_enabled:
3067            faulthandler.enable(file=fd, all_threads=True)
3068
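# Illustrative sketch: disable_faulthandler() is useful around code that
# installs its own fault handlers or inspects faulthandler's state.
def _example_without_faulthandler():
    with disable_faulthandler():
        assert not faulthandler.is_enabled()
    # the previous faulthandler state is restored here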
3069
3070def fd_count():
3071    """Count the number of open file descriptors.
3072    """
3073    if sys.platform.startswith(('linux', 'freebsd')):
3074        try:
3075            names = os.listdir("/proc/self/fd")
3076            # Subtract one because listdir() internally opens a file
3077            # descriptor to list the content of the /proc/self/fd/ directory.
3078            return len(names) - 1
3079        except FileNotFoundError:
3080            pass
3081
3082    MAXFD = 256
3083    if hasattr(os, 'sysconf'):
3084        try:
3085            MAXFD = os.sysconf("SC_OPEN_MAX")
3086        except OSError:
3087            pass
3088
3089    old_modes = None
3090    if sys.platform == 'win32':
3091        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
3092        # on invalid file descriptor if Python is compiled in debug mode
3093        try:
3094            import msvcrt
3095            msvcrt.CrtSetReportMode
3096        except (AttributeError, ImportError):
3097            # no msvcrt or a release build
3098            pass
3099        else:
3100            old_modes = {}
3101            for report_type in (msvcrt.CRT_WARN,
3102                                msvcrt.CRT_ERROR,
3103                                msvcrt.CRT_ASSERT):
3104                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
3105
3106    try:
3107        count = 0
3108        for fd in range(MAXFD):
3109            try:
3110                # Prefer dup() over fstat(). fstat() can require input/output
3111                # whereas dup() doesn't.
3112                fd2 = os.dup(fd)
3113            except OSError as e:
3114                if e.errno != errno.EBADF:
3115                    raise
3116            else:
3117                os.close(fd2)
3118                count += 1
3119    finally:
3120        if old_modes is not None:
3121            for report_type in (msvcrt.CRT_WARN,
3122                                msvcrt.CRT_ERROR,
3123                                msvcrt.CRT_ASSERT):
3124                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
3125
3126    return count
3127
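# Illustrative sketch: fd_count() supports simple leak checks; snapshot the
# count before running the code under test and compare afterwards.  'func' is
# a hypothetical callable.
def _example_check_fd_leak(func):
    before = fd_count()
    func()
    leaked = fd_count() - before
    assert leaked == 0, f'{leaked} file descriptor(s) leaked'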
3128
3129class SaveSignals:
3130    """
3131    Save and restore signal handlers.
3132
3133    This class is only able to save/restore signal handlers registered
3134    by the Python signal module: see bpo-13285 for "external" signal
3135    handlers.
3136    """
3137
3138    def __init__(self):
3139        import signal
3140        self.signal = signal
3141        self.signals = signal.valid_signals()
3142        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
3143        for signame in ('SIGKILL', 'SIGSTOP'):
3144            try:
3145                signum = getattr(signal, signame)
3146            except AttributeError:
3147                continue
3148            self.signals.remove(signum)
3149        self.handlers = {}
3150
3151    def save(self):
3152        for signum in self.signals:
3153            handler = self.signal.getsignal(signum)
3154            if handler is None:
                # getsignal() returns None if the signal handler was not
                # registered by the Python signal module and is neither
                # SIG_DFL nor SIG_IGN.
3158                #
3159                # Ignore the signal: we cannot restore the handler.
3160                continue
3161            self.handlers[signum] = handler
3162
3163    def restore(self):
3164        for signum, handler in self.handlers.items():
3165            self.signal.signal(signum, handler)
3166
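# Illustrative sketch: the expected calling sequence for SaveSignals around a
# test that installs its own signal handlers.  'run_test' is hypothetical.
def _example_save_signals(run_test):
    saved = SaveSignals()
    saved.save()
    try:
        run_test()       # may call signal.signal() freely
    finally:
        saved.restore()  # Python-level handlers are put back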
3167
3168def with_pymalloc():
3169    import _testcapi
3170    return _testcapi.WITH_PYMALLOC
3171
3172
3173class FakePath:
    """Simple implementation of the path protocol.
3175    """
3176    def __init__(self, path):
3177        self.path = path
3178
3179    def __repr__(self):
3180        return f'<FakePath {self.path!r}>'
3181
3182    def __fspath__(self):
3183        if (isinstance(self.path, BaseException) or
3184            isinstance(self.path, type) and
3185                issubclass(self.path, BaseException)):
3186            raise self.path
3187        else:
3188            return self.path
3189
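# Illustrative sketch: FakePath wraps either a path value (returned from
# __fspath__()) or an exception (raised from __fspath__()), which makes it
# handy for testing os.fspath() handling.
def _example_fake_path():
    assert os.fspath(FakePath('/tmp/spam')) == '/tmp/spam'
    try:
        os.fspath(FakePath(ZeroDivisionError))
    except ZeroDivisionError:
        pass  # the wrapped exception propagates out of __fspath__()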
3190
3191class _ALWAYS_EQ:
3192    """
3193    Object that is equal to anything.
3194    """
3195    def __eq__(self, other):
3196        return True
3197    def __ne__(self, other):
3198        return False
3199
3200ALWAYS_EQ = _ALWAYS_EQ()
3201
3202@functools.total_ordering
3203class _LARGEST:
3204    """
3205    Object that is greater than anything (except itself).
3206    """
3207    def __eq__(self, other):
3208        return isinstance(other, _LARGEST)
3209    def __lt__(self, other):
3210        return False
3211
3212LARGEST = _LARGEST()
3213
3214@functools.total_ordering
3215class _SMALLEST:
3216    """
3217    Object that is less than anything (except itself).
3218    """
3219    def __eq__(self, other):
3220        return isinstance(other, _SMALLEST)
3221    def __gt__(self, other):
3222        return False
3223
3224SMALLEST = _SMALLEST()
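
# Illustrative sketch: ALWAYS_EQ, LARGEST and SMALLEST are sentinel operands
# for exercising comparison code paths without writing a custom class in
# every test.
def _example_comparison_sentinels():
    assert ALWAYS_EQ == 3.14 and ALWAYS_EQ == 'spam'
    assert LARGEST > sys.maxsize and not (LARGEST < 0)
    assert SMALLEST < -sys.maxsize and not (SMALLEST > 0)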
3225
3226def maybe_get_event_loop_policy():
3227    """Return the global event loop policy if one is set, else return None."""
3228    return asyncio.events._event_loop_policy
3229
3230# Helpers for testing hashing.
3231NHASHBITS = sys.hash_info.width # number of bits in hash() result
3232assert NHASHBITS in (32, 64)
3233
3234# Return mean and sdev of number of collisions when tossing nballs balls
3235# uniformly at random into nbins bins.  By definition, the number of
3236# collisions is the number of balls minus the number of occupied bins at
3237# the end.
3238def collision_stats(nbins, nballs):
3239    n, k = nbins, nballs
3240    # prob a bin empty after k trials = (1 - 1/n)**k
3241    # mean # empty is then n * (1 - 1/n)**k
3242    # so mean # occupied is n - n * (1 - 1/n)**k
3243    # so collisions = k - (n - n*(1 - 1/n)**k)
3244    #
3245    # For the variance:
3246    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
3247    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
3248    #
3249    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
3250    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
3251    # error-prone) efforts to rework the naive formulas to avoid those,
3252    # we use the `decimal` module to get plenty of extra precision.
3253    #
3254    # Note:  the exact values are straightforward to compute with
3255    # rationals, but in context that's unbearably slow, requiring
3256    # multi-million bit arithmetic.
3257    import decimal
3258    with decimal.localcontext() as ctx:
3259        bits = n.bit_length() * 2  # bits in n**2
3260        # At least that many bits will likely cancel out.
3261        # Use that many decimal digits instead.
3262        ctx.prec = max(bits, 30)
3263        dn = decimal.Decimal(n)
3264        p1empty = ((dn - 1) / dn) ** k
3265        meanempty = n * p1empty
3266        occupied = n - meanempty
3267        collisions = k - occupied
3268        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
3269        return float(collisions), float(var.sqrt())
3270
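# Illustrative sketch: collision_stats() provides the reference statistics a
# hash-quality test compares against.  Tossing 2**12 balls into 2**20 bins
# yields about 8 collisions on average; allowing a few standard deviations of
# slack gives a robust upper bound.
def _example_collision_bound(observed_collisions):
    mean, sdev = collision_stats(2**20, 2**12)
    return observed_collisions <= mean + 6 * sdev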
3271
3272class catch_unraisable_exception:
3273    """
    Context manager catching an unraisable exception using sys.unraisablehook.
3275
3276    Storing the exception value (cm.unraisable.exc_value) creates a reference
3277    cycle. The reference cycle is broken explicitly when the context manager
3278    exits.
3279
3280    Storing the object (cm.unraisable.object) can resurrect it if it is set to
3281    an object which is being finalized. Exiting the context manager clears the
3282    stored object.
3283
3284    Usage:
3285
3286        with support.catch_unraisable_exception() as cm:
3287            # code creating an "unraisable exception"
3288            ...
3289
3290            # check the unraisable exception: use cm.unraisable
3291            ...
3292
3293        # cm.unraisable attribute no longer exists at this point
3294        # (to break a reference cycle)
3295    """
3296
3297    def __init__(self):
3298        self.unraisable = None
3299        self._old_hook = None
3300
3301    def _hook(self, unraisable):
3302        # Storing unraisable.object can resurrect an object which is being
3303        # finalized. Storing unraisable.exc_value creates a reference cycle.
3304        self.unraisable = unraisable
3305
3306    def __enter__(self):
3307        self._old_hook = sys.unraisablehook
3308        sys.unraisablehook = self._hook
3309        return self
3310
3311    def __exit__(self, *exc_info):
3312        sys.unraisablehook = self._old_hook
3313        del self.unraisable
3314
3315
3316class catch_threading_exception:
3317    """
    Context manager catching a threading.Thread exception using
    threading.excepthook.
3320
    Attributes set when an exception is caught:
3322
3323    * exc_type
3324    * exc_value
3325    * exc_traceback
3326    * thread
3327
3328    See threading.excepthook() documentation for these attributes.
3329
3330    These attributes are deleted at the context manager exit.
3331
3332    Usage:
3333
3334        with support.catch_threading_exception() as cm:
3335            # code spawning a thread which raises an exception
3336            ...
3337
3338            # check the thread exception, use cm attributes:
3339            # exc_type, exc_value, exc_traceback, thread
3340            ...
3341
        # the exc_type, exc_value, exc_traceback and thread attributes of cm
        # no longer exist at this point
        # (to avoid reference cycles)
3345    """
3346
3347    def __init__(self):
3348        self.exc_type = None
3349        self.exc_value = None
3350        self.exc_traceback = None
3351        self.thread = None
3352        self._old_hook = None
3353
3354    def _hook(self, args):
3355        self.exc_type = args.exc_type
3356        self.exc_value = args.exc_value
3357        self.exc_traceback = args.exc_traceback
3358        self.thread = args.thread
3359
3360    def __enter__(self):
3361        self._old_hook = threading.excepthook
3362        threading.excepthook = self._hook
3363        return self
3364
3365    def __exit__(self, *exc_info):
3366        threading.excepthook = self._old_hook
3367        del self.exc_type
3368        del self.exc_value
3369        del self.exc_traceback
3370        del self.thread
3371
3372
3373@contextlib.contextmanager
3374def save_restore_warnings_filters():
3375    old_filters = warnings.filters[:]
3376    try:
3377        yield
3378    finally:
3379        warnings.filters[:] = old_filters
3380
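# Illustrative sketch: useful around code that calls warnings.simplefilter()
# or otherwise mutates the global filter list.
def _example_isolated_filters():
    with save_restore_warnings_filters():
        warnings.simplefilter('error')
        # code that must turn warnings into exceptions runs here ...
    # the original content of warnings.filters is back at this point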
3381
3382def skip_if_broken_multiprocessing_synchronize():
3383    """
3384    Skip tests if the multiprocessing.synchronize module is missing, if there
3385    is no available semaphore implementation, or if creating a lock raises an
3386    OSError (on Linux only).
3387    """
3388
3389    # Skip tests if the _multiprocessing extension is missing.
3390    import_module('_multiprocessing')
3391
3392    # Skip tests if there is no available semaphore implementation:
3393    # multiprocessing.synchronize requires _multiprocessing.SemLock.
3394    synchronize = import_module('multiprocessing.synchronize')
3395
3396    if sys.platform == "linux":
3397        try:
3398            # bpo-38377: On Linux, creating a semaphore fails with OSError
3399            # if the current user does not have the permission to create
3400            # a file in /dev/shm/ directory.
3401            synchronize.Lock(ctx=None)
3402        except OSError as exc:
3403            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
3404
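
# Illustrative sketch: a test module exercising multiprocessing locks would
# typically call the helper above at import time (for example from
# setUpModule) so the whole module is skipped cleanly.
def _example_setUpModule():
    skip_if_broken_multiprocessing_synchronize()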