1# -*- coding: utf-8 -*-
2"""Supporting definitions for the Python regression tests.
3
4Backported for python-future from Python 3.3 test/support.py.
5"""
6
7from __future__ import (absolute_import, division,
8                        print_function, unicode_literals)
9from future import utils
10from future.builtins import str, range, open, int, map, list
11
12import contextlib
13import errno
14import functools
15import gc
16import socket
17import sys
18import os
19import platform
20import shutil
21import warnings
22import unittest
23# For Python 2.6 compatibility:
24if not hasattr(unittest, 'skip'):
25    import unittest2 as unittest
26
27import importlib
28# import collections.abc    # not present on Py2.7
29import re
30import subprocess
31import imp
32import time
33try:
34    import sysconfig
35except ImportError:
36    # sysconfig is not available on Python 2.6. Try using distutils.sysconfig instead:
37    from distutils import sysconfig
38import fnmatch
39import logging.handlers
40import struct
41import tempfile
42
43try:
44    if utils.PY3:
45        import _thread, threading
46    else:
47        import thread as _thread, threading
48except ImportError:
49    _thread = None
50    threading = None
51try:
52    import multiprocessing.process
53except ImportError:
54    multiprocessing = None
55
56try:
57    import zlib
58except ImportError:
59    zlib = None
60
61try:
62    import gzip
63except ImportError:
64    gzip = None
65
66try:
67    import bz2
68except ImportError:
69    bz2 = None
70
71try:
72    import lzma
73except ImportError:
74    lzma = None
75
# Names exported by ``from <this module> import *``.
# NOTE(review): "requires_IEEE_754" is listed below, but in the portion of
# this module reviewed its definition exists only as commented-out code --
# confirm it is defined before relying on a star-import.
__all__ = [
    "Error", "TestFailed", "ResourceDenied", "import_module", "verbose",
    "use_resources", "max_memuse", "record_original_stdout",
    "get_original_stdout", "unload", "unlink", "rmtree", "forget",
    "is_resource_enabled", "requires", "requires_freebsd_version",
    "requires_linux_version", "requires_mac_ver", "find_unused_port",
    "bind_port", "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD",
    "temp_cwd", "findfile", "create_empty_file", "sortdict",
    "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport",
    "EnvironmentVarGuard", "TransientResource", "captured_stdout",
    "captured_stdin", "captured_stderr", "time_out", "socket_peer_reset",
    "ioerror_peer_reset", "run_with_locale", 'temp_umask',
    "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest",
    "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
    "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail",
    "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
    "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
    "skip_unless_xattr", "import_fresh_module", "requires_zlib",
    "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz",
    "requires_gzip", "requires_bz2", "requires_lzma", "suppress_crash_popup",
    ]
97
class Error(Exception):
    """Root of the exception hierarchy used by the regression-test helpers."""
100
class TestFailed(Error):
    """Raised by support helpers to report that a test has failed."""
103
class ResourceDenied(unittest.SkipTest):
    """Skip raised when a test asks for a resource that is not allowed.

    requires() raises this when the requested resource has not been
    enabled.  Being a distinct SkipTest subclass, it lets callers tell
    expected skips apart from unexpected ones.
    """
111
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager silencing module/package DeprecationWarnings.

    While active (and only when *ignore* is true) an "ignore" filter is
    installed for DeprecationWarning messages matching
    ``.+ (module|package)``; the previous warning filters are restored on
    exit.  With a false *ignore* the manager does nothing at all.
    """
    if not ignore:
        # Nothing to suppress: simply run the managed body.
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
125
126
def import_module(name, deprecated=False):
    """Return the imported module *name*, or skip the test if unavailable.

    An ImportError raised by the import is converted into
    unittest.SkipTest carrying the same message.  When *deprecated* is
    true, module/package deprecation warnings emitted during the import
    are suppressed.
    """
    try:
        with _ignore_deprecated_imports(deprecated):
            return importlib.import_module(name)
    except ImportError as exc:
        raise unittest.SkipTest(str(exc))
138
139
def _save_and_remove_module(name, orig_modules):
    """Move *name* and its submodules from sys.modules into *orig_modules*.

    If *name* has not been imported yet it is imported once (so an
    unimportable module raises ImportError here) and its own entry is
    dropped again straight away.  Every remaining sys.modules entry equal
    to *name* or starting with ``name + '.'`` is then stored in
    *orig_modules* and removed from sys.modules.
    """
    if name not in sys.modules:
        # Import purely to prove the module is importable.
        __import__(name)
        del sys.modules[name]
    dotted_prefix = name + '.'
    for loaded in list(sys.modules):
        if loaded == name or loaded.startswith(dotted_prefix):
            orig_modules[loaded] = sys.modules.pop(loaded)
153
def _save_and_block_module(name, orig_modules):
    """Block imports of *name* by setting its sys.modules entry to None.

    An existing entry is first remembered in *orig_modules*.  Returns
    True when such an entry existed and False when there was none.
    """
    was_present = name in sys.modules
    if was_present:
        orig_modules[name] = sys.modules[name]
    # A None entry makes a later "import name" raise ImportError.
    sys.modules[name] = None
    return was_present
166
167
def anticipate_failure(condition):
    """Return a decorator for a test known to be broken when *condition* holds.

    A true *condition* yields unittest.expectedFailure; otherwise the
    returned decorator hands the test back unchanged.  Each use should be
    accompanied by a comment naming the associated tracker issue.
    """
    if not condition:
        return lambda f: f
    return unittest.expectedFailure
177
178
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a new copy of module *name*, bypassing sys.modules.

    The named module is removed from sys.modules before it is imported
    again, so (unlike reload) the previously imported module object is
    left untouched.

    *fresh* is an iterable of further module names that are also removed
    from the sys.modules cache before the import.

    *blocked* is an iterable of module names whose sys.modules entries
    are set to None for the duration of the import, so that importing
    them raises ImportError.

    Everything removed or blocked is saved first and put back into
    sys.modules once the import has finished; blocking entries created
    for modules that were not loaded beforehand are deleted again.

    If *deprecated* is true, module and package deprecation messages are
    suppressed during the import.

    Returns the freshly imported module, or None when an ImportError is
    raised while preparing the *fresh*/*blocked* modules or while
    performing the import itself.  An ImportError raised while initially
    removing *name* propagates to the caller.
    """
    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    # to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        saved_modules = {}      # entries to reinstate afterwards
        blocker_names = []      # None entries that must simply be deleted
        _save_and_remove_module(name, saved_modules)
        try:
            for extra_name in fresh:
                _save_and_remove_module(extra_name, saved_modules)
            for denied_name in blocked:
                had_entry = _save_and_block_module(denied_name, saved_modules)
                if not had_entry:
                    blocker_names.append(denied_name)
            module = importlib.import_module(name)
        except ImportError:
            module = None
        finally:
            sys.modules.update(saved_modules)
            for blocker in blocker_names:
                del sys.modules[blocker]
        return module
229
230
def get_attribute(obj, name):
    """Return ``getattr(obj, name)``; skip the test if it is missing.

    An AttributeError from the lookup is turned into unittest.SkipTest.
    """
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
239
# Module-level configuration flags.  Per the inline notes, regrtest.py
# overrides some of these after importing this module.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
# NOTE(review): the three flags below have no documentation here and are not
# read anywhere in the portion of the module reviewed -- confirm their
# meaning against the code that sets and consumes them.
real_max_memuse = 0
failfast = False
match_tests = None
247
# _original_stdout holds whatever stdout was in effect when regrtest started.
# That can be the genuine stdout, IDLE's stand-in for it, or anything else;
# what matters is that it is a stdout the user is actually able to see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream get_original_stdout() should return."""
    global _original_stdout
    _original_stdout = stdout
255
def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
258
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not there."""
    sys.modules.pop(name, None)
264
# Select the low-level removal primitives.  On Windows each removal is
# followed by a polling loop (_waitfor) that waits for the entry to
# disappear from its directory listing; elsewhere the plain os/shutil
# functions are used directly.
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call ``func(pathname)`` and wait for the removal to take effect.

        With *waitall* false, the parent directory of *pathname* is listed
        repeatedly until the final path component no longer appears in it.
        With *waitall* true, *pathname* itself is listed until it is empty.
        The delay between checks starts at 1 ms and doubles each time; once
        it reaches 1 second a RuntimeWarning is issued instead of waiting
        any longer.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            # waitall: done when the directory is empty; otherwise done when
            # the removed entry's name is gone from the listing.
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Remove *filename* with os.unlink and wait for it to disappear."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove *dirname* with os.rmdir and wait for it to disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete the directory tree rooted at *path*."""
        def _rmtree_inner(path):
            # Delete everything inside `path`, leaving `path` itself in place.
            for name in os.listdir(path):
                fullname = os.path.join(path, name)
                if os.path.isdir(fullname):
                    # Empty the subdirectory (waiting until its listing is
                    # empty) before removing the subdirectory itself.
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    os.rmdir(fullname)
                else:
                    os.unlink(fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(os.rmdir, path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir
    _rmtree = shutil.rmtree
320
def unlink(filename):
    """Remove *filename*, silently accepting that it may not exist.

    An OSError whose errno is ENOENT or ENOTDIR is swallowed; every other
    OSError is re-raised.
    """
    tolerated = (errno.ENOENT, errno.ENOTDIR)
    try:
        _unlink(filename)
    except OSError as exc:
        # The file is allowed to be absent already.
        if exc.errno in tolerated:
            return
        raise
328
def rmdir(dirname):
    """Remove the directory *dirname*, ignoring it if it does not exist.

    Only an OSError with errno ENOENT is swallowed; any other OSError is
    re-raised.
    """
    try:
        _rmdir(dirname)
    except OSError as exc:
        # The directory is allowed to be absent already.
        if exc.errno == errno.ENOENT:
            return
        raise
336
def rmtree(path):
    """Delete the tree rooted at *path*, ignoring it if it does not exist.

    Only an OSError with errno ENOENT is swallowed; any other OSError is
    re-raised.
    """
    try:
        _rmtree(path)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            return
        raise
343
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    The legacy file is placed next to the source file, i.e. in the
    directory of ``os.path.abspath(source)``.  Relative *source* paths that
    contain a directory component are handled correctly (the directory is
    no longer duplicated in the target path).

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
    :return: The absolute file system path to the legacy pyc file.
    :raises OSError: if the PEP 3147 file cannot be renamed (for example
        because it does not exist).
    """
    # Prefer importlib.util (Python 3.4+); fall back to the older ``imp``
    # module, which no longer exists on recent Python versions.
    try:
        from importlib.util import cache_from_source
    except ImportError:
        from imp import cache_from_source
    pyc_file = cache_from_source(source)
    source_dir = os.path.dirname(os.path.abspath(source))
    suffix = 'c' if __debug__ else 'o'
    # Join the directory with the *base name* only: joining it with the whole
    # of `source` would repeat any relative directory part of `source`.
    legacy_pyc = os.path.join(source_dir, os.path.basename(source) + suffix)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
359
def forget(modname):
    """Make it look as though *modname* had never been imported.

    The module is removed from sys.modules and, for every directory on
    sys.path, any legacy or PEP 3147 .pyc/.pyo file belonging to
    ``modname + '.py'`` is deleted.
    """
    unload(modname)
    for directory in sys.path:
        source = os.path.join(directory, modname + '.py')
        # Whether or not the files exist is irrelevant: try every legacy
        # and PEP 3147 pyc/pyo combination.
        for legacy_suffix in ('c', 'o'):
            unlink(source + legacy_suffix)
        for debug_flag in (True, False):
            unlink(imp.cache_from_source(source, debug_override=debug_flag))
375
# On some platforms GUI tests should not be run even when the 'gui' resource
# is allowed in `use_resources'.
if sys.platform.startswith('win'):
    import ctypes
    import ctypes.wintypes
    def _is_gui_available():
        """Return True if the process window station has WSF_VISIBLE set.

        The flags are read with the user32 functions
        GetProcessWindowStation and GetUserObjectInformationW; if either
        call reports failure, ctypes.WinError() is raised.
        """
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        # Structure filled in by GetUserObjectInformationW for UOI_FLAGS.
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        return bool(uof.dwFlags & WSF_VISIBLE)
else:
    def _is_gui_available():
        """Return True: no GUI availability check is made on this platform."""
        return True
405
def is_resource_enabled(resource):
    """Report whether *resource* is enabled.

    The set of known resources lives in ``use_resources``, which
    regrtest.py sets; while it is still None nothing counts as enabled.
    """
    if use_resources is None:
        return False
    return resource in use_resources
410
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    Returns None when the resource may be used.  If the caller's module
    is __main__ the resource is treated as enabled and the function
    returns without consulting the enabled-resource list.  For the 'gui'
    resource, unittest.SkipTest is raised when no GUI is available,
    whatever module the caller lives in.

    *msg* replaces the default ResourceDenied message when given.
    """
    if resource == 'gui' and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")
    # A caller running as __main__ is treated as though the resource had
    # been enabled.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
428
def _requires_unix_version(sysname, min_version):
    """Build a decorator that skips a test on too-old `sysname` releases.

    The wrapped function raises SkipTest when platform.system() equals
    `sysname` and the release (the part of platform.release() before the
    first '-') is lower than the tuple `min_version`.  If the release
    cannot be parsed as dot-separated integers, the test runs as usual.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if platform.system() == sysname:
                release_txt = platform.release().split('-', 1)[0]
                try:
                    release = tuple(map(int, release_txt.split('.')))
                except ValueError:
                    # Unparseable release string: do not skip.
                    release = None
                if release is not None and release < min_version:
                    required_txt = '.'.join(map(str, min_version))
                    raise unittest.SkipTest(
                        "%s version %s or higher required, not %s"
                        % (sysname, required_txt, release_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
455
def requires_freebsd_version(*min_version):
    """Return a decorator skipping the test on FreeBSD older than `min_version`.

    The version components are passed as separate arguments: for example,
    @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD version
    is less than 7.2.
    """
    decorator = _requires_unix_version('FreeBSD', min_version)
    return decorator
464
def requires_linux_version(*min_version):
    """Return a decorator skipping the test on Linux older than `min_version`.

    The version components are passed as separate arguments: for example,
    @requires_linux_version(2, 6, 32) raises SkipTest if the Linux version
    is less than 2.6.32.
    """
    decorator = _requires_unix_version('Linux', min_version)
    return decorator
473
def requires_mac_ver(*min_version):
    """Return a decorator skipping the test on Mac OS X older than min_version.

    The check only applies when sys.platform is 'darwin'.  If the version
    reported by platform.mac_ver() cannot be parsed as dot-separated
    integers, the test runs as usual.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is lower than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                found_txt = platform.mac_ver()[0]
                try:
                    found = tuple(map(int, found_txt.split('.')))
                except ValueError:
                    # Unparseable version string: do not skip.
                    found = None
                if found is not None and found < min_version:
                    required_txt = '.'.join(map(str, min_version))
                    raise unittest.SkipTest(
                        "Mac OS X %s or higher required, not %s"
                        % (required_txt, found_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
500
# Don't use "localhost", since resolving it uses the DNS under recent
# Windows versions (see issue #18792).
# HOST is the numeric IPv4 loopback address.
HOST = "127.0.0.1"
# HOSTv6 is the numeric IPv6 loopback address.
HOSTv6 = "::1"
505
506
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given *family* and
    *socktype* (default is AF_INET, SOCK_STREAM), and binding it via
    bind_port() to that function's default host address with the port set
    to 0, eliciting an unused ephemeral port from the OS.  The temporary
    socket is then closed (even if binding fails) and the ephemeral port is
    returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it.
    """

    tempsock = socket.socket(family, socktype)
    try:
        # bind_port() may raise (e.g. TestFailed or a socket error); the
        # temporary socket must not be leaked in that case.
        return bind_port(tempsock)
    finally:
        tempsock.close()
568
def bind_port(sock, host=HOST):
    """Bind *sock* to a free port on *host* and return the port number.

    The port is an ephemeral one chosen by the OS (port 0 is requested),
    which guarantees that it was unbound.  This matters because many
    tests may run at the same time, especially in a buildbot environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised if the
    socket has SO_REUSEADDR or SO_REUSEPORT set: tests should *never* set
    these options on TCP/IP sockets.  The only legitimate use for them is
    testing multicasting through several UDP sockets.

    Also for such sockets, when the SO_EXCLUSIVEADDRUSE option exists
    (i.e. on Windows) it is set on the socket, which stops anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET
                     and sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        if hasattr(socket, 'SO_REUSEADDR'):
            reuse_addr = sock.getsockopt(socket.SOL_SOCKET,
                                         socket.SO_REUSEADDR)
            if reuse_addr == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                reuse_port = sock.getsockopt(socket.SOL_SOCKET,
                                             socket.SO_REUSEPORT)
            except socket.error:
                # Python's socket module was built against modern headers
                # (so SO_REUSEPORT is defined) but the running kernel is
                # older and does not support the option.
                reuse_port = None
            if reuse_port == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
605
def _is_ipv6_enabled():
    """Return True if an IPv6 stream socket can be bound to ``::1``.

    False is returned when the socket module reports no IPv6 support or
    when creating/binding the probe socket fails.  The probe socket is
    always closed again.
    """
    if not socket.has_ipv6:
        return False
    probe = None
    try:
        probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        probe.bind(('::1', 0))
    except (socket.error, socket.gaierror):
        return False
    else:
        return True
    finally:
        if probe is not None:
            probe.close()

# Evaluated once, when the module is imported.
IPV6_ENABLED = _is_ipv6_enabled()
622
623
# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
# for a discussion of this number.
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
637
638# # decorator for skipping tests on non-IEEE 754 platforms
639# requires_IEEE_754 = unittest.skipUnless(
640#     float.__getformat__("double").startswith("IEEE"),
641#     "test requires IEEE 754 doubles")
642
# Skip decorators for tests needing an optional compression module.  Each
# module name is bound to None at import time when the module is missing,
# which makes the corresponding decorator skip the test.
requires_zlib = unittest.skipUnless(zlib, 'requires zlib')

# "requires_gzip" is exported through __all__ and therefore has to exist.
requires_gzip = unittest.skipUnless(gzip, 'requires gzip')

requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')

requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
648
is_jython = sys.platform.startswith('java')

# Filename used for testing.  Jython disallows @ in module names, so a
# different leading character is used there.
TESTFN = '$test' if os.name == 'java' else '@test'

# Make TESTFN unique per process so tests can run in parallel, while keeping
# it a valid module name.
TESTFN = "{0}_{1}_tmp".format(TESTFN, os.getpid())
661
662# # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
663# # or None if there is no such character.
664# FS_NONASCII = None
665# for character in (
666#     # First try printable and common characters to have a readable filename.
667#     # For each character, the encoding list are just example of encodings able
668#     # to encode the character (the list is not exhaustive).
669#
670#     # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
671#     '\u00E6',
672#     # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
673#     '\u0130',
674#     # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
675#     '\u0141',
676#     # U+03C6 (Greek Small Letter Phi): cp1253
677#     '\u03C6',
678#     # U+041A (Cyrillic Capital Letter Ka): cp1251
679#     '\u041A',
680#     # U+05D0 (Hebrew Letter Alef): Encodable to cp424
681#     '\u05D0',
682#     # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
683#     '\u060C',
684#     # U+062A (Arabic Letter Teh): cp720
685#     '\u062A',
686#     # U+0E01 (Thai Character Ko Kai): cp874
687#     '\u0E01',
688#
689#     # Then try more "special" characters. "special" because they may be
690#     # interpreted or displayed differently depending on the exact locale
691#     # encoding and the font.
692#
693#     # U+00A0 (No-Break Space)
694#     '\u00A0',
695#     # U+20AC (Euro Sign)
696#     '\u20AC',
697# ):
698#     try:
699#         os.fsdecode(os.fsencode(character))
700#     except UnicodeError:
701#         pass
702#     else:
703#         FS_NONASCII = character
704#         break
705#
706# # TESTFN_UNICODE is a non-ascii filename
707# TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
708# if sys.platform == 'darwin':
709#     # In Mac OS X's VFS API file names are, by definition, canonically
710#     # decomposed Unicode, encoded using UTF-8. See QA1173:
711#     # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
712#     import unicodedata
713#     TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
714# TESTFN_ENCODING = sys.getfilesystemencoding()
715#
716# # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
717# # encoded by the filesystem encoding (in strict mode). It can be None if we
718# # cannot generate such filename.
719# TESTFN_UNENCODABLE = None
720# if os.name in ('nt', 'ce'):
721#     # skip win32s (0) or Windows 9x/ME (1)
722#     if sys.getwindowsversion().platform >= 2:
723#         # Different kinds of characters from various languages to minimize the
724#         # probability that the whole name is encodable to MBCS (issue #9819)
725#         TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
726#         try:
727#             TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
728#         except UnicodeEncodeError:
729#             pass
730#         else:
731#             print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
732#                   'Unicode filename tests may not be effective'
733#                   % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
734#             TESTFN_UNENCODABLE = None
735# # Mac OS X denies unencodable filenames (invalid utf-8)
736# elif sys.platform != 'darwin':
737#     try:
738#         # ascii and utf-8 cannot encode the byte 0xff
739#         b'\xff'.decode(TESTFN_ENCODING)
740#     except UnicodeDecodeError:
741#         # 0xff will be encoded using the surrogate character u+DCFF
742#         TESTFN_UNENCODABLE = TESTFN \
743#             + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
744#     else:
745#         # File system encoding (eg. ISO-8859-* encodings) can encode
746#         # the byte 0xff. Skip some unicode filename tests.
747#         pass
748#
749# # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
750# # decoded from the filesystem encoding (in strict mode). It can be None if we
751# # cannot generate such filename (ex: the latin1 encoding can decode any byte
752# # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
753# # to the surrogateescape error handler (PEP 383), but not from the filesystem
754# # encoding in strict mode.
755# TESTFN_UNDECODABLE = None
756# for name in (
757#     # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
758#     # accepts it to create a file or a directory, or don't accept to enter to
759#     # such directory (when the bytes name is used). So test b'\xe7' first: it is
760#     # not decodable from cp932.
761#     b'\xe7w\xf0',
762#     # undecodable from ASCII, UTF-8
763#     b'\xff',
764#     # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
765#     # and cp857
766#     b'\xae\xd5'
767#     # undecodable from UTF-8 (UNIX and Mac OS X)
768#     b'\xed\xb2\x80', b'\xed\xb4\x80',
769#     # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
770#     # cp1253, cp1254, cp1255, cp1257, cp1258
771#     b'\x81\x98',
772# ):
773#     try:
774#         name.decode(TESTFN_ENCODING)
775#     except UnicodeDecodeError:
776#         TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
777#         break
778#
779# if FS_NONASCII:
780#     TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
781# else:
782#     TESTFN_NONASCII = None
783
# Save the initial cwd: the value of os.getcwd() captured once, at the
# moment this module is imported.
SAVEDCWD = os.getcwd()
786
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False, path=None):
    """
    Context manager that temporarily switches the working directory.

    When *path* is given it must already exist; the file system is left
    untouched and the process simply chdir()s into it.

    When *path* is None, a directory called *name* is created relative to
    the current directory, entered, and removed again on exit.

    With *quiet* false (the default), a failure to create or enter the
    directory raises OSError.  With *quiet* true, a RuntimeWarning is
    issued instead and the body runs in whatever the CWD currently is.

    The context manager yields the working directory that is in effect.
    """
    original_cwd = os.getcwd()
    created_here = False
    target = path
    if target is None:
        target = name
        try:
            os.mkdir(name)
        except OSError:
            if not quiet:
                raise
            warnings.warn('tests may fail, unable to create temp CWD ' + name,
                          RuntimeWarning, stacklevel=3)
        else:
            # Only directories we created ourselves are removed on exit.
            created_here = True
    try:
        os.chdir(target)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + target,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        # Leave the directory before trying to delete it.
        os.chdir(original_cwd)
        if created_here:
            rmtree(name)
825
826
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Run the ``with`` body under *umask*, then restore the old mask.

        Only defined on platforms where ``os.umask`` exists.
        """
        # os.umask() installs the new mask and hands back the previous one.
        previous = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous)
836
837
def findfile(file, here=__file__, subdir=None):
    """Locate *file* next to *here* or on sys.path.

    Absolute paths are returned untouched.  Otherwise *file* (prefixed with
    *subdir* when given) is looked up first in the directory containing
    *here* and then in every sys.path entry; the first existing match wins.
    If nothing matches, the relative name that was searched for is returned
    (this does not necessarily signal failure; it could still be a
    legitimate path)."""
    if os.path.isabs(file):
        return file
    relative = file if subdir is None else os.path.join(subdir, file)
    search_dirs = [os.path.dirname(here)]
    search_dirs.extend(sys.path)
    for directory in search_dirs:
        candidate = os.path.join(directory, relative)
        if os.path.exists(candidate):
            return candidate
    return relative
852
def create_empty_file(filename):
    """Make *filename* an empty file, creating or truncating it as needed."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # Open purely for the create/truncate side effect, then release the fd.
    os.close(os.open(filename, flags))
857
def sortdict(dict):
    "Return a repr()-like rendering of *dict* with its items in sorted order."
    rendered = ", ".join("%r: %r" % (key, value)
                         for key, value in sorted(dict.items()))
    return "{%s}" % rendered
864
def make_bad_fd():
    """
    Return a file descriptor number that is no longer valid.

    TESTFN is opened for binary writing, its descriptor number is noted,
    and the file is then closed and unlinked before the number is returned.
    """
    handle = open(TESTFN, "wb")
    try:
        fd = handle.fileno()
    finally:
        # Always close and remove the scratch file, even if fileno() failed.
        handle.close()
        unlink(TESTFN)
    return fd
876
def check_syntax_error(testcase, statement):
    """Assert, via *testcase*, that compiling *statement* raises SyntaxError.

    The statement is compiled in 'exec' mode under the file name
    '<test string>'.
    """
    compile_args = (statement, '<test string>', 'exec')
    testcase.assertRaises(SyntaxError, compile, *compile_args)
880
def open_urlresource(url, *args, **kw):
    """Return an open file for the resource at *url*, downloading if needed.

    The resource is cached in the "data" directory next to this module,
    under the last path component of the URL.  Extra positional and keyword
    arguments are forwarded to open() when the cached file is opened.

    The optional keyword argument *check* is a callable that receives the
    opened file and returns a true value if the content is acceptable.  A
    cached file that fails the check is deleted and fetched again.

    Downloading requires the 'urlfetch' resource.  TestFailed is raised when
    the freshly downloaded file still does not pass *check*.
    """
    from future.backports.urllib import (request as urllib_request,
                                         parse as urllib_parse)

    # 'check' is consumed here so it is not forwarded to open() below.
    check = kw.pop('check', None)

    filename = urllib_parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Returns the open file when it is acceptable; otherwise closes it
        # and falls off the end, returning None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        elif check(f):
            # Rewind so the caller reads from the start after the check.
            f.seek(0)
            return f
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        # Cached copy failed the check: discard it and download again.
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print('\tfetching %s ...' % url, file=get_original_stdout())
    f = urllib_request.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            # Copy until read() returns an empty result.
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
924
925
class WarningsRecorder(object):
    """Thin wrapper around the list that warnings.catch_warnings() yields
       when recording.

       Attribute access is forwarded to the most recent warning; reset()
       marks everything recorded so far as already seen.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning that has not been reset() away.
        self._last = 0

    def __getattr__(self, attr):
        recorded = self._warnings
        if len(recorded) > self._last:
            # There is at least one unseen warning: delegate to the newest.
            return getattr(recorded[-1], attr)
        if attr not in warnings.WarningMessage._WARNING_DETAILS:
            raise AttributeError("%r has no attribute %r" % (self, attr))
        # A known warning detail, but nothing has been recorded yet.
        return None

    @property
    def warnings(self):
        start = self._last
        return self._warnings[start:]

    def reset(self):
        self._last = len(self._warnings)
947
948
def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.

    *filters* is an iterable of (message regexp, warning category) pairs.
    This is a generator: it yields a WarningsRecorder once, and performs
    the checks when resumed.  AssertionError is raised for the first
    recorded warning that no filter matched, or (unless *quiet*) for the
    first filter that matched nothing.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    # NOTE(review): the frame is taken two levels up from this generator
    # body; presumably that is the code using the context manager -- the
    # depth depends on how this generator is driven, so confirm before
    # changing the call chain.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        # dict.clear() is available on both Python 2 and 3.  The previous
        # Python 2 branch called registry.pop() without a key, which
        # raises TypeError on a dict.
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = list(w)
    missing = []
    for msg, cat in filters:
        seen = False
        # Iterate over a copy because matches are removed from 'reraise'.
        for recorded in reraise[:]:
            warning = recorded.message
            # Filter out the matching messages
            if (re.match(msg, str(warning), re.I) and
                issubclass(warning.__class__, cat)):
                seen = True
                reraise.remove(recorded)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if reraise:
        raise AssertionError("unhandled warning %s" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
991
992
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Positional arguments are 2-tuples of the form:
        ("message regexp", WarningCategory)

    Keyword argument:
     - 'quiet': when true, a filter that catches nothing is not an error
        (defaults to True when no filters are given,
         and to False when at least one filter is given)

    Calling it without any argument is the same as:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # No explicit filters: accept every warning and, to preserve backward
    # compatibility, be quiet unless the caller said otherwise.
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
1015
1016
class CleanImport(object):
    """Context manager that makes the next import of the named modules
    produce a brand new module object.

    Handy for exercising import-time behaviour, for instance a
    DeprecationWarning emitted at module level.

    Typical use:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        # Snapshot first, so __exit__ can put every entry back.
        self.original_modules = sys.modules.copy()
        for requested in module_names:
            try:
                cached = sys.modules[requested]
            except KeyError:
                # Not imported yet: nothing to evict.
                continue
            # The requested name may only be an alias of another module
            # (e.g. a stub for a module renamed in 3.x).  Evict the real
            # module as well so the import cache is truly cleared.
            real_name = cached.__name__
            if real_name != requested:
                del sys.modules[real_name]
            del sys.modules[requested]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
1047
### Added for python-future:
# Choose the mapping base class used by EnvironmentVarGuard below:
# collections.abc.MutableMapping when utils.PY3 is true, otherwise
# UserDict.DictMixin.
if utils.PY3:
    import collections.abc
    mybase = collections.abc.MutableMapping
else:
    import UserDict
    mybase = UserDict.DictMixin
###
1056
class EnvironmentVarGuard(mybase):

    """Mapping-style proxy for os.environ that remembers the original value
    of every variable it sets or deletes and puts those values back when
    used as a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps variable name -> value it had before we first touched it
        # (None meaning "was not set").
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # setdefault() only stores on first access, keeping the initial value.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # setdefault() only stores on first access, keeping the initial value.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, original in self._changed.items():
            if original is not None:
                self._environ[name] = original
            elif name in self._environ:
                # The variable did not exist before: remove it again.
                del self._environ[name]
        os.environ = self._environ
1108
1109
class DirsOnSysPath(object):
    """Context manager that temporarily appends directories to sys.path.

    On construction the current sys.path list object and a copy of its
    contents are remembered, and the directories passed as positional
    arguments are appended.  On exit both the object and its contents
    are put back.

    Consequently *every* change made to sys.path inside the block,
    even rebinding sys.path to another list, is undone when the block
    ends.
    """

    def __init__(self, *paths):
        self.original_object = sys.path
        self.original_value = sys.path[:]
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        restored = self.original_object
        # Reset the contents in place, then rebind sys.path to that object.
        restored[:] = self.original_value
        sys.path = restored
1133
1134
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class to match; every keyword argument names an
    attribute the raised exception instance must have, with an equal value.
    """

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or one of its subclasses.  The
        # arguments used to be passed the other way round, which matched
        # base classes of self.exc and missed its subclasses.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Loop finished without a mismatch: every attribute matched.
                raise ResourceDenied("an optional resource is not available")
1159
# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
# Matches IOError whose errno attribute equals errno.ETIMEDOUT.
time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
# Matches socket.error whose errno attribute equals errno.ECONNRESET.
socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
# Matches IOError whose errno attribute equals errno.ECONNRESET.
ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
1166
1167
@contextlib.contextmanager
def transient_internet(resource_name, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    *resource_name* is only used in the ResourceDenied message.  While the
    block runs, the default socket timeout is set to *timeout* (left alone
    if *timeout* is None) and restored afterwards.  *errnos*, when
    non-empty, replaces the default list of errno values that are treated
    as transient; in that case no getaddrinfo error codes are considered.
    """
    # (name, fallback number) pairs: the number is used when the errno
    # module does not define the name.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    # Same idea, looked up on the socket module and applied to
    # socket.gaierror only.
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise 'denied' (with err attached as __cause__) when err looks
        # like a transient network problem; otherwise return normally.
        n = getattr(err, 'errno', None)
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            n in captured_errnos):
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            # Was: raise denied from err
            # For Python-Future:
            exc = denied
            exc.__cause__ = err
            raise exc

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], IOError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], IOError):
                err = a[1]
            else:
                break
        filter_error(err)
        # Not a transient error: re-raise the exception being handled.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
1235
1236
@contextlib.contextmanager
def captured_output(stream_name):
    """Temporarily replace ``sys.<stream_name>`` with a fresh io.StringIO.

    Shared implementation behind captured_stdout/stdin/stderr.  The
    replacement stream is yielded to the ``with`` body and the original
    stream is reinstated on exit.
    """
    import io
    original = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, original)
1248
def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as s:
           print("hello")
       self.assertEqual(s.getvalue(), "hello\\n")
    """
    return captured_output("stdout")
1257
def captured_stderr():
    """Return captured_output("stderr"), i.e. capture sys.stderr."""
    return captured_output("stderr")
1260
def captured_stdin():
    """Return captured_output("stdin"), i.e. replace sys.stdin temporarily."""
    return captured_output("stdin")
1263
1264
def gc_collect():
    """Try hard to get every garbage object collected.

    Implementations other than CPython give no guarantee of timely
    deallocation (and CPython itself does not for reference cycles), so
    __del__ methods can run later than a test expects and weakrefs can
    stay alive longer than expected.  This helper runs the collector
    three times, pausing briefly after the first pass on Jython.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()
1280
@contextlib.contextmanager
def disable_gc():
    """Run the ``with`` body with the garbage collector switched off.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
1290
1291
def python_is_optimized():
    """Find if Python was built with optimizations.

    Returns True when the last ``-O...`` option found in the PY_CFLAGS
    configuration variable is something other than ``-O0``; returns False
    when there is no such option or PY_CFLAGS is unavailable.
    """
    # Deliberately no function-local ``import sysconfig`` here: it would
    # shadow the module-level name, which falls back to distutils.sysconfig
    # when the stdlib sysconfig module is missing (Python 2.6).
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    final_opt = ""
    # Later options override earlier ones, so keep the last -O flag seen.
    for opt in cflags.split():
        if opt.startswith('-O'):
            final_opt = opt
    return final_opt not in ('', '-O0')
1302
1303
# struct format fragments used by calcobjsize()/calcvobjsize().  When the
# interpreter exposes sys.gettotalrefcount, two extra pointer-sized fields
# are prepended to the header and the alignment marker becomes '0P'.
if hasattr(sys, "gettotalrefcount"):
    _header = '2PnP'
    _align = '0P'
else:
    _header = 'nP'
    _align = '0n'
# Header for the variable-size variant: the plain header plus one 'n'.
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of *fmt* wrapped in the object header and
    the trailing alignment marker."""
    layout = _header + fmt + _align
    return struct.calcsize(layout)

def calcvobjsize(fmt):
    """Like calcobjsize(), but using the variable-size header."""
    layout = _vheader + fmt + _align
    return struct.calcsize(layout)
1316
1317
1318_TPFLAGS_HAVE_GC = 1<<14
1319_TPFLAGS_HEAPTYPE = 1<<9
1320
1321def check_sizeof(test, o, size):
1322    result = sys.getsizeof(o)
1323    # add GC header size
1324    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1325        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1326        size += _testcapi.SIZEOF_PYGC_HEAD
1327    msg = 'wrong size for %s: got %d, expected %d' \
1328            % (type(o), result, size)
1329    test.assertEqual(result, size, msg)
1330
1331#=======================================================================
1332# Decorator for running a function in a different locale, correctly resetting
1333# it afterwards.
1334
def run_with_locale(catstr, *locales):
    """Decorator factory: run the function under the first usable locale.

    *catstr* names a category attribute of the locale module (for example
    'LC_ALL'); each entry of *locales* is tried in order until one can be
    set.  The original locale is restored after the call.  An unknown
    category name raises AttributeError when the decorated function is
    called.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                saved_locale = locale.setlocale(category)
            except AttributeError:
                # the test author passed a category string that
                # does not exist: surface the mistake
                raise
            except:
                # the current locale cannot be determined; leave
                # everything alone
                locale = saved_locale = None
            else:
                for candidate in locales:
                    try:
                        locale.setlocale(category, candidate)
                    except:
                        continue
                    break

            # call through, making sure the locale is put back even if
            # the function raises
            try:
                return func(*args, **kwds)
            finally:
                if locale and saved_locale:
                    locale.setlocale(category, saved_locale)
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator
1366
1367#=======================================================================
1368# Decorator for running a function in a specific timezone, correctly
1369# resetting it afterwards.
1370
def run_with_tz(tz):
    """Decorator factory: run the function with the TZ environment
    variable set to *tz*, restoring the previous state afterwards.

    The decorated function raises unittest.SkipTest when time.tzset is
    not available.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            # None records that TZ was not set at all.
            previous_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # call through; the finally clause undoes the TZ change even
            # when the function raises
            try:
                return func(*args, **kwds)
            finally:
                if previous_tz is not None:
                    os.environ['TZ'] = previous_tz
                else:
                    del os.environ['TZ']
                time.tzset()

        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator
1399
1400#=======================================================================
1401# Big-memory-test support. Separate from 'resources' because memory use
1402# should be configurable.
1403
1404# Some handy shorthands. Note that these are used for byte-limits as well
1405# as size-limits, in the various bigmem tests
1406_1M = 1024*1024
1407_1G = 1024 * _1M
1408_2G = 2 * _1G
1409_4G = 4 * _1G
1410
1411MAX_Py_ssize_t = sys.maxsize
1412
1413def set_memlimit(limit):
1414    global max_memuse
1415    global real_max_memuse
1416    sizes = {
1417        'k': 1024,
1418        'm': _1M,
1419        'g': _1G,
1420        't': 1024*_1G,
1421    }
1422    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1423                 re.IGNORECASE | re.VERBOSE)
1424    if m is None:
1425        raise ValueError('Invalid memory limit %r' % (limit,))
1426    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1427    real_max_memuse = memlimit
1428    if memlimit > MAX_Py_ssize_t:
1429        memlimit = MAX_Py_ssize_t
1430    if memlimit < _2G - 1:
1431        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1432    max_memuse = memlimit
1433
class _MemoryWatchdog(object):
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The watching itself is done by the "memory_watchdog.py" script, started
    as a child process whose stdin is this process' /proc/<pid>/statm file.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        # True once the child process has been launched successfully.
        self.started = False

    def start(self):
        """Launch the watchdog child process.

        If the proc file cannot be opened, a RuntimeWarning is issued and
        nothing is started.
        """
        try:
            f = open(self.procfile, 'r')
        # On Python 2 a failed open() raises IOError, which is not an
        # OSError subclass there; catch both.
        except (IOError, OSError) as e:
            warnings.warn('/proc not available for stats: {0}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        try:
            watchdog_script = findfile("memory_watchdog.py")
            # subprocess.DEVNULL only exists on Python 3.3+; fall back to
            # an explicitly opened os.devnull elsewhere.
            devnull = getattr(subprocess, 'DEVNULL', None)
            if devnull is not None:
                self.mem_watchdog = subprocess.Popen(
                    [sys.executable, watchdog_script],
                    stdin=f, stderr=devnull)
            else:
                with open(os.devnull, 'w') as sink:
                    self.mem_watchdog = subprocess.Popen(
                        [sys.executable, watchdog_script],
                        stdin=f, stderr=sink)
        finally:
            # Close our copy of the proc file even if Popen() fails.
            f.close()
        self.started = True

    def stop(self):
        """Terminate the child process and wait for it, if it was started."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()
1462
1463
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the requested size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it.

    A false 'dry_run' means the test does not support dummy runs, i.e.
    runs made while no memory limit has been configured.

    The decorated test method is called as f(self, maxsize).
    """
    def decorator(f):
        def wrapper(self):
            size = wrapper.size
            memuse = wrapper.memuse
            # Without a configured limit, fall back to a small dummy size.
            maxsize = size if real_max_memuse else 5147

            limited = real_max_memuse or not dry_run
            if limited and real_max_memuse < maxsize * memuse:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        # Exposed as attributes; wrapper() reads them back on every call.
        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
1508
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse is at least MAX_Py_ssize_t;
    otherwise unittest.SkipTest is raised.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
1523
1524#=======================================================================
1525# unittest integration.
1526
class BasicTestRunner(object):
    """Minimal runner: feeds a fresh unittest.TestResult to the test."""

    def run(self, test):
        """Call *test* with a new TestResult and return that result."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
1532
def _id(obj):
    """Identity function: return *obj* unchanged (a no-op decorator)."""
    return obj
1535
def requires_resource(resource):
    """Return a decorator that skips the test unless *resource* is usable.

    The 'gui' resource is additionally checked with _is_gui_available().
    When the resource is enabled the returned decorator is a no-op.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip("resource 'gui' is not available")
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
1543
def cpython_only(test):
    """
    Decorator marking *test* as applicable to CPython only.
    """
    decorate = impl_detail(cpython=True)
    return decorate(test)
1549
def impl_detail(msg=None, **guards):
    """Return a decorator guarding an implementation-specific test.

    When check_impl_detail(**guards) is true the decorator is a no-op;
    otherwise it is unittest.skip() with *msg*, or with a message built
    from the guard names when *msg* is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            template = "implementation detail not available on {0}"
        else:
            template = "implementation detail specific to {0}"
        names = sorted(guardnames.keys())
        msg = template.format(' or '.join(names))
    return unittest.skip(msg)
1562
def _parse_guards(guards):
    """Return a tuple ({platform_name: run_me}, default_value).

    With no guards the result is ({'cpython': True}, False).  Otherwise all
    guard values must be equal (all True or all False) and the default is
    the negation of that common value.
    """
    if not guards:
        return ({'cpython': True}, False)
    values = list(guards.values())
    is_true = values[0]
    assert values == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)
1570
# Guard for CPython's implementation-specific tests -- or for running a test
# only on the implementation(s) named by the keyword arguments.
def check_impl_detail(**guards):
    """Return True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return guards.get(implementation, default)
1582
1583
def no_tracing(func):
    """Decorator that switches tracing off while the test runs.

    If sys.gettrace is unavailable, *func* is returned unchanged.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_tracer = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            # Reinstall whatever trace function was active before.
            sys.settrace(saved_tracer)
    return wrapper
1598
1599
def refcount_test(test):
    """Decorator for tests that depend on reference counts.

    The test is first restricted to CPython via cpython_only(); the result
    is then wrapped with no_tracing() so that a trace function cannot
    disturb the reference counts while the test runs.

    """
    guarded = cpython_only(test)
    return no_tracing(guarded)
1609
1610
def _filter_suite(suite, pred):
    """Recursively drop, in place, the test cases for which pred() is false.

    Nested unittest.TestSuite objects are always kept (and filtered in turn);
    *suite*._tests is replaced with the filtered list.
    """
    kept = []
    for item in suite._tests:
        if isinstance(item, unittest.TestSuite):
            _filter_suite(item, pred)
        elif not pred(item):
            continue
        kept.append(item)
    suite._tests = kept
1622
def _run_suite(suite):
    """Run *suite* and raise TestFailed unless every test passed.

    A verbose run uses unittest.TextTestRunner on sys.stdout; otherwise the
    silent BasicTestRunner is used.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
                                         failfast=failfast)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        # Exactly one problem: report its traceback text directly.
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
1641
1642
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a module name present in sys.modules, a TestSuite
    or TestCase instance, or a TestCase class.  The collected suite is
    filtered against match_tests before it is run.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(sys.modules[cls]))
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))

    def case_pred(test):
        # With no pattern configured every test is selected; otherwise any
        # dotted component of the test id may match the pattern.
        if match_tests is None:
            return True
        return any(fnmatch.fnmatchcase(name, match_tests)
                   for name in test.id().split("."))

    _filter_suite(suite, case_pred)
    _run_suite(suite)
1666
1667# We don't have sysconfig on Py2.6:
1668# #=======================================================================
1669# # Check for the presence of docstrings.
1670#
1671# HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
1672#                    sys.platform == 'win32' or
1673#                    sysconfig.get_config_var('WITH_DOC_STRINGS'))
1674#
1675# requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1676#                                           "test requires docstrings")
1677#
1678#
1679# #=======================================================================
1680# doctest driver.
1681
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    When ``verbosity`` is left as None, the module-level ``verbose`` setting
    is forwarded to doctest.  When a value is supplied, None is forwarded
    instead so that doctest applies its usual behavior (it searches sys.argv
    for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    # Either forward support's own verbosity or defer entirely to doctest.
    verbosity = verbose if verbosity is None else None

    failed, attempted = doctest.testmod(module, verbose=verbosity,
                                        optionflags=optionflags)
    if failed:
        raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failed, attempted
1704
1705
1706#=======================================================================
1707# Support for saving and restoring the imported modules.
1708
def modules_setup():
    """Return a 1-tuple holding a shallow copy of the current sys.modules."""
    snapshot = sys.modules.copy()
    return (snapshot,)
1711
def modules_cleanup(oldmodules):
    """Reset sys.modules to the contents of a previously saved mapping.

    ``oldmodules`` is a mapping of module names to modules (such as the
    copy taken by modules_setup()).  sys.modules is emptied in place,
    the currently loaded ``encodings.*`` submodules are put back, and then
    every entry of ``oldmodules`` is restored.
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    encodings = [(k, v) for k, v in sys.modules.items()
                 if k.startswith('encodings.')]
    # dict.clear() is available on both Py2 and Py3 and empties the mapping
    # in place, so the sys.modules object itself keeps its identity.
    # (dict.pop() cannot be used for this: it requires a key argument.)
    sys.modules.clear()
    sys.modules.update(encodings)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)
1732
1733#=======================================================================
1734# Backported versions of threading_setup() and threading_cleanup() which don't refer
1735# to threading._dangling (not available on Py2.7).
1736
1737# Threading support to prevent reporting refleaks when running regrtest.py -R
1738
1739# NOTE: we use thread._count() rather than threading.enumerate() (or the
1740# moral equivalent thereof) because a threading.Thread object is still alive
1741# until its __bootstrap() method has returned, even after it has been
1742# unregistered from the threading module.
1743# thread._count(), on the other hand, only gets decremented *after* the
1744# __bootstrap() method has returned, which gives us reliable reference counts
1745# at the end of a test run.
1746
def threading_setup():
    """Return a 1-tuple with the current _thread._count() value.

    When the low-level thread module is unavailable the tuple holds 1.
    """
    live_threads = _thread._count() if _thread else 1
    return (live_threads,)
1752
def threading_cleanup(nb_threads):
    """Wait briefly for _thread._count() to return to ``nb_threads``.

    The count is polled up to ten times, sleeping 0.1 seconds after each
    mismatch.  Nothing is reported if the count never matches.  Does nothing
    when the low-level thread module is unavailable.
    """
    if not _thread:
        return

    polls_left = 10
    while polls_left > 0 and _thread._count() != nb_threads:
        time.sleep(0.1)
        polls_left -= 1
    # XXX print a warning in case of failure?
1764
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    Returns a wrapper that records the thread count via threading_setup()
    before calling ``func`` and always runs threading_cleanup() afterwards.
    Positional and keyword arguments are forwarded to ``func`` unchanged and
    its return value is passed through.
    """
    if not _thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            # Forward keyword arguments too, so decorated callables keep
            # their full calling convention.
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
1781
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Does nothing on platforms where os.waitpid or os.WNOHANG is missing.
    """

    # os.waitpid(-1, os.WNOHANG) is required; without both names there is
    # nothing this function can do (os.WNOHANG is a Unix-only constant).
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    any_process = -1
    while True:
        try:
            pid, _status = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # waitpid failed (e.g. no child processes exist): stop reaping.
            # Only OSError is caught so that KeyboardInterrupt and
            # SystemExit are not silently swallowed.
            break
        if pid == 0:
            # Children exist but none of them has exited yet.
            break
1801
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily replace an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        Inside the with: block obj.attr is 5; on exit the previous value is
        put back. If `attr` was not present on `obj` beforehand, it is
        created for the block and deleted again afterwards.
    """
    already_present = hasattr(obj, attr)
    if already_present:
        saved = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield
    finally:
        if already_present:
            setattr(obj, attr, saved)
        else:
            delattr(obj, attr)
1828
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily replace an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        Inside the with: block obj["item"] is 5; on exit the previous value
        is put back. If `item` was not present in `obj` beforehand, it is
        created for the block and deleted again afterwards.
    """
    already_present = item in obj
    if already_present:
        saved = obj[item]
    obj[item] = new_val
    try:
        yield
    finally:
        if already_present:
            obj[item] = saved
        else:
            del obj[item]
1855
def strip_python_stderr(stderr):
    """Remove interpreter debug output from a Python process's stderr.

    Every ``[N refs]`` marker (with an optional trailing line ending) is
    dropped from the bytes object ``stderr`` and the result is returned with
    surrounding whitespace stripped.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    refs_marker = re.compile(br"\[\d+ refs\]\r?\n?")
    without_refs = refs_marker.sub(b"", stderr)
    return without_refs.strip()
1865
def args_from_interpreter_flags():
    """Return the command-line arguments that reproduce the current
    settings in sys.flags and sys.warnoptions, as computed by
    subprocess._args_from_interpreter_flags()."""
    interpreter_args = subprocess._args_from_interpreter_flags()
    return interpreter_args
1870
1871#============================================================
1872# Support for assertions about logging.
1873#============================================================
1874
class TestHandler(logging.handlers.BufferingHandler):
    """Buffering log handler that stores records for later inspection.

    Each emitted record's ``__dict__`` is appended to ``self.buffer``; the
    ``matcher`` object supplied at construction decides whether a stored
    dict satisfies the keyword criteria given to matches().
    """

    def __init__(self, matcher):
        # BufferingHandler wants a "capacity" so it knows when to flush.
        # shouldFlush is overridden below, so a capacity of zero is fine.
        # Call flush() manually to clear out the buffer.
        logging.handlers.BufferingHandler.__init__(self, 0)
        self.matcher = matcher

    def shouldFlush(self):
        """Never request a flush."""
        return False

    def emit(self, record):
        """Format ``record`` and store its attribute dict in the buffer."""
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Return True if any saved dict matches the supplied keys/values
        according to the matcher, False otherwise.
        """
        return any(self.matcher.matches(saved, **kwargs)
                   for saved in self.buffer)
1902
class Matcher(object):
    """Compare a stored dict against keyword criteria."""

    # Keys whose string values are compared by substring containment
    # instead of equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Return True when every supplied keyword matches the value stored
        under the same key in the dict ``d``.

        Keys whose values are strings and which are in self._partial_matches
        are checked for partial (i.e. substring) matches. The scheme can be
        extended to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), wanted)
                   for key, wanted in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).
        """
        # Values of different exact types never match.
        if type(v) != type(dv):
            return False
        # Substring matching only applies to str values under partial keys.
        if type(dv) is str and k in self._partial_matches:
            return dv.find(v) >= 0
        return v == dv
1935
1936
# Cached answer of can_symlink(); None means "not determined yet".
_can_symlink = None
def can_symlink():
    """Return True if os.symlink() can create a symlink here, else False.

    The probe creates (and removes) a link named TESTFN + "can_symlink"
    pointing at TESTFN.  The result is cached in the module-level
    ``_can_symlink`` so the probe runs only once.
    """
    global _can_symlink
    if _can_symlink is None:
        probe_path = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, probe_path)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            # Remove the probe link before recording the positive result.
            os.remove(probe_path)
            _can_symlink = True
    return _can_symlink
1952
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
1958
# Cached answer of can_xattr(); None means "not determined yet".
_can_xattr = None
def can_xattr():
    """Return True if extended attributes appear to work, else False.

    The probe sets a ``user.test`` attribute both on a tempfile.mkstemp()
    file and on TESTFN, then rejects Linux 2.6 kernels older than 2.6.39.
    Both probe files are removed afterwards.  The result is cached in the
    module-level ``_can_xattr`` so the probe runs only once.
    """
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        tmp_fp, tmp_name = tempfile.mkstemp()
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Raw string with escaped dots so that only a literal
                    # "2.6.<minor>" release prefix is recognised.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() hands back an open OS-level descriptor that the
            # caller owns; close it so it is not leaked.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
    _can_xattr = can
    return can
1986
def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    if can_xattr():
        return test
    return unittest.skip("no non-broken extended attribute support")(test)
1992
1993
if sys.platform.startswith('win'):
    @contextlib.contextmanager
    def suppress_crash_popup():
        """Disable Windows Error Reporting dialogs using SetErrorMode."""
        # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621%28v=vs.85%29.aspx
        import ctypes
        kernel32 = ctypes.windll.kernel32
        SEM_NOGPFAULTERRORBOX = 0x02
        # GetErrorMode is not available on Windows XP and Windows Server
        # 2003, but SetErrorMode returns the previous value, so the first
        # call is used to read the old mode and the second call adds the
        # flag on top of it.
        previous_mode = kernel32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
        kernel32.SetErrorMode(previous_mode | SEM_NOGPFAULTERRORBOX)
        try:
            yield
        finally:
            # Put the original error mode back on exit.
            kernel32.SetErrorMode(previous_mode)
else:
    @contextlib.contextmanager
    def suppress_crash_popup():
        """No-op context manager used on non-Windows platforms."""
        yield
2015
2016
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    A cleanup callback is registered on 'test_instance' (through its
    addCleanup method) that undoes the override: the previous value is
    reassigned when the attribute lived in the object's own __dict__,
    otherwise the attribute set here is deleted again.

    'attr_name' must already be an attribute of 'object_to_patch';
    AttributeError is raised if it is not.

    """
    # Fail early with AttributeError when the attribute does not exist.
    getattr(object_to_patch, attr_name)

    # Remember the old value and whether it was stored on the object itself
    # (as opposed to being found elsewhere by normal attribute lookup).
    try:
        previous = object_to_patch.__dict__[attr_name]
        stored_locally = True
    except (AttributeError, KeyError):
        previous = getattr(object_to_patch, attr_name, None)
        stored_locally = False

    def cleanup():
        # Undo the override once the test is done.
        if not stored_locally:
            delattr(object_to_patch, attr_name)
        else:
            setattr(object_to_patch, attr_name, previous)

    test_instance.addCleanup(cleanup)

    # Finally install the replacement value.
    setattr(object_to_patch, attr_name, new_value)
2049