1"""Supporting definitions for the Python regression tests."""
2
# Refuse to load under any module name other than 'test.support'; the module
# uses package-relative imports (e.g. ``from .testresult import ...``).
if __name__ != 'test.support':
    raise ImportError('support must be imported from the test package')
5
6import contextlib
7import functools
8import os
9import re
10import stat
11import sys
12import sysconfig
13import time
14import types
15import unittest
16import warnings
17
18from .testresult import get_test_runner
19
20
21try:
22    from _testcapi import unicode_legacy_string
23except ImportError:
24    unicode_legacy_string = None
25
# Public names exported by ``from test.support import *``, grouped by
# category.
__all__ = [
    # globals
    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
    # exceptions
    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
    # io
    "record_original_stdout", "get_original_stdout", "captured_stdout",
    "captured_stdin", "captured_stderr",
    # unittest
    "is_resource_enabled", "requires", "requires_freebsd_version",
    "requires_linux_version", "requires_mac_ver",
    "check_syntax_error",
    "BasicTestRunner", "run_unittest", "run_doctest",
    "requires_gzip", "requires_bz2", "requires_lzma",
    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
    "requires_IEEE_754", "requires_zlib",
    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
    "check__all__", "skip_if_buggy_ucrt_strfptime",
    "check_disallow_instantiation",
    # sys
    "is_jython", "is_android", "check_impl_detail", "unix_shell",
    "setswitchinterval",
    # network
    "open_urlresource",
    # processes
    "reap_children",
    # miscellaneous
    "run_with_locale", "swap_item", "findfile", "infinite_recursion",
    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
    "run_with_tz", "PGO", "missing_compiler_executable",
    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
    ]
59
60
61# Timeout in seconds for tests using a network server listening on the network
62# local loopback interface like 127.0.0.1.
63#
64# The timeout is long enough to prevent test failure: it takes into account
65# that the client and the server can run in different threads or even different
66# processes.
67#
68# The timeout should be long enough for connect(), recv() and send() methods
69# of socket.socket.
if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version:
    # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
    # seconds on Windows ARM32 buildbot
    LOOPBACK_TIMEOUT = 10
elif sys.platform == 'vxworks':
    LOOPBACK_TIMEOUT = 10
else:
    # Default for every other platform.
    LOOPBACK_TIMEOUT = 5.0
77
# Timeout in seconds for network requests going to the internet. The timeout is
# short enough to prevent a test from waiting too long if the internet request
# is blocked for whatever reason.
81#
82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
83# but skip the test instead: see transient_internet().
84INTERNET_TIMEOUT = 60.0
85
86# Timeout in seconds to mark a test as failed if the test takes "too long".
87#
88# The timeout value depends on the regrtest --timeout command line option.
89#
90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
91# LONG_TIMEOUT instead.
92SHORT_TIMEOUT = 30.0
93
94# Timeout in seconds to detect when a test hangs.
95#
96# It is long enough to reduce the risk of test failure on the slowest Python
97# buildbots. It should not be used to mark a test as failed if the test takes
98# "too long". The timeout value depends on the regrtest --timeout command line
99# option.
100LONG_TIMEOUT = 5 * 60.0
101
# Directory layout, derived from this file's location by walking up one
# directory level at a time:
#   TEST_SUPPORT_DIR - directory containing this module
#   TEST_HOME_DIR    - top level directory of the "test" package that
#                      contains Python's regression test suite
#   STDLIB_DIR       - parent directory of TEST_HOME_DIR
#   REPO_ROOT        - parent directory of STDLIB_DIR
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
REPO_ROOT = os.path.dirname(STDLIB_DIR)
108
109
class Error(Exception):
    """Base class for the exceptions raised by the regression test helpers."""
112
class TestFailed(Error):
    """Raised to report that a test failed."""
115
class TestFailedWithDetails(TestFailed):
    """Test failure that also carries the collected errors and failures.

    The message, errors and failures are available both as attributes
    (``msg``, ``errors``, ``failures``) and, in that order, through
    ``args``.  ``str()`` of the exception is the message alone.
    """

    def __init__(self, msg, errors, failures):
        super().__init__(msg, errors, failures)
        self.msg, self.errors, self.failures = msg, errors, failures

    def __str__(self):
        return self.msg
126
class TestDidNotRun(Error):
    """Raised to report that a test did not run any subtests."""
129
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """
137
def anticipate_failure(condition):
    """Return a decorator for a test known to be broken when *condition* holds.

       If *condition* is true the test is marked with
       unittest.expectedFailure; otherwise it is returned untouched.

       Any use of this decorator should have a comment identifying the
       associated tracker issue.
    """
    def _unchanged(test):
        return test

    return unittest.expectedFailure if condition else _unchanged
147
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discovers the tests below *pkg_dir* with *loader* (using STDLIB_DIR as
    the top level directory and "test*" when *pattern* is None), adds them
    to *standard_tests* and returns *standard_tests*.

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    discovered = loader.discover(
        start_dir=pkg_dir,
        top_level_dir=STDLIB_DIR,
        pattern="test*" if pattern is None else pattern,
    )
    standard_tests.addTests(discovered)
    return standard_tests
164
165
def get_attribute(obj, name):
    """Return getattr(obj, name), converting AttributeError into SkipTest."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
174
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
# Memory limit as requested through set_memlimit(), before it is capped to
# MAX_Py_ssize_t for max_memuse; 0 means no limit has been set.
real_max_memuse = 0
junit_xml_list = None    # list of testsuite XML elements
# NOTE(review): presumably toggled by the test runner to stop at the first
# failure -- it is not written anywhere in this part of the file; confirm.
failfast = False
182
183# _original_stdout is meant to hold stdout at the time regrtest began.
184# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
185# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout
190
def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
193
194
def _force_run(path, func, *args):
    """Call func(*args), retrying once after making *path* accessible.

    If the first call raises OSError, *path* is chmod'ed to S_IRWXU and the
    call is repeated; an error from the second attempt propagates.  With
    verbose >= 2 the error and the retry are printed.
    """
    try:
        return func(*args)
    except OSError as exc:
        if verbose >= 2:
            print('%s: %s' % (type(exc).__name__, exc))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)
204
205
206# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can actually be used, caching the answer.

    The outcome is stored on the function object itself: ``result`` holds
    the boolean answer and ``reason`` the explanation when the GUI is not
    available (None otherwise).  Later calls return the cached ``result``.

    On Windows, ctypes.WinError is raised if the window station cannot be
    queried.
    """
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    import platform
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, report the gui as unavailable (without
        # touching Tk) if the window manager connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            # Keep the recorded reason short: long messages are truncated
            # to their first 50 characters.
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    # Cache the outcome on the function object for subsequent calls.
    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result
283
def is_resource_enabled(resource):
    """Tell whether *resource* is enabled.

    Known resources are set by regrtest.py.  While use_resources is still
    None (not running under regrtest.py and nothing configured), every
    resource counts as enabled; otherwise *resource* must be listed in
    use_resources.
    """
    if use_resources is None:
        return True
    return resource in use_resources
291
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    *msg* overrides the default denial message for a resource that is not
    enabled.  The 'gui' resource is additionally checked for actual
    availability, and denied with the reason recorded by
    _is_gui_available().
    """
    if is_resource_enabled(resource):
        if resource == 'gui' and not _is_gui_available():
            raise ResourceDenied(_is_gui_available.reason)
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
300
def _requires_unix_version(sysname, min_version):
    """Return a decorator skipping the test if the OS is `sysname` and its
    version is less than `min_version`.

    The version is taken from platform.release(), up to the first '-'.  A
    release string that cannot be parsed as dotted integers never causes a
    skip.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    import platform
    version_txt = platform.release().split('-', 1)[0]
    min_version_txt = '.'.join(str(part) for part in min_version)

    skip = False
    if platform.system() == sysname:
        try:
            version = tuple(int(part) for part in version_txt.split('.'))
        except ValueError:
            pass
        else:
            skip = version < min_version

    reason = (f"{sysname} version {min_version_txt} or higher required, not "
              f"{version_txt}")
    return unittest.skipIf(skip, reason)
326
327
def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    less than `min_version`.

    The version components are passed as separate positional arguments and
    forwarded as a tuple to _requires_unix_version().

    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    version is less than 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)
336
def requires_linux_version(*min_version):
    """Decorator raising SkipTest if the OS is Linux and the Linux version is
    less than `min_version`.

    The version components are passed as separate positional arguments and
    forwarded as a tuple to _requires_unix_version().

    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    version is less than 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)
345
def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    The check happens when the decorated function is called.  A version
    string that cannot be parsed as dotted integers never causes a skip.
    The wrapper exposes the requested version as ``min_version``.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            version = None
            if sys.platform == 'darwin':
                import platform
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(int(part)
                                    for part in version_txt.split('.'))
                except ValueError:
                    version = None
            if version is not None and version < min_version:
                min_version_txt = '.'.join(map(str, min_version))
                raise unittest.SkipTest(
                    "Mac OS X %s or higher required, not %s"
                    % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
373
374
def skip_if_buildbot(reason=None):
    """Decorator raising SkipTest if running on a buildbot.

    A buildbot is recognised by the user name found in the environment:
    USERNAME == 'Buildbot' on win32, USER == 'buildbot' elsewhere.
    """
    reason = reason or 'not suitable for buildbots'
    if sys.platform == 'win32':
        variable, account = 'USERNAME', 'Buildbot'
    else:
        variable, account = 'USER', 'buildbot'
    return unittest.skipIf(os.environ.get(variable) == account, reason)
384
385
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The returned wrapper calls *f* and returns its result.  An OSError
    whose message contains "CERTIFICATE_VERIFY_FAILED" is converted into
    unittest.SkipTest; any other OSError propagates unchanged.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Hand the wrapped callable's result back to the caller instead
            # of silently dropping it.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec
398
399# A constant likely larger than the underlying OS pipe buffer size, to
400# make writes blocking.
401# Windows limit seems to be around 512 B, and many Unix kernels have a
402# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
403# (see issue #17835 for a discussion of this number).
404PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
405
406# A constant likely larger than the underlying OS socket buffer size, to make
407# writes blocking.
408# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
409# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
410# for a discussion of this number.
411SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
412
413# decorator for skipping tests on non-IEEE 754 platforms
414requires_IEEE_754 = unittest.skipUnless(
415    float.__getformat__("double").startswith("IEEE"),
416    "test requires IEEE 754 doubles")
417
def requires_zlib(reason='requires zlib'):
    """Return a decorator skipping the test unless zlib can be imported."""
    try:
        import zlib
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
424
def requires_gzip(reason='requires gzip'):
    """Return a decorator skipping the test unless gzip can be imported."""
    try:
        import gzip
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
431
def requires_bz2(reason='requires bz2'):
    """Return a decorator skipping the test unless bz2 can be imported."""
    try:
        import bz2
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
438
def requires_lzma(reason='requires lzma'):
    """Return a decorator skipping the test unless lzma can be imported."""
    try:
        import lzma
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
445
446def has_no_debug_ranges():
447    try:
448        import _testinternalcapi
449    except ImportError:
450        raise unittest.SkipTest("_testinternalcapi required")
451    config = _testinternalcapi.get_config()
452    return not bool(config['code_debug_ranges'])
453
def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
    """Return a decorator skipping the test when has_no_debug_ranges() is true.

    has_no_debug_ranges() is evaluated immediately, so a SkipTest it raises
    propagates from this call rather than from the decorated test.
    """
    return unittest.skipIf(has_no_debug_ranges(), reason)
456
457requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string,
458                        'requires legacy Unicode C API')
459
# True when sys.platform starts with 'java' (i.e. running on Jython).
is_jython = sys.platform.startswith('java')

# True when sys.getandroidapilevel exists; presumably only defined on
# Android builds -- confirm against the sys module documentation.
is_android = hasattr(sys, 'getandroidapilevel')

# Path of the shell to use for tests needing a Unix shell, or None on the
# platforms (win32, vxworks) for which no path is provided.
if sys.platform not in ('win32', 'vxworks'):
    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
else:
    unix_shell = None
468
469# Define the URL of a dedicated HTTP server for the network tests.
470# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
471TEST_HTTP_URL = "http://www.pythontest.net"
472
473# Set by libregrtest/main.py so we can skip tests that are not
474# useful for PGO
475PGO = False
476
477# Set by libregrtest/main.py if we are running the extended (time consuming)
478# PGO task.  If this is True, PGO is also True.
479PGO_EXTENDED = False
480
481# TEST_DATA_DIR is used as a target download location for remote resources
482TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
483
484
def darwin_malloc_err_warning(test_name):
    """Assure user that loud errors generated by macOS libc's malloc are
    expected.

    Prints a notice, framed to the terminal width, naming *test_name*.
    Does nothing on platforms other than darwin.
    """
    if sys.platform != 'darwin':
        return

    import shutil
    width = shutil.get_terminal_size().columns
    banner = ' NOTICE '.center(width, '-')
    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
              'warnings on macOS systems. This behavior is known. Do not\n'
              'report a bug unless tests are also failing. See bpo-40928.')

    print(banner)
    print(detail)
    print('-' * width)
501
502
def findfile(filename, subdir=None):
    """Locate *filename* in the test directory or on sys.path.

    An absolute *filename* is returned as is.  Otherwise TEST_HOME_DIR is
    searched first, then each sys.path entry, and the first existing path
    is returned.  If nothing is found the relative name is returned (this
    does not necessarily signal failure; it could still be the legitimate
    path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    relative = filename if subdir is None else os.path.join(subdir, filename)
    for directory in [TEST_HOME_DIR] + sys.path:
        candidate = os.path.join(directory, relative)
        if os.path.exists(candidate):
            return candidate
    return relative
520
521
def sortdict(dict):
    """Like repr(dict), but with the items in sorted order."""
    pairs = sorted(dict.items())
    return "{%s}" % ", ".join("%r: %r" % pair for pair in pairs)
528
def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert through *testcase* that compiling *statement* is a SyntaxError.

    The error message must match the regular expression *errtext*.  The
    error must carry a line number and an offset; when *lineno* or *offset*
    is given, the corresponding attribute must equal it.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as caught:
        compile(statement, '<test string>', 'exec')
    error = caught.exception
    # lineno is verified before offset, as two separate checks each.
    for attribute, expected in (('lineno', lineno), ('offset', offset)):
        actual = getattr(error, attribute)
        testcase.assertIsNotNone(actual)
        if expected is not None:
            testcase.assertEqual(actual, expected)
539
540
def open_urlresource(url, *args, **kw):
    """Open the local copy of the resource at *url*, fetching it if needed.

    The local file lives in TEST_DATA_DIR and is named after the last path
    component of *url*.  Extra positional and keyword arguments are passed
    to open(), except for the keyword *check*: an optional callable that
    receives the opened file and returns a true value if its content is
    valid.

    An existing local file that passes the check is returned right away; an
    invalid one is removed and downloaded again.  Downloading requires the
    'urlfetch' resource, so requires() may raise ResourceDenied.  TestFailed
    is raised if the freshly downloaded file does not pass the check.
    """
    import urllib.request, urllib.parse
    from .os_helper import unlink
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return the opened file if it is acceptable, else close it and
        # return None.
        # NOTE(review): if check() raises, the file is left open.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        elif check(f):
            f.seek(0)
            return f
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        # Only ask for a gzip-compressed response when it can be decoded.
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    f = opener.open(url, timeout=INTERNET_TIMEOUT)
    if gzip and f.headers.get('Content-Encoding') == 'gzip':
        f = gzip.GzipFile(fileobj=f)
    try:
        # Copy the response into the local file until read() returns
        # nothing more.
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
594
595
@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    The StringIO is the value bound by ``with ... as``; the previous
    stream is put back when the block exits, even on error."""
    import io
    previous = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield replacement
    finally:
        setattr(sys, stream_name, previous)
607
def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")

    Shorthand for captured_output("stdout").
    """
    return captured_output("stdout")
616
def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")

    Shorthand for captured_output("stderr").
    """
    return captured_output("stderr")
625
def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")

    Shorthand for captured_output("stdin").
    """
    return captured_output("stdin")
637
638
def gc_collect():
    """Force as many objects as possible to be collected.

    Runs three collections in total, with a short sleep after the first
    one on Jython.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    import gc
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()
655
@contextlib.contextmanager
def disable_gc():
    """Context manager running its block with the garbage collector off.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    import gc
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
666
667
def python_is_optimized():
    """Find if Python was built with optimizations.

    Looks at the last -O... flag in the PY_CFLAGS config variable; no such
    flag, -O0 and -Og all count as not optimized.
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    opt_flags = [flag for flag in cflags.split() if flag.startswith('-O')]
    final_opt = opt_flags[-1] if opt_flags else ''
    return final_opt not in ('', '-O0', '-Og')
676
677
# struct format fragments used by calcobjsize() and calcvobjsize():
# _header is prepended and _align appended to the caller's format.
_header = 'nP'
_align = '0n'
# When sys.getobjects exists the header gets two extra pointers ('2P');
# presumably this matches the object layout of such builds -- TODO confirm.
if hasattr(sys, "getobjects"):
    _header = '2P' + _header
    _align = '0P'
# Header variant with one additional 'n' field, used by calcvobjsize().
_vheader = _header + 'n'
684
def calcobjsize(fmt):
    """Return struct.calcsize() of *fmt* wrapped in _header and _align.

    Presumably the expected size of a fixed-size object whose own fields
    are described by *fmt* -- confirm against the callers.
    """
    import struct
    return struct.calcsize(_header + fmt + _align)
688
def calcvobjsize(fmt):
    """Return struct.calcsize() of *fmt* wrapped in _vheader and _align.

    Presumably the expected size of a variable-size object whose own fields
    are described by *fmt* -- confirm against the callers.
    """
    import struct
    return struct.calcsize(_vheader + fmt + _align)
692
693
694_TPFLAGS_HAVE_GC = 1<<14
695_TPFLAGS_HEAPTYPE = 1<<9
696
def check_sizeof(test, o, size):
    """Assert through *test* that sys.getsizeof(o) matches *size*.

    *size* excludes the GC header: SIZEOF_PYGC_HEAD is added to it when *o*
    is a type object with the heap-type flag set, or an instance whose type
    has the have-GC flag set.

    Raises unittest.SkipTest if _testinternalcapi cannot be imported.
    """
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    actual = sys.getsizeof(o)
    # add GC header size
    if type(o) == type:
        has_gc_head = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        has_gc_head = type(o).__flags__ & _TPFLAGS_HAVE_GC
    if has_gc_head:
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    message = 'wrong size for %s: got %d, expected %d' % (type(o), actual, size)
    test.assertEqual(actual, size, message)
710
711#=======================================================================
# Context manager (also usable as a decorator) for running code in a
# different locale, correctly resetting it afterwards.
714
@contextlib.contextmanager
def run_with_locale(catstr, *locales):
    """Run the enclosed code with the first usable locale from *locales*.

    *catstr* is the name of a locale category constant such as 'LC_ALL'.
    The locales are tried in order and the first one that can be set wins;
    if none can be set the block runs under the current locale.  The
    original locale is restored on exit.

    Raises AttributeError if *catstr* does not name an attribute of the
    locale module.  If the current locale cannot be determined for any
    other reason, the block runs without any locale change.
    """
    try:
        import locale
        category = getattr(locale, catstr)
        orig_locale = locale.setlocale(category)
    except AttributeError:
        # if the test author gives us an invalid category string
        raise
    except Exception:
        # cannot retrieve original locale, so do nothing
        # (Exception rather than a bare except, so that KeyboardInterrupt
        # and SystemExit are not swallowed)
        locale = orig_locale = None
    else:
        for loc in locales:
            try:
                locale.setlocale(category, loc)
                break
            except Exception:
                # this locale is not usable here; try the next one
                pass

    try:
        yield
    finally:
        if locale and orig_locale:
            locale.setlocale(category, orig_locale)
740
741#=======================================================================
742# Decorator for running a function in a specific timezone, correctly
743# resetting it afterwards.
744
def run_with_tz(tz):
    """Return a decorator running the wrapped function in timezone *tz*.

    The TZ environment variable is set to *tz* and time.tzset() is called
    before the function runs.  Afterwards TZ is restored to its previous
    value (or removed if it was unset) and tzset() is called again, whether
    or not the function raised.

    Calling the decorated function raises unittest.SkipTest on platforms
    without time.tzset.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() rather than del: the function may have removed
                    # TZ itself, which must not turn into a KeyError here.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator
773
774#=======================================================================
775# Big-memory-test support. Separate from 'resources' because memory use
776# should be configurable.
777
778# Some handy shorthands. Note that these are used for byte-limits as well
779# as size-limits, in the various bigmem tests
780_1M = 1024*1024
781_1G = 1024 * _1M
782_2G = 2 * _1G
783_4G = 4 * _1G
784
785MAX_Py_ssize_t = sys.maxsize
786
def set_memlimit(limit):
    """Set the memory limit used by the big-memory tests.

    *limit* is a string made of a number (optionally with a fractional
    part), a unit K, M, G or T in either case, and an optional trailing
    'b', e.g. '4G' or '2.5Gb'.

    On success real_max_memuse is set to the requested number of bytes and
    max_memuse to the same value capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or if the capped limit is
    below 2 GiB - 1; in both cases the two globals are left untouched.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the blank inside the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    capped = min(requested, MAX_Py_ssize_t)
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Publish both values only once the limit has been validated, so that a
    # rejected limit does not leave the module state half-updated.
    real_max_memuse = requested
    max_memuse = capped
807
class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The watching is done by a child process running memory_watchdog.py,
    which receives this process' /proc statm file as its standard input.
    """

    def __init__(self):
        self.procfile = f'/proc/{os.getpid()}/statm'
        self.started = False

    def start(self):
        """Launch the watchdog child; only warn if /proc is unavailable."""
        import warnings
        try:
            statm = open(self.procfile, 'r')
        except OSError as exc:
            warnings.warn('/proc not available for stats: {}'.format(exc),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        import subprocess
        with statm:
            script = findfile("memory_watchdog.py")
            command = [sys.executable, script]
            self.mem_watchdog = subprocess.Popen(command,
                                                 stdin=statm,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the watchdog child and wait for it, if it was started."""
        if not self.started:
            return
        self.mem_watchdog.terminate()
        self.mem_watchdog.wait()
839
840
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument. If 'dry_run' is true, the value passed to the test method
    may be less than the requested value. If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.

    The wrapper raises unittest.SkipTest when the configured memory limit is
    too small.  'size' and 'memuse' are kept as attributes of the wrapper
    and read back from there on every call.
    """
    def decorator(f):
        def wrapper(self):
            requested = wrapper.size
            per_unit = wrapper.memuse
            # Without a memory limit only a small dummy size is used.
            maxsize = requested if real_max_memuse else 5147

            must_fit = real_max_memuse or not dry_run
            if must_fit and real_max_memuse < maxsize * per_unit:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (requested * per_unit / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=requested * per_unit / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
888
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse has reached MAX_Py_ssize_t;
    otherwise the wrapper raises unittest.SkipTest.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
903
904#=======================================================================
905# unittest integration.
906
907class BasicTestRunner:
908    def run(self, test):
909        result = unittest.TestResult()
910        test(result)
911        return result
912
def _id(obj):
    """Return 'obj' unchanged (used as a do-nothing decorator)."""
    return obj
915
def requires_resource(resource):
    """Return a decorator that skips a test unless 'resource' is enabled.

    For the 'gui' resource the test is also skipped, with the reason stored
    on _is_gui_available, when no GUI is available.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
923
def cpython_only(test):
    """Decorator for tests only applicable on CPython."""
    cpython_guard = impl_detail(cpython=True)
    return cpython_guard(test)
929
def impl_detail(msg=None, **guards):
    """Return a decorator that skips tests failing check_impl_detail().

    'guards' are forwarded to check_impl_detail().  When the check passes the
    test is left untouched; otherwise it is skipped with 'msg', or with a
    message built from the guard names when 'msg' is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        platforms, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}" if default
                    else "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(platforms)))
    return unittest.skip(msg)
942
def _parse_guards(guards):
    """Return a tuple ({platform_name: run_me}, default_value).

    Without guards, the result selects CPython only.  Otherwise all guard
    values must be equal and the default is their negation.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    first = flags[0]
    assert flags == [first] * len(flags)   # all True or all False
    return (guards, not first)
950
951# Use the following check to guard CPython's implementation-specific tests --
952# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """Return True or False depending on the running implementation.

    The value registered in 'guards' for sys.implementation.name is
    returned, or the default computed by _parse_guards() when the running
    implementation is not listed.

       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    by_platform, fallback = _parse_guards(guards)
    current = sys.implementation.name
    return by_platform.get(current, fallback)
962
963
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    'func' is returned unchanged when sys has no gettrace().
    """
    if not hasattr(sys, 'gettrace'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            # Reinstall whatever trace function was active before the call.
            sys.settrace(saved_trace)
    return wrapper
978
979
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    The test is first restricted to CPython (see cpython_only()).  It is
    then wrapped with no_tracing() so that any trace function is unset while
    it runs, preventing unexpected refcounts caused by the trace function.
    """
    cpython_test = cpython_only(test)
    return no_tracing(cpython_test)
989
990
def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate.

    The suite is modified in place: nested suites are always kept (after
    being filtered themselves), other tests are kept only if pred(test) is
    true.
    """
    kept = []
    for item in suite._tests:
        if not isinstance(item, unittest.TestSuite):
            if pred(item):
                kept.append(item)
            continue
        _filter_suite(item, pred)
        kept.append(item)
    suite._tests = kept
1002
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestDidNotRun when nothing ran and nothing was skipped, and
    TestFailedWithDetails when the run was not successful.  When
    junit_xml_list is not None, the result's XML element is appended to it.
    """
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=(junit_xml_list is not None))

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not (result.testsRun or result.skipped):
        raise TestDidNotRun
    if result.wasSuccessful():
        return

    # Report the single traceback when there is exactly one problem,
    # otherwise a generic summary.
    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    errors = [(str(case), text) for case, text in result.errors]
    failures = [(str(case), text) for case, text in result.failures]
    raise TestFailedWithDetails(err, errors, failures)
1027
1028
# By default, don't filter tests
_match_test_func = None

# Patterns installed by the last set_match_tests() call (as tuples).
_accept_test_patterns = None
_ignore_test_patterns = None


def match_test(test):
    """Return True if 'test' is selected by the filter set by set_match_tests().

    Function used by support.run_unittest() and regrtest --list-cases.
    The filter is applied to test.id(); every test matches when no filter
    is installed.
    """
    if _match_test_func is None:
        return True
    else:
        return _match_test_func(test.id())


def _is_full_match_test(pattern):
    # If a pattern contains at least one dot, it's considered
    # as a full test identifier.
    # Example: 'test.test_os.FileTests.test_access'.
    #
    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
    # or '[!...]'. For example, ignore 'test_access*'.
    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def set_match_tests(accept_patterns=None, ignore_patterns=None):
    """Install the test filter used by match_test().

    A test identifier is selected when it matches 'accept_patterns' (or when
    there are no accept patterns) and does not match 'ignore_patterns'.
    None is treated as an empty sequence of patterns.  When both sequences
    are empty the filter is removed and every test matches again.
    """
    global _match_test_func, _accept_test_patterns, _ignore_test_patterns

    if accept_patterns is None:
        accept_patterns = ()
    if ignore_patterns is None:
        ignore_patterns = ()

    # Always rebuild both matchers.  Skipping the compilation of a pattern
    # set that did not change would leave its matcher as None and silently
    # drop that half of the filter.
    accept_patterns, accept_func = _compile_match_function(accept_patterns)
    ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)

    # Create a copy since patterns can be mutable and so modified later
    _accept_test_patterns = tuple(accept_patterns)
    _ignore_test_patterns = tuple(ignore_patterns)

    if accept_func is None and ignore_func is None:
        # No pattern at all: remove any previously installed filter.
        _match_test_func = None
        return

    def match_function(test_id):
        if accept_func is not None and not accept_func(test_id):
            return False
        if ignore_func is not None and ignore_func(test_id):
            return False
        return True

    _match_test_func = match_function


def _compile_match_function(patterns):
    """Return (patterns, func) where func(test_id) tells if test_id matches.

    func is None when 'patterns' is empty.
    """
    if not patterns:
        func = None
        # set_match_tests(None) behaves as set_match_tests(())
        patterns = ()
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        import fnmatch
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier, for example
                # 'test.test_os.FileTests.test_access'.
                return True
            else:
                # Try to match parts of the test identifier.
                # For example, split 'test.test_os.FileTests.test_access'
                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
                return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    return patterns, func
1117
1118
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase instance,
    or the name of a module present in sys.modules.  The collected tests are
    filtered with match_test() and then run.

    Raises ValueError for a str argument that is not a key of sys.modules.
    """
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = loader.loadTestsFromModule(sys.modules[item])
        elif isinstance(item, (unittest.TestSuite, unittest.TestCase)):
            tests = item
        else:
            tests = loader.loadTestsFromTestCase(item)
        suite.addTest(tests)
    _filter_suite(suite, match_test)
    _run_suite(suite)
1136
1137#=======================================================================
1138# Check for the presence of docstrings.
1139
1140# Rather than trying to enumerate all the cases where docstrings may be
1141# disabled, we just check for that directly
1142
def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True when check_impl_detail() passes, the platform is not win32 and the
# 'WITH_DOC_STRINGS' build configuration variable is unset or false.
MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

# True when _check_docstrings() above kept its docstring and
# MISSING_C_DOCSTRINGS is false.
HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

# Decorator skipping a test unless HAVE_DOCSTRINGS is true.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
1155
1156
1157#=======================================================================
1158# doctest driver.
1159
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if at least one doctest failed.
    """

    import doctest

    # Any explicit verbosity makes doctest fall back on its own default.
    doctest_verbose = verbose if verbosity is None else None

    failed, attempted = doctest.testmod(module, verbose=doctest_verbose,
                                        optionflags=optionflags)
    if failed:
        raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failed, attempted
1182
1183
1184#=======================================================================
1185# Support for saving and restoring the imported modules.
1186
def flush_std_streams():
    """Flush sys.stdout and sys.stderr, skipping a stream that is None."""
    for stream in (sys.stdout, sys.stderr):
        if stream is not None:
            stream.flush()
1192
1193
def print_warning(msg):
    """Write each line of 'msg', prefixed by "Warning -- ", to the stream
    stored in print_warning.orig_stderr, then flush that stream."""
    # bpo-45410: Explicitly flush stdout to keep logs in order
    flush_std_streams()
    out = print_warning.orig_stderr
    for text in msg.splitlines():
        print(f"Warning -- {text}", file=out)
    out.flush()

# bpo-39983: Store the original sys.stderr at Python startup to be able to
# log warnings even if sys.stderr is captured temporarily by a test.
print_warning.orig_stderr = sys.stderr
1205
1206
1207# Flag used by saved_test_environment of test.libregrtest.save_env,
1208# to check if a test modified the environment. The flag should be set to False
1209# before running a new test.
1210#
# For example, threading_helper.threading_cleanup() sets the flag if the function fails
# to clean up threads.
1213environment_altered = False
1214
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Each reaped child is reported with print_warning() and sets the
    environment_altered flag.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    if not hasattr(os, 'waitpid') or not hasattr(os, 'WNOHANG'):
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, _status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            return

        if pid == 0:
            return

        print_warning(f"reap_children() reaped child process {pid}")
        environment_altered = True
1241
1242
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    if not hasattr(obj, attr):
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            # The block may already have removed the attribute itself.
            if hasattr(obj, attr):
                delattr(obj, attr)
        return

    previous = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield previous
    finally:
        setattr(obj, attr, previous)
1273
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        This will set obj["item"] to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `item` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    if item not in obj:
        obj[item] = new_val
        try:
            yield
        finally:
            # The block may already have removed the item itself.
            if item in obj:
                del obj[item]
        return

    previous = obj[item]
    obj[item] = new_val
    try:
        yield previous
    finally:
        obj[item] = previous
1304
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions.

    Thin wrapper around subprocess._args_from_interpreter_flags().
    """
    from subprocess import _args_from_interpreter_flags as build_args
    return build_args()
1310
def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags.

    Thin wrapper around subprocess._optim_args_from_interpreter_flags().
    """
    from subprocess import _optim_args_from_interpreter_flags as build_args
    return build_args()
1316
1317
class Matcher(object):
    """Compare a dict against expected key/value pairs."""

    # Keys whose string values are compared by substring instead of equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), wanted)
                   for key, wanted in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).

        Values of different types never match.
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return dv.find(v) >= 0
        return v == dv
1350
1351
# Cached result of the UCRT check below (None until first computed).
_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
    """
    Skip decorator for tests that use buggy strptime/strftime

    If the UCRT bugs are present time.localtime().tm_zone will be
    an empty string, otherwise we assume the UCRT bugs are fixed

    See bpo-37552 [Windows] strptime/strftime return invalid
    results with UCRT version 17763.615
    """
    import locale
    global _buggy_ucrt
    if _buggy_ucrt is None:
        # Computed once; later calls reuse the cached answer.
        _buggy_ucrt = (sys.platform == 'win32' and
                       locale.getdefaultlocale()[1] == 'cp65001' and
                       time.localtime().tm_zone == '')
    if _buggy_ucrt:
        return unittest.skip("buggy MSVC UCRT strptime/strftime")(test)
    return test
1373
class PythonSymlink:
    """Creates a symlink for the current Python executable

    Used as a context manager: the links are created on entry and removed
    on exit.  call_real() and call_link() run the real executable and the
    symlink respectively.
    """
    def __init__(self, link=None):
        from .os_helper import TESTFN

        self.link = link or os.path.abspath(TESTFN)
        # Links created by __enter__(), removed by __exit__().
        self._linked = []
        self.real = os.path.realpath(sys.executable)
        # Extra (source, link) pairs to create next to the main link.
        self._also_link = []

        # Environment used by call_link() (None: inherit the current one).
        self._env = None

        self._platform_specific()

    def _platform_specific(self):
        pass

    if sys.platform == "win32":
        def _platform_specific(self):
            import glob
            import _winapi

            if os.path.lexists(self.real) and not os.path.exists(self.real):
                # App symlink appears to not exist, but we want the
                # real executable here anyway
                self.real = _winapi.GetModuleFileName(0)

            # The Python DLL and the vcruntime DLLs found next to it are
            # linked into the directory of the symlink as well.
            dll = _winapi.GetModuleFileName(sys.dllhandle)
            src_dir = os.path.dirname(dll)
            dest_dir = os.path.dirname(self.link)
            runtimes = glob.glob(
                os.path.join(glob.escape(src_dir), "vcruntime*.dll"))
            for path in [dll, *runtimes]:
                self._also_link.append(
                    (path, os.path.join(dest_dir, os.path.basename(path))))

            self._env = {name.upper(): os.getenv(name) for name in os.environ}
            self._env["PYTHONHOME"] = os.path.dirname(self.real)
            if sysconfig.is_python_build(True):
                self._env["PYTHONPATH"] = STDLIB_DIR

    def __enter__(self):
        for target, link in [(self.real, self.link), *self._also_link]:
            os.symlink(target, link)
            self._linked.append(link)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Best-effort cleanup: a failure is only reported in verbose mode.
        for link in self._linked:
            try:
                os.remove(link)
            except IOError as ex:
                if verbose:
                    print("failed to clean up {}: {}".format(link, ex))

    def _call(self, python, args, env, returncode):
        """Run 'python' with 'args' and return its (stdout, stderr) bytes.

        Raises RuntimeError if the exit code differs from 'returncode'.
        """
        import subprocess
        proc = subprocess.Popen([python, *args], stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, env=env)
        output = proc.communicate()
        if proc.returncode == returncode:
            return output
        if verbose:
            print(repr(output[0]))
            print(repr(output[1]), file=sys.stderr)
        raise RuntimeError(
            'unexpected return code: {0} (0x{0:08X})'.format(proc.returncode))

    def call_real(self, *args, returncode=0):
        return self._call(self.real, args, None, returncode)

    def call_link(self, *args, returncode=0):
        return self._call(self.link, args, self._env, returncode)
1454
1455
def skip_if_pgo_task(test):
    """Skip decorator for tests not run in (non-extended) PGO task"""
    if PGO and not PGO_EXTENDED:
        return unittest.skip("Not run for (non-extended) PGO task")(test)
    return test
1461
1462
def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Returns the set of items in ref_api not in other_api, except for a
    defined list of items to be ignored in this check.

    By default this skips private attributes beginning with '_' but
    includes all magic methods, i.e. those starting and ending in '__'.
    """
    missing = set(dir(ref_api)).difference(dir(other_api))
    if ignore:
        missing.difference_update(ignore)
    return {name for name in missing
            if not name.startswith('_') or name.endswith('__')}
1476
1477
def check__all__(test_case, module, name_of_module=None, extra=(),
                 not_exported=()):
    """Assert that the __all__ variable of 'module' contains all public names.

    The module's public names (its API) are detected automatically based on
    whether they match the public name convention and were defined in
    'module'.

    The 'name_of_module' argument can specify (as a string or tuple thereof)
    what module(s) an API could be defined in in order to be detected as a
    public API. One case for this is when 'module' imports part of its public
    API from other modules, possibly a C backend (like 'csv' and its '_csv').

    The 'extra' argument can be a set of names that wouldn't otherwise be
    automatically detected as "public", like objects without a proper
    '__module__' attribute. If provided, it will be added to the
    automatically detected ones.

    The 'not_exported' argument can be a set of names that must not be treated
    as part of the public API even though their names indicate otherwise.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                not_exported = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, not_exported=not_exported)

    """

    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    def belongs_to_api(obj):
        # Defined in one of the accepted modules ...
        if getattr(obj, '__module__', None) in name_of_module:
            return True
        # ... or has no '__module__' at all and is not a module itself.
        return (not hasattr(obj, '__module__') and
                not isinstance(obj, types.ModuleType))

    expected = set(extra)
    expected.update(
        name for name in dir(module)
        if not (name.startswith('_') or name in not_exported)
        and belongs_to_api(getattr(module, name)))
    test_case.assertCountEqual(module.__all__, expected)
1535
1536
def suppress_msvcrt_asserts(verbose=False):
    """Set the msvcrt error mode and, when available, the CRT report modes.

    Does nothing when the msvcrt module cannot be imported.  If 'verbose'
    is true, CRT reports are sent to stderr; otherwise their report mode is
    set to 0.
    """
    try:
        import msvcrt
    except ImportError:
        return

    error_mode = (msvcrt.SEM_FAILCRITICALERRORS
                  | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
                  | msvcrt.SEM_NOGPFAULTERRORBOX
                  | msvcrt.SEM_NOOPENFILEERRORBOX)
    msvcrt.SetErrorMode(error_mode)

    # CrtSetReportMode() is only available in debug build
    if not hasattr(msvcrt, 'CrtSetReportMode'):
        return
    for report_type in (msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT):
        if verbose:
            msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
            msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
        else:
            msvcrt.CrtSetReportMode(report_type, 0)
1556
1557
class SuppressCrashReport:
    """Try to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    disable the creation of coredump file.
    """
    # Saved error mode (Windows) or RLIMIT_CORE limits (UNIX); None until
    # __enter__() managed to save something.
    old_value = None
    # Saved CRT report modes/files per report type (Windows debug builds).
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode() and CrtSetReportMode().

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            try:
                import msvcrt
            except ImportError:
                return

            self.old_value = msvcrt.GetErrorMode()

            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)

            # bpo-23314: Suppress assert dialogs in debug builds.
            # CrtSetReportMode() is only available in debug build.
            if hasattr(msvcrt, 'CrtSetReportMode'):
                self.old_modes = {}
                for report_type in (msvcrt.CRT_WARN,
                                    msvcrt.CRT_ERROR,
                                    msvcrt.CRT_ASSERT):
                    previous_mode = msvcrt.CrtSetReportMode(
                        report_type, msvcrt.CRTDBG_MODE_FILE)
                    previous_file = msvcrt.CrtSetReportFile(
                        report_type, msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = previous_mode, previous_file

        else:
            try:
                import resource
            except ImportError:
                resource = None
            self.resource = resource
            if resource is not None:
                try:
                    self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
                    # Keep the hard limit, only drop the soft limit to 0.
                    resource.setrlimit(resource.RLIMIT_CORE,
                                       (0, self.old_value[1]))
                except (ValueError, OSError):
                    pass

            if sys.platform == 'darwin':
                import subprocess
                # Check if the 'Crash Reporter' on OSX was configured
                # in 'Developer' mode and warn that it will get triggered
                # when it is.
                #
                # This assumes that this context manager is used in tests
                # that might trigger the next manager.
                cmd = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
                with subprocess.Popen(cmd,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE) as proc:
                    stdout = proc.communicate()[0]
                if stdout.strip() == b'developer':
                    print("this test triggers the Crash Reporter, "
                          "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        saved = self.old_value
        if saved is None:
            # Nothing was saved by __enter__(): nothing to restore.
            return

        if not sys.platform.startswith('win'):
            if self.resource is not None:
                try:
                    self.resource.setrlimit(self.resource.RLIMIT_CORE, saved)
                except (ValueError, OSError):
                    pass
            return

        import msvcrt
        msvcrt.SetErrorMode(saved)

        if self.old_modes:
            for report_type, (old_mode, old_file) in self.old_modes.items():
                msvcrt.CrtSetReportMode(report_type, old_mode)
                msvcrt.CrtSetReportFile(report_type, old_file)
1652
1653
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' to restore
    'object_to_patch' value for 'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch'.

    On cleanup, an attribute that was stored in the object's own __dict__ is
    set back to its old value, while an attribute that was only inherited is
    deleted again.  For an object without a __dict__ (for example an
    instance of a class using __slots__) the old value is set back.

    Raises AttributeError if 'object_to_patch' has no 'attr_name' attribute.
    """
    # check that 'attr_name' is a real attribute for 'object_to_patch'
    # will raise AttributeError if it does not exist
    getattr(object_to_patch, attr_name)

    # keep a copy of the old value
    try:
        namespace = object_to_patch.__dict__
    except AttributeError:
        # No per-object namespace: the override cannot shadow anything, so
        # deleting the attribute on cleanup would lose the original value.
        old_value = getattr(object_to_patch, attr_name)
        restore_old_value = True
    else:
        try:
            # Read the raw entry (not getattr()) so that descriptors such
            # as staticmethod objects are restored unchanged.
            old_value = namespace[attr_name]
        except KeyError:
            # Inherited attribute: removing the override is enough.
            old_value = None
            restore_old_value = False
        else:
            restore_old_value = True

    # restore the value when the test is done
    def cleanup():
        if restore_old_value:
            setattr(object_to_patch, attr_name, old_value)
        else:
            delattr(object_to_patch, attr_name)

    test_instance.addCleanup(cleanup)

    # actually override the attribute
    setattr(object_to_patch, attr_name, new_value)
1686
1687
def run_in_subinterp(code):
    """
    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
    module is enabled.

    Returns the value returned by _testcapi.run_in_subinterp().
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        tracing = False
    else:
        tracing = tracemalloc.is_tracing()
    if tracing:
        raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                 "if tracemalloc module is tracing "
                                 "memory allocations")
    import _testcapi
    return _testcapi.run_in_subinterp(code)
1706
1707
def check_free_after_iterating(test, iter, cls, args=()):
    """Check that the iterated object is freed once iteration is over.

    An instance of a subclass of 'cls' (built with 'args') is passed to
    'iter'.  The resulting iterator must raise StopIteration on the first
    next() call, and the instance's __del__ must have run after gc_collect().
    Failures are reported through 'test'.
    """
    finalized = False

    class A(cls):
        def __del__(self):
            nonlocal finalized
            finalized = True
            # Using the exhausted iterator from the finalizer must be safe.
            try:
                next(iterator)
            except StopIteration:
                pass

    iterator = iter(A(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, iterator)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(finalized)
1725
1726
def missing_compiler_executable(cmd_names=()):
    """Check if the compiler components used to build the interpreter exist.

    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    and return the first missing executable or None when none is found
    missing.

    'cmd_names' may be any container of executable attribute names; it is
    only tested for truthiness and membership.  An AssertionError is raised
    when an explicitly requested executable is not configured (None) on the
    compiler object.  For the "msvc" compiler type, "msvc" is returned when
    the compiler fails to initialize.
    """
    # TODO (PEP 632): alternate check without using distutils
    from distutils import ccompiler, sysconfig, spawn, errors
    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    if compiler.compiler_type == "msvc":
        # MSVC has no executables, so check whether initialization succeeds
        try:
            compiler.initialize()
        except errors.DistutilsPlatformError:
            return "msvc"
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            # Raise explicitly instead of using an assert statement so the
            # check is still performed when Python runs with -O.
            if cmd is None:
                raise AssertionError(
                    "the '%s' executable is not configured" % name)
        elif not cmd:
            # Checking every executable: skip the ones that are not set.
            continue
        if spawn.find_executable(cmd[0]) is None:
            return cmd[0]
    return None
1757
1758
# Cached answer to "are we running on the Android emulator?"; None until
# setswitchinterval() has had to find out.
_is_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(), clamping *interval* on the Android
    emulator, and return its result.

    Setting a very low gil interval on the Android emulator causes python
    to hang (issue #26939), so values below 1e-5 are raised to 1e-5 there.
    """
    global _is_android_emulator
    floor = 1e-5
    if is_android and interval < floor:
        if _is_android_emulator is None:
            # Ask the system properties once and remember the answer.
            import subprocess
            qemu_flag = subprocess.check_output(['getprop', 'ro.kernel.qemu'])
            _is_android_emulator = (qemu_flag.strip() == b'1')
        if _is_android_emulator:
            interval = floor
    return sys.setswitchinterval(interval)
1773
1774
@contextlib.contextmanager
def disable_faulthandler():
    """Context manager disabling faulthandler for the duration of the block.

    On exit, faulthandler is enabled again (for all threads, writing to the
    file descriptor of sys.__stderr__) only if it was enabled on entry.
    """
    import faulthandler

    # regrtest swaps sys.stderr for a StringIO, which has no file
    # descriptor, when a test is run with -W/--verbose3: take the
    # descriptor from sys.__stderr__ instead.
    stderr_fd = sys.__stderr__.fileno()
    was_enabled = faulthandler.is_enabled()

    try:
        faulthandler.disable()
        yield
    finally:
        if was_enabled:
            faulthandler.enable(file=stderr_fd, all_threads=True)
1791
1792
class SaveSignals:
    """
    Snapshot signal handlers and put them back later.

    Only handlers registered through the Python signal module can be
    saved and restored: see bpo-13285 for "external" signal handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        self.signals = signal.valid_signals()
        # SIGKILL and SIGSTOP can be neither ignored nor caught, so they are
        # dropped from the set (when the platform defines them at all).
        for name in ('SIGKILL', 'SIGSTOP'):
            if hasattr(signal, name):
                self.signals.remove(getattr(signal, name))
        self.handlers = {}

    def save(self):
        """Record the current handler of every tracked signal."""
        getsignal = self.signal.getsignal
        for signum in self.signals:
            current = getsignal(signum)
            # getsignal() returns None if a signal handler was not
            # registered by the Python signal module, and the handler is
            # not SIG_DFL nor SIG_IGN.  Such a handler cannot be restored,
            # so the signal is left out.
            if current is not None:
                self.handlers[signum] = current

    def restore(self):
        """Reinstall every handler recorded by save()."""
        install = self.signal.signal
        for signum in self.handlers:
            install(signum, self.handlers[signum])
1830
1831
def with_pymalloc():
    """Return the WITH_PYMALLOC value exposed by the _testcapi module."""
    import _testcapi as testcapi
    return testcapi.WITH_PYMALLOC
1835
1836
class _ALWAYS_EQ:
    """
    Type of ALWAYS_EQ: its instances compare equal to every object.
    """

    def __eq__(self, other):
        # Whatever *other* is, report equality.
        return True

    def __ne__(self, other):
        # The exact opposite of __eq__: never unequal.
        return False


# Shared instance used by the tests.
ALWAYS_EQ = _ALWAYS_EQ()
1847
class _NEVER_EQ:
    """
    Type of NEVER_EQ: its instances compare unequal to every object.
    """

    def __eq__(self, other):
        # Whatever *other* is (even the instance itself), report inequality.
        return False

    def __ne__(self, other):
        # The exact opposite of __eq__: always unequal.
        return True

    def __hash__(self):
        # Fixed hash value, the same for every instance.
        return 1


# Shared instance used by the tests.
NEVER_EQ = _NEVER_EQ()
1860
@functools.total_ordering
class _LARGEST:
    """
    Type of LARGEST: greater than anything (except itself).
    """

    def __eq__(self, other):
        # Equal only to other _LARGEST instances.
        return isinstance(other, _LARGEST)

    def __lt__(self, other):
        # Never smaller than anything; total_ordering derives the remaining
        # comparisons from this method and __eq__.
        return False


# Shared instance used by the tests.
LARGEST = _LARGEST()
1872
@functools.total_ordering
class _SMALLEST:
    """
    Type of SMALLEST: less than anything (except itself).
    """

    def __eq__(self, other):
        # Equal only to other _SMALLEST instances.
        return isinstance(other, _SMALLEST)

    def __gt__(self, other):
        # Never greater than anything; total_ordering derives the remaining
        # comparisons from this method and __eq__.
        return False


# Shared instance used by the tests.
SMALLEST = _SMALLEST()
1884
def maybe_get_event_loop_policy():
    """Return the global event loop policy if one is set, else return None.

    The value is read straight from asyncio.events._event_loop_policy.
    """
    from asyncio import events
    return events._event_loop_policy
1889
# Helpers for testing hashing.

# Width in bits of a hash() result, as reported by sys.hash_info.
NHASHBITS = sys.hash_info.width
# Only 32-bit and 64-bit hash widths are expected.
assert NHASHBITS in {32, 64}
1893
def collision_stats(nbins, nballs):
    """Return (mean, sdev) of the number of collisions when tossing
    nballs balls uniformly at random into nbins bins.

    By definition, the number of collisions is the number of balls minus
    the number of occupied bins at the end.  Both results are floats.
    """
    n, k = nbins, nballs
    # Mean:
    #   P(a given bin is empty after k tosses) = (1 - 1/n)**k
    #   mean # empty    = n * (1 - 1/n)**k
    #   mean # occupied = n - n * (1 - 1/n)**k
    #   collisions      = k - (n - n*(1 - 1/n)**k)
    #
    # Variance:
    #   n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    #   n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Instead of reworking the naive
    # formulas (heroic and error-prone), the `decimal` module is used to
    # get plenty of extra precision.
    #
    # Note:  exact values are straightforward to compute with rationals,
    # but in context that's unbearably slow, requiring multi-million bit
    # arithmetic.
    import decimal
    with decimal.localcontext() as ctx:
        # n**2 has about this many bits, and at least that many will likely
        # cancel out; work with that many decimal digits instead.
        ctx.prec = max(n.bit_length() * 2, 30)
        dn = decimal.Decimal(n)
        meanempty = n * ((dn - 1) / dn) ** k
        collisions = k - (n - meanempty)
        pair_term = dn * (dn - 1) * ((dn - 2) / dn) ** k
        var = pair_term + meanempty * (1 - meanempty)
        sdev = var.sqrt()
    return float(collisions), float(sdev)
1930
1931
class catch_unraisable_exception:
    """
    Context manager that captures an unraisable exception by temporarily
    replacing sys.unraisablehook.

    Keeping the exception value (cm.unraisable.exc_value) creates a
    reference cycle, and keeping the object (cm.unraisable.object) can
    resurrect it if it is an object which is being finalized.  For both
    reasons the stored value is deleted when the context manager exits.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        # Last argument passed to the hook, or None if it was never called.
        self.unraisable = None
        # Hook that was installed before __enter__(), restored by __exit__().
        self._old_hook = None

    def _hook(self, unraisable):
        # Replacement for sys.unraisablehook: just remember the argument.
        # Storing unraisable.object can resurrect an object which is being
        # finalized, and storing unraisable.exc_value creates a reference
        # cycle; __exit__() takes care of both by dropping the attribute.
        self.unraisable = unraisable

    def __enter__(self):
        self._old_hook, sys.unraisablehook = sys.unraisablehook, self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        # Remove the attribute altogether rather than resetting it.
        del self.unraisable
1974
1975
def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait until process pid completes and check that the process exit code is
    exitcode.

    Raise an AssertionError if the process exit code is not equal to exitcode.

    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
    kill the process (if signal.SIGKILL is available) and raise an
    AssertionError. The timeout feature is not available on Windows: there,
    timeout is ignored and the call blocks until the process completes.
    """
    if os.name != "nt":
        import signal

        if timeout is None:
            timeout = SHORT_TIMEOUT
        start_time = time.monotonic()
        # Poll with an exponentially growing delay, capped at max_sleep.
        sleep = 0.001
        max_sleep = 0.1
        while True:
            pid2, status = os.waitpid(pid, os.WNOHANG)
            if pid2 != 0:
                break
            # process is still running

            dt = time.monotonic() - start_time
            # Compare against the effective timeout (the caller's value, or
            # SHORT_TIMEOUT when none was given).
            if dt > timeout:
                try:
                    os.kill(pid, signal.SIGKILL)
                    # Reap the killed child so that no zombie is left behind.
                    os.waitpid(pid, 0)
                except OSError:
                    # Ignore errors like ChildProcessError or PermissionError
                    pass

                raise AssertionError(f"process {pid} is still running "
                                     f"after {dt:.1f} seconds")

            sleep = min(sleep * 2, max_sleep)
            time.sleep(sleep)
    else:
        # Windows implementation
        pid2, status = os.waitpid(pid, 0)

    exitcode2 = os.waitstatus_to_exitcode(status)
    if exitcode2 != exitcode:
        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if pid2 != pid:
        raise AssertionError(f"pid {pid2} != pid {pid}")
2027
def skip_if_broken_multiprocessing_synchronize():
    """
    Skip tests when multiprocessing synchronization primitives are unusable.

    That is the case if the multiprocessing.synchronize module is missing,
    if there is no available semaphore implementation, or if creating a lock
    raises an OSError (checked on Linux only).
    """
    from .import_helper import import_module

    # Both imports go through import_module() so that a missing module skips
    # the test: first the _multiprocessing extension, then
    # multiprocessing.synchronize, which requires _multiprocessing.SemLock
    # and so needs an available semaphore implementation.
    import_module('_multiprocessing')
    synchronize = import_module('multiprocessing.synchronize')

    if sys.platform != "linux":
        return

    # bpo-38377: On Linux, creating a semaphore fails with OSError if the
    # current user does not have the permission to create a file in
    # /dev/shm/ directory.
    try:
        synchronize.Lock(ctx=None)
    except OSError as exc:
        raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
2051
2052
def check_disallow_instantiation(testcase, tp, *args, **kwds):
    """
    Assert, through *testcase*, that calling type *tp* with *args and **kwds
    raises a TypeError saying that its instances cannot be created.

    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
    """
    mod, name = tp.__module__, tp.__name__
    # Builtin types are reported without a module prefix.
    qualname = f"{name}" if mod == 'builtins' else f"{mod}.{name}"
    # The name is escaped because the message is matched as a regex.
    pattern = f"cannot create '{re.escape(qualname)}' instances"
    testcase.assertRaisesRegex(TypeError, pattern, tp, *args, **kwds)
2067
@contextlib.contextmanager
def infinite_recursion(max_depth=75):
    """Temporarily lower the recursion limit to *max_depth*.

    Meant for tests that interact with infinite recursions (e.g
    test_ast.ASTHelpers_Test.test_recursion_direct): on some debug windows
    builds, due to not enough functions being inlined, the stack size might
    not handle the default recursion limit (1000).  See bpo-11105 for
    details.  The previous limit is put back when the block exits.
    """
    saved_limit = sys.getrecursionlimit()
    try:
        sys.setrecursionlimit(max_depth)
        yield
    finally:
        sys.setrecursionlimit(saved_limit)
2082
def ignore_deprecations_from(module: str, *, like: str) -> object:
    """Install an "ignore" filter for DeprecationWarning and return a token.

    *module* and *like* are passed to warnings.filterwarnings() as the
    module and message patterns.  The returned token identifies the filter
    that was added.
    """
    token = object()
    # A regex comment carrying id(token) is appended to the message pattern
    # to tag the filter with the token.
    tag = fr"(?#support{id(token)})"
    warnings.filterwarnings(
        "ignore",
        message=like + tag,
        category=DeprecationWarning,
        module=module,
    )
    return token
2092
def clear_ignored_deprecations(*tokens: object) -> None:
    """Remove the warning filters tagged with the given *tokens*.

    A filter is removed when its action is "ignore", its category is
    DeprecationWarning and its message pattern ends with the
    "(?#support<id>)" tag of one of the tokens.  Raise ValueError when
    called without any token.
    """
    if not tokens:
        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")

    tags = tuple(rf"(?#support{id(token)})" for token in tokens)
    kept = []
    for action, message, category, module, lineno in warnings.filters:
        tagged = False
        if action == "ignore" and category is DeprecationWarning:
            # The message is either a compiled pattern or a plain (possibly
            # empty or missing) string.
            if isinstance(message, re.Pattern):
                pattern = message.pattern
            else:
                pattern = message or ""
            tagged = pattern.endswith(tags)
        if not tagged:
            kept.append((action, message, category, module, lineno))

    # Touch the live filter list only when something was actually removed.
    if warnings.filters != kept:
        warnings.filters[:] = kept
        warnings._filters_mutated()
2111