1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import functools
9import gc
10import socket
11import sys
12import os
13import platform
14import shutil
15import warnings
16import unittest
17import importlib
18import UserDict
19import re
20import time
21import struct
22import sysconfig
23try:
24    import thread
25except ImportError:
26    thread = None
27
28__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
29           "verbose", "use_resources", "max_memuse", "record_original_stdout",
30           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
31           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
32           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
33           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
34           "open_urlresource", "check_warnings", "check_py3k_warnings",
35           "CleanImport", "EnvironmentVarGuard", "captured_output",
36           "captured_stdout", "TransientResource", "transient_internet",
37           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
38           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
39           "threading_cleanup", "reap_children", "cpython_only",
40           "check_impl_detail", "get_attribute", "py3k_bytes",
41           "import_fresh_module", "threading_cleanup", "reap_children",
42           "strip_python_stderr"]
43
44class Error(Exception):
45    """Base class for regression test exceptions."""
46
47class TestFailed(Error):
48    """Test failed."""
49
50class ResourceDenied(unittest.SkipTest):
51    """Test skipped because it requested a disallowed resource.
52
53    This is raised when a test calls requires() for a resource that
54    has not been enabled.  It is used to distinguish between expected
55    and unexpected skips.
56    """
57
58@contextlib.contextmanager
59def _ignore_deprecated_imports(ignore=True):
60    """Context manager to suppress package and module deprecation
61    warnings when importing them.
62
63    If ignore is False, this context manager has no effect."""
64    if ignore:
65        with warnings.catch_warnings():
66            warnings.filterwarnings("ignore", ".+ (module|package)",
67                                    DeprecationWarning)
68            yield
69    else:
70        yield
71
72
73def import_module(name, deprecated=False):
74    """Import and return the module to be tested, raising SkipTest if
75    it is not available.
76
77    If deprecated is True, any module or package deprecation messages
78    will be suppressed."""
79    with _ignore_deprecated_imports(deprecated):
80        try:
81            return importlib.import_module(name)
82        except ImportError, msg:
83            raise unittest.SkipTest(str(msg))
84
85
86def _save_and_remove_module(name, orig_modules):
87    """Helper function to save and remove a module from sys.modules
88
89       Raise ImportError if the module can't be imported."""
90    # try to import the module and raise an error if it can't be imported
91    if name not in sys.modules:
92        __import__(name)
93        del sys.modules[name]
94    for modname in list(sys.modules):
95        if modname == name or modname.startswith(name + '.'):
96            orig_modules[modname] = sys.modules[modname]
97            del sys.modules[modname]
98
99def _save_and_block_module(name, orig_modules):
100    """Helper function to save and block a module in sys.modules
101
102       Return True if the module was in sys.modules, False otherwise."""
103    saved = True
104    try:
105        orig_modules[name] = sys.modules[name]
106    except KeyError:
107        saved = False
108    sys.modules[name] = None
109    return saved
110
111
112def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
113    """Imports and returns a module, deliberately bypassing the sys.modules cache
114    and importing a fresh copy of the module. Once the import is complete,
115    the sys.modules cache is restored to its original state.
116
117    Modules named in fresh are also imported anew if needed by the import.
118    If one of these modules can't be imported, None is returned.
119
120    Importing of modules named in blocked is prevented while the fresh import
121    takes place.
122
123    If deprecated is True, any module or package deprecation messages
124    will be suppressed."""
125    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
126    # checks to make sure that this utility function is working as expected
127    with _ignore_deprecated_imports(deprecated):
128        # Keep track of modules saved for later restoration as well
129        # as those which just need a blocking entry removed
130        orig_modules = {}
131        names_to_remove = []
132        _save_and_remove_module(name, orig_modules)
133        try:
134            for fresh_name in fresh:
135                _save_and_remove_module(fresh_name, orig_modules)
136            for blocked_name in blocked:
137                if not _save_and_block_module(blocked_name, orig_modules):
138                    names_to_remove.append(blocked_name)
139            fresh_module = importlib.import_module(name)
140        except ImportError:
141            fresh_module = None
142        finally:
143            for orig_name, module in orig_modules.items():
144                sys.modules[orig_name] = module
145            for name_to_remove in names_to_remove:
146                del sys.modules[name_to_remove]
147        return fresh_module
148
149
150def get_attribute(obj, name):
151    """Get an attribute, raising SkipTest if AttributeError is raised."""
152    try:
153        attribute = getattr(obj, name)
154    except AttributeError:
155        raise unittest.SkipTest("module %s has no attribute %s" % (
156            obj.__name__, name))
157    else:
158        return attribute
159
160
161verbose = 1              # Flag set to 0 by regrtest.py
162use_resources = None     # Flag set to [] by regrtest.py
163max_memuse = 0           # Disable bigmem tests (they will still be run with
164                         # small sizes, to make sure they work.)
165real_max_memuse = 0
166
167# _original_stdout is meant to hold stdout at the time regrtest began.
168# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
169# The point is to have some flavor of stdout the user can actually see.
170_original_stdout = None
171def record_original_stdout(stdout):
172    global _original_stdout
173    _original_stdout = stdout
174
175def get_original_stdout():
176    return _original_stdout or sys.stdout
177
178def unload(name):
179    try:
180        del sys.modules[name]
181    except KeyError:
182        pass
183
184if sys.platform.startswith("win"):
185    def _waitfor(func, pathname, waitall=False):
186        # Perform the operation
187        func(pathname)
188        # Now setup the wait loop
189        if waitall:
190            dirname = pathname
191        else:
192            dirname, name = os.path.split(pathname)
193            dirname = dirname or '.'
194        # Check for `pathname` to be removed from the filesystem.
195        # The exponential backoff of the timeout amounts to a total
196        # of ~1 second after which the deletion is probably an error
197        # anyway.
198        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
199        # required when contention occurs.
200        timeout = 0.001
201        while timeout < 1.0:
202            # Note we are only testing for the existence of the file(s) in
203            # the contents of the directory regardless of any security or
204            # access rights.  If we have made it this far, we have sufficient
205            # permissions to do that much using Python's equivalent of the
206            # Windows API FindFirstFile.
207            # Other Windows APIs can fail or give incorrect results when
208            # dealing with files that are pending deletion.
209            L = os.listdir(dirname)
210            if not (L if waitall else name in L):
211                return
212            # Increase the timeout and try again
213            time.sleep(timeout)
214            timeout *= 2
215        warnings.warn('tests may fail, delete still pending for ' + pathname,
216                      RuntimeWarning, stacklevel=4)
217
218    def _unlink(filename):
219        _waitfor(os.unlink, filename)
220
221    def _rmdir(dirname):
222        _waitfor(os.rmdir, dirname)
223
224    def _rmtree(path):
225        def _rmtree_inner(path):
226            for name in os.listdir(path):
227                fullname = os.path.join(path, name)
228                if os.path.isdir(fullname):
229                    _waitfor(_rmtree_inner, fullname, waitall=True)
230                    os.rmdir(fullname)
231                else:
232                    os.unlink(fullname)
233        _waitfor(_rmtree_inner, path, waitall=True)
234        _waitfor(os.rmdir, path)
235else:
236    _unlink = os.unlink
237    _rmdir = os.rmdir
238    _rmtree = shutil.rmtree
239
240def unlink(filename):
241    try:
242        _unlink(filename)
243    except OSError:
244        pass
245
246def rmdir(dirname):
247    try:
248        _rmdir(dirname)
249    except OSError as error:
250        # The directory need not exist.
251        if error.errno != errno.ENOENT:
252            raise
253
254def rmtree(path):
255    try:
256        _rmtree(path)
257    except OSError, e:
258        # Unix returns ENOENT, Windows returns ESRCH.
259        if e.errno not in (errno.ENOENT, errno.ESRCH):
260            raise
261
262def forget(modname):
263    '''"Forget" a module was ever imported by removing it from sys.modules and
264    deleting any .pyc and .pyo files.'''
265    unload(modname)
266    for dirname in sys.path:
267        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
268        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
269        # the chance exists that there is no .pyc (and thus the 'try' statement
270        # is exited) but there is a .pyo file.
271        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
272
273# Check whether a gui is actually available
274def _is_gui_available():
275    if hasattr(_is_gui_available, 'result'):
276        return _is_gui_available.result
277    reason = None
278    if sys.platform.startswith('win'):
279        # if Python is running as a service (such as the buildbot service),
280        # gui interaction may be disallowed
281        import ctypes
282        import ctypes.wintypes
283        UOI_FLAGS = 1
284        WSF_VISIBLE = 0x0001
285        class USEROBJECTFLAGS(ctypes.Structure):
286            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
287                        ("fReserved", ctypes.wintypes.BOOL),
288                        ("dwFlags", ctypes.wintypes.DWORD)]
289        dll = ctypes.windll.user32
290        h = dll.GetProcessWindowStation()
291        if not h:
292            raise ctypes.WinError()
293        uof = USEROBJECTFLAGS()
294        needed = ctypes.wintypes.DWORD()
295        res = dll.GetUserObjectInformationW(h,
296            UOI_FLAGS,
297            ctypes.byref(uof),
298            ctypes.sizeof(uof),
299            ctypes.byref(needed))
300        if not res:
301            raise ctypes.WinError()
302        if not bool(uof.dwFlags & WSF_VISIBLE):
303            reason = "gui not available (WSF_VISIBLE flag not set)"
304    elif sys.platform == 'darwin':
305        # The Aqua Tk implementations on OS X can abort the process if
306        # being called in an environment where a window server connection
307        # cannot be made, for instance when invoked by a buildbot or ssh
308        # process not running under the same user id as the current console
309        # user.  To avoid that, raise an exception if the window manager
310        # connection is not available.
311        from ctypes import cdll, c_int, pointer, Structure
312        from ctypes.util import find_library
313
314        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
315
316        if app_services.CGMainDisplayID() == 0:
317            reason = "gui tests cannot run without OS X window manager"
318        else:
319            class ProcessSerialNumber(Structure):
320                _fields_ = [("highLongOfPSN", c_int),
321                            ("lowLongOfPSN", c_int)]
322            psn = ProcessSerialNumber()
323            psn_p = pointer(psn)
324            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
325                  (app_services.SetFrontProcess(psn_p) < 0) ):
326                reason = "cannot run without OS X gui process"
327
328    # check on every platform whether tkinter can actually do anything
329    if not reason:
330        try:
331            from Tkinter import Tk
332            root = Tk()
333            root.destroy()
334        except Exception as e:
335            err_string = str(e)
336            if len(err_string) > 50:
337                err_string = err_string[:50] + ' [...]'
338            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
339                                                           err_string)
340
341    _is_gui_available.reason = reason
342    _is_gui_available.result = not reason
343
344    return _is_gui_available.result
345
346def is_resource_enabled(resource):
347    """Test whether a resource is enabled.  Known resources are set by
348    regrtest.py."""
349    return use_resources is not None and resource in use_resources
350
351def requires(resource, msg=None):
352    """Raise ResourceDenied if the specified resource is not available.
353
354    If the caller's module is __main__ then automatically return True.  The
355    possibility of False being returned occurs when regrtest.py is executing."""
356    if resource == 'gui' and not _is_gui_available():
357        raise ResourceDenied(_is_gui_available.reason)
358    # see if the caller's module is __main__ - if so, treat as if
359    # the resource was set
360    if sys._getframe(1).f_globals.get("__name__") == "__main__":
361        return
362    if not is_resource_enabled(resource):
363        if msg is None:
364            msg = "Use of the `%s' resource not enabled" % resource
365        raise ResourceDenied(msg)
366
367
368# Don't use "localhost", since resolving it uses the DNS under recent
369# Windows versions (see issue #18792).
370HOST = "127.0.0.1"
371HOSTv6 = "::1"
372
373
374def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
375    """Returns an unused port that should be suitable for binding.  This is
376    achieved by creating a temporary socket with the same family and type as
377    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
378    the specified host address (defaults to 0.0.0.0) with the port set to 0,
379    eliciting an unused ephemeral port from the OS.  The temporary socket is
380    then closed and deleted, and the ephemeral port is returned.
381
382    Either this method or bind_port() should be used for any tests where a
383    server socket needs to be bound to a particular port for the duration of
384    the test.  Which one to use depends on whether the calling code is creating
385    a python socket, or if an unused port needs to be provided in a constructor
386    or passed to an external program (i.e. the -accept argument to openssl's
387    s_server mode).  Always prefer bind_port() over find_unused_port() where
388    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
389    socket is bound to a hard coded port, the ability to run multiple instances
390    of the test simultaneously on the same host is compromised, which makes the
391    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
392    may simply manifest as a failed test, which can be recovered from without
393    intervention in most cases, but on Windows, the entire python process can
394    completely and utterly wedge, requiring someone to log in to the buildbot
395    and manually kill the affected process.
396
397    (This is easy to reproduce on Windows, unfortunately, and can be traced to
398    the SO_REUSEADDR socket option having different semantics on Windows versus
399    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
400    listen and then accept connections on identical host/ports.  An EADDRINUSE
401    socket.error will be raised at some point (depending on the platform and
402    the order bind and listen were called on each socket).
403
404    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
405    will ever be raised when attempting to bind two identical host/ports. When
406    accept() is called on each socket, the second caller's process will steal
407    the port from the first caller, leaving them both in an awkwardly wedged
408    state where they'll no longer respond to any signals or graceful kills, and
409    must be forcibly killed via OpenProcess()/TerminateProcess().
410
411    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
412    instead of SO_REUSEADDR, which effectively affords the same semantics as
413    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
414    Source world compared to Windows ones, this is a common mistake.  A quick
415    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
416    openssl.exe is called with the 's_server' option, for example. See
417    http://bugs.python.org/issue2550 for more info.  The following site also
418    has a very thorough description about the implications of both REUSEADDR
419    and EXCLUSIVEADDRUSE on Windows:
420    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
421
422    XXX: although this approach is a vast improvement on previous attempts to
423    elicit unused ports, it rests heavily on the assumption that the ephemeral
424    port returned to us by the OS won't immediately be dished back out to some
425    other process when we close and delete our temporary socket but before our
426    calling code has a chance to bind the returned port.  We can deal with this
427    issue if/when we come across it."""
428    tempsock = socket.socket(family, socktype)
429    port = bind_port(tempsock)
430    tempsock.close()
431    del tempsock
432    return port
433
434def bind_port(sock, host=HOST):
435    """Bind the socket to a free port and return the port number.  Relies on
436    ephemeral ports in order to ensure we are using an unbound port.  This is
437    important as many tests may be running simultaneously, especially in a
438    buildbot environment.  This method raises an exception if the sock.family
439    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
440    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
441    for TCP/IP sockets.  The only case for setting these options is testing
442    multicasting via multiple UDP sockets.
443
444    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
445    on Windows), it will be set on the socket.  This will prevent anyone else
446    from bind()'ing to our host/port for the duration of the test.
447    """
448    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
449        if hasattr(socket, 'SO_REUSEADDR'):
450            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
451                raise TestFailed("tests should never set the SO_REUSEADDR "   \
452                                 "socket option on TCP/IP sockets!")
453        if hasattr(socket, 'SO_REUSEPORT'):
454            try:
455                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
456                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
457                                     "socket option on TCP/IP sockets!")
458            except EnvironmentError:
459                # Python's socket module was compiled using modern headers
460                # thus defining SO_REUSEPORT but this process is running
461                # under an older kernel that does not support SO_REUSEPORT.
462                pass
463        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
464            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
465
466    sock.bind((host, 0))
467    port = sock.getsockname()[1]
468    return port
469
470FUZZ = 1e-6
471
472def fcmp(x, y): # fuzzy comparison function
473    if isinstance(x, float) or isinstance(y, float):
474        try:
475            fuzz = (abs(x) + abs(y)) * FUZZ
476            if abs(x-y) <= fuzz:
477                return 0
478        except:
479            pass
480    elif type(x) == type(y) and isinstance(x, (tuple, list)):
481        for i in range(min(len(x), len(y))):
482            outcome = fcmp(x[i], y[i])
483            if outcome != 0:
484                return outcome
485        return (len(x) > len(y)) - (len(x) < len(y))
486    return (x > y) - (x < y)
487
488
489# A constant likely larger than the underlying OS pipe buffer size, to
490# make writes blocking.
491# Windows limit seems to be around 512 B, and many Unix kernels have a
492# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
493# (see issue #17835 for a discussion of this number).
494PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
495
496# A constant likely larger than the underlying OS socket buffer size, to make
497# writes blocking.
498# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
499# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
500# for a discussion of this number).
501SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
502
503try:
504    unicode
505    have_unicode = True
506except NameError:
507    have_unicode = False
508
509is_jython = sys.platform.startswith('java')
510
511# FS_NONASCII: non-ASCII Unicode character encodable by
512# sys.getfilesystemencoding(), or None if there is no such character.
513FS_NONASCII = None
514if have_unicode:
515    for character in (
516        # First try printable and common characters to have a readable filename.
517        # For each character, the encoding list are just example of encodings able
518        # to encode the character (the list is not exhaustive).
519
520        # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
521        unichr(0x00E6),
522        # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
523        unichr(0x0130),
524        # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
525        unichr(0x0141),
526        # U+03C6 (Greek Small Letter Phi): cp1253
527        unichr(0x03C6),
528        # U+041A (Cyrillic Capital Letter Ka): cp1251
529        unichr(0x041A),
530        # U+05D0 (Hebrew Letter Alef): Encodable to cp424
531        unichr(0x05D0),
532        # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
533        unichr(0x060C),
534        # U+062A (Arabic Letter Teh): cp720
535        unichr(0x062A),
536        # U+0E01 (Thai Character Ko Kai): cp874
537        unichr(0x0E01),
538
539        # Then try more "special" characters. "special" because they may be
540        # interpreted or displayed differently depending on the exact locale
541        # encoding and the font.
542
543        # U+00A0 (No-Break Space)
544        unichr(0x00A0),
545        # U+20AC (Euro Sign)
546        unichr(0x20AC),
547    ):
548        try:
549            character.encode(sys.getfilesystemencoding())\
550                     .decode(sys.getfilesystemencoding())
551        except UnicodeError:
552            pass
553        else:
554            FS_NONASCII = character
555            break
556
557# Filename used for testing
558if os.name == 'java':
559    # Jython disallows @ in module names
560    TESTFN = '$test'
561elif os.name == 'riscos':
562    TESTFN = 'testfile'
563else:
564    TESTFN = '@test'
565    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
566    if have_unicode:
567        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
568        # TESTFN_UNICODE is a filename that can be encoded using the
569        # file system encoding, but *not* with the default (ascii) encoding
570        if isinstance('', unicode):
571            # python -U
572            # XXX perhaps unicode() should accept Unicode strings?
573            TESTFN_UNICODE = "@test-\xe0\xf2"
574        else:
575            # 2 latin characters.
576            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
577        TESTFN_ENCODING = sys.getfilesystemencoding()
578        # TESTFN_UNENCODABLE is a filename that should *not* be
579        # able to be encoded by *either* the default or filesystem encoding.
580        # This test really only makes sense on Windows NT platforms
581        # which have special Unicode support in posixmodule.
582        if (not hasattr(sys, "getwindowsversion") or
583                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
584            TESTFN_UNENCODABLE = None
585        else:
586            # Japanese characters (I think - from bug 846133)
587            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
588            try:
589                # XXX - Note - should be using TESTFN_ENCODING here - but for
590                # Windows, "mbcs" currently always operates as if in
591                # errors=ignore' mode - hence we get '?' characters rather than
592                # the exception.  'Latin1' operates as we expect - ie, fails.
593                # See [ 850997 ] mbcs encoding ignores errors
594                TESTFN_UNENCODABLE.encode("Latin1")
595            except UnicodeEncodeError:
596                pass
597            else:
598                print \
599                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
600                'Unicode filename tests may not be effective' \
601                % TESTFN_UNENCODABLE
602
603
604# Disambiguate TESTFN for parallel testing, while letting it remain a valid
605# module name.
606TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
607
608# Save the initial cwd
609SAVEDCWD = os.getcwd()
610
611@contextlib.contextmanager
612def temp_cwd(name='tempcwd', quiet=False):
613    """
614    Context manager that creates a temporary directory and set it as CWD.
615
616    The new CWD is created in the current directory and it's named *name*.
617    If *quiet* is False (default) and it's not possible to create or change
618    the CWD, an error is raised.  If it's True, only a warning is raised
619    and the original CWD is used.
620    """
621    if have_unicode and isinstance(name, unicode):
622        try:
623            name = name.encode(sys.getfilesystemencoding() or 'ascii')
624        except UnicodeEncodeError:
625            if not quiet:
626                raise unittest.SkipTest('unable to encode the cwd name with '
627                                        'the filesystem encoding.')
628    saved_dir = os.getcwd()
629    is_temporary = False
630    try:
631        os.mkdir(name)
632        os.chdir(name)
633        is_temporary = True
634    except OSError:
635        if not quiet:
636            raise
637        warnings.warn('tests may fail, unable to change the CWD to ' + name,
638                      RuntimeWarning, stacklevel=3)
639    try:
640        yield os.getcwd()
641    finally:
642        os.chdir(saved_dir)
643        if is_temporary:
644            rmtree(name)
645
646
647def findfile(file, here=__file__, subdir=None):
648    """Try to find a file on sys.path and the working directory.  If it is not
649    found the argument passed to the function is returned (this does not
650    necessarily signal failure; could still be the legitimate path)."""
651    if os.path.isabs(file):
652        return file
653    if subdir is not None:
654        file = os.path.join(subdir, file)
655    path = sys.path
656    path = [os.path.dirname(here)] + path
657    for dn in path:
658        fn = os.path.join(dn, file)
659        if os.path.exists(fn): return fn
660    return file
661
662def sortdict(dict):
663    "Like repr(dict), but in sorted order."
664    items = dict.items()
665    items.sort()
666    reprpairs = ["%r: %r" % pair for pair in items]
667    withcommas = ", ".join(reprpairs)
668    return "{%s}" % withcommas
669
670def make_bad_fd():
671    """
672    Create an invalid file descriptor by opening and closing a file and return
673    its fd.
674    """
675    file = open(TESTFN, "wb")
676    try:
677        return file.fileno()
678    finally:
679        file.close()
680        unlink(TESTFN)
681
682def check_syntax_error(testcase, statement):
683    testcase.assertRaises(SyntaxError, compile, statement,
684                          '<test string>', 'exec')
685
686def open_urlresource(url, check=None):
687    import urlparse, urllib2
688
689    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
690
691    fn = os.path.join(os.path.dirname(__file__), "data", filename)
692
693    def check_valid_file(fn):
694        f = open(fn)
695        if check is None:
696            return f
697        elif check(f):
698            f.seek(0)
699            return f
700        f.close()
701
702    if os.path.exists(fn):
703        f = check_valid_file(fn)
704        if f is not None:
705            return f
706        unlink(fn)
707
708    # Verify the requirement before downloading the file
709    requires('urlfetch')
710
711    print >> get_original_stdout(), '\tfetching %s ...' % url
712    f = urllib2.urlopen(url, timeout=15)
713    try:
714        with open(fn, "wb") as out:
715            s = f.read()
716            while s:
717                out.write(s)
718                s = f.read()
719    finally:
720        f.close()
721
722    f = check_valid_file(fn)
723    if f is not None:
724        return f
725    raise TestFailed('invalid resource "%s"' % fn)
726
727
728class WarningsRecorder(object):
729    """Convenience wrapper for the warnings list returned on
730       entry to the warnings.catch_warnings() context manager.
731    """
732    def __init__(self, warnings_list):
733        self._warnings = warnings_list
734        self._last = 0
735
736    def __getattr__(self, attr):
737        if len(self._warnings) > self._last:
738            return getattr(self._warnings[-1], attr)
739        elif attr in warnings.WarningMessage._WARNING_DETAILS:
740            return None
741        raise AttributeError("%r has no attribute %r" % (self, attr))
742
743    @property
744    def warnings(self):
745        return self._warnings[self._last:]
746
747    def reset(self):
748        self._last = len(self._warnings)
749
750
751def _filterwarnings(filters, quiet=False):
752    """Catch the warnings, then check if all the expected
753    warnings have been raised and re-raise unexpected warnings.
754    If 'quiet' is True, only re-raise the unexpected warnings.
755    """
756    # Clear the warning registry of the calling module
757    # in order to re-raise the warnings.
758    frame = sys._getframe(2)
759    registry = frame.f_globals.get('__warningregistry__')
760    if registry:
761        registry.clear()
762    with warnings.catch_warnings(record=True) as w:
763        # Set filter "always" to record all warnings.  Because
764        # test_warnings swap the module, we need to look up in
765        # the sys.modules dictionary.
766        sys.modules['warnings'].simplefilter("always")
767        yield WarningsRecorder(w)
768    # Filter the recorded warnings
769    reraise = [warning.message for warning in w]
770    missing = []
771    for msg, cat in filters:
772        seen = False
773        for exc in reraise[:]:
774            message = str(exc)
775            # Filter out the matching messages
776            if (re.match(msg, message, re.I) and
777                issubclass(exc.__class__, cat)):
778                seen = True
779                reraise.remove(exc)
780        if not seen and not quiet:
781            # This filter caught nothing
782            missing.append((msg, cat.__name__))
783    if reraise:
784        raise AssertionError("unhandled warning %r" % reraise[0])
785    if missing:
786        raise AssertionError("filter (%r, %s) did not catch any warning" %
787                             missing[0])
788
789
790@contextlib.contextmanager
791def check_warnings(*filters, **kwargs):
792    """Context manager to silence warnings.
793
794    Accept 2-tuples as positional arguments:
795        ("message regexp", WarningCategory)
796
797    Optional argument:
798     - if 'quiet' is True, it does not fail if a filter catches nothing
799        (default True without argument,
800         default False if some filters are defined)
801
802    Without argument, it defaults to:
803        check_warnings(("", Warning), quiet=True)
804    """
805    quiet = kwargs.get('quiet')
806    if not filters:
807        filters = (("", Warning),)
808        # Preserve backward compatibility
809        if quiet is None:
810            quiet = True
811    return _filterwarnings(filters, quiet)
812
813
814@contextlib.contextmanager
815def check_py3k_warnings(*filters, **kwargs):
816    """Context manager to silence py3k warnings.
817
818    Accept 2-tuples as positional arguments:
819        ("message regexp", WarningCategory)
820
821    Optional argument:
822     - if 'quiet' is True, it does not fail if a filter catches nothing
823        (default False)
824
825    Without argument, it defaults to:
826        check_py3k_warnings(("", DeprecationWarning), quiet=False)
827    """
828    if sys.py3kwarning:
829        if not filters:
830            filters = (("", DeprecationWarning),)
831    else:
832        # It should not raise any py3k warning
833        filters = ()
834    return _filterwarnings(filters, kwargs.get('quiet'))
835
836
837class CleanImport(object):
838    """Context manager to force import to return a new module reference.
839
840    This is useful for testing module-level behaviours, such as
841    the emission of a DeprecationWarning on import.
842
843    Use like this:
844
845        with CleanImport("foo"):
846            importlib.import_module("foo") # new reference
847    """
848
849    def __init__(self, *module_names):
850        self.original_modules = sys.modules.copy()
851        for module_name in module_names:
852            if module_name in sys.modules:
853                module = sys.modules[module_name]
854                # It is possible that module_name is just an alias for
855                # another module (e.g. stub for modules renamed in 3.x).
856                # In that case, we also need delete the real module to clear
857                # the import cache.
858                if module.__name__ != module_name:
859                    del sys.modules[module.__name__]
860                del sys.modules[module_name]
861
862    def __enter__(self):
863        return self
864
865    def __exit__(self, *ignore_exc):
866        sys.modules.update(self.original_modules)
867
868
869class EnvironmentVarGuard(UserDict.DictMixin):
870
871    """Class to help protect the environment variable properly.  Can be used as
872    a context manager."""
873
874    def __init__(self):
875        self._environ = os.environ
876        self._changed = {}
877
878    def __getitem__(self, envvar):
879        return self._environ[envvar]
880
881    def __setitem__(self, envvar, value):
882        # Remember the initial value on the first access
883        if envvar not in self._changed:
884            self._changed[envvar] = self._environ.get(envvar)
885        self._environ[envvar] = value
886
887    def __delitem__(self, envvar):
888        # Remember the initial value on the first access
889        if envvar not in self._changed:
890            self._changed[envvar] = self._environ.get(envvar)
891        if envvar in self._environ:
892            del self._environ[envvar]
893
894    def keys(self):
895        return self._environ.keys()
896
897    def set(self, envvar, value):
898        self[envvar] = value
899
900    def unset(self, envvar):
901        del self[envvar]
902
903    def __enter__(self):
904        return self
905
906    def __exit__(self, *ignore_exc):
907        for (k, v) in self._changed.items():
908            if v is None:
909                if k in self._environ:
910                    del self._environ[k]
911            else:
912                self._environ[k] = v
913        os.environ = self._environ
914
915
916class DirsOnSysPath(object):
917    """Context manager to temporarily add directories to sys.path.
918
919    This makes a copy of sys.path, appends any directories given
920    as positional arguments, then reverts sys.path to the copied
921    settings when the context ends.
922
923    Note that *all* sys.path modifications in the body of the
924    context manager, including replacement of the object,
925    will be reverted at the end of the block.
926    """
927
928    def __init__(self, *paths):
929        self.original_value = sys.path[:]
930        self.original_object = sys.path
931        sys.path.extend(paths)
932
933    def __enter__(self):
934        return self
935
936    def __exit__(self, *ignore_exc):
937        sys.path = self.original_object
938        sys.path[:] = self.original_value
939
940
941class TransientResource(object):
942
943    """Raise ResourceDenied if an exception is raised while the context manager
944    is in effect that matches the specified exception and attributes."""
945
946    def __init__(self, exc, **kwargs):
947        self.exc = exc
948        self.attrs = kwargs
949
950    def __enter__(self):
951        return self
952
953    def __exit__(self, type_=None, value=None, traceback=None):
954        """If type_ is a subclass of self.exc and value has attributes matching
955        self.attrs, raise ResourceDenied.  Otherwise let the exception
956        propagate (if any)."""
957        if type_ is not None and issubclass(self.exc, type_):
958            for attr, attr_value in self.attrs.iteritems():
959                if not hasattr(value, attr):
960                    break
961                if getattr(value, attr) != attr_value:
962                    break
963            else:
964                raise ResourceDenied("an optional resource is not available")
965
966
967@contextlib.contextmanager
968def transient_internet(resource_name, timeout=30.0, errnos=()):
969    """Return a context manager that raises ResourceDenied when various issues
970    with the Internet connection manifest themselves as exceptions."""
971    default_errnos = [
972        ('ECONNREFUSED', 111),
973        ('ECONNRESET', 104),
974        ('EHOSTUNREACH', 113),
975        ('ENETUNREACH', 101),
976        ('ETIMEDOUT', 110),
977    ]
978    default_gai_errnos = [
979        ('EAI_AGAIN', -3),
980        ('EAI_FAIL', -4),
981        ('EAI_NONAME', -2),
982        ('EAI_NODATA', -5),
983        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
984        # implementation actually returns WSANO_DATA i.e. 11004.
985        ('WSANO_DATA', 11004),
986    ]
987
988    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
989    captured_errnos = errnos
990    gai_errnos = []
991    if not captured_errnos:
992        captured_errnos = [getattr(errno, name, num)
993                           for (name, num) in default_errnos]
994        gai_errnos = [getattr(socket, name, num)
995                      for (name, num) in default_gai_errnos]
996
997    def filter_error(err):
998        n = getattr(err, 'errno', None)
999        if (isinstance(err, socket.timeout) or
1000            (isinstance(err, socket.gaierror) and n in gai_errnos) or
1001            n in captured_errnos):
1002            if not verbose:
1003                sys.stderr.write(denied.args[0] + "\n")
1004            raise denied
1005
1006    old_timeout = socket.getdefaulttimeout()
1007    try:
1008        if timeout is not None:
1009            socket.setdefaulttimeout(timeout)
1010        yield
1011    except IOError as err:
1012        # urllib can wrap original socket errors multiple times (!), we must
1013        # unwrap to get at the original error.
1014        while True:
1015            a = err.args
1016            if len(a) >= 1 and isinstance(a[0], IOError):
1017                err = a[0]
1018            # The error can also be wrapped as args[1]:
1019            #    except socket.error as msg:
1020            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
1021            elif len(a) >= 2 and isinstance(a[1], IOError):
1022                err = a[1]
1023            else:
1024                break
1025        filter_error(err)
1026        raise
1027    # XXX should we catch generic exceptions and look for their
1028    # __cause__ or __context__?
1029    finally:
1030        socket.setdefaulttimeout(old_timeout)
1031
1032
1033@contextlib.contextmanager
1034def captured_output(stream_name):
1035    """Return a context manager used by captured_stdout and captured_stdin
1036    that temporarily replaces the sys stream *stream_name* with a StringIO."""
1037    import StringIO
1038    orig_stdout = getattr(sys, stream_name)
1039    setattr(sys, stream_name, StringIO.StringIO())
1040    try:
1041        yield getattr(sys, stream_name)
1042    finally:
1043        setattr(sys, stream_name, orig_stdout)
1044
1045def captured_stdout():
1046    """Capture the output of sys.stdout:
1047
1048       with captured_stdout() as s:
1049           print "hello"
1050       self.assertEqual(s.getvalue(), "hello")
1051    """
1052    return captured_output("stdout")
1053
1054def captured_stderr():
1055    return captured_output("stderr")
1056
1057def captured_stdin():
1058    return captured_output("stdin")
1059
1060def gc_collect():
1061    """Force as many objects as possible to be collected.
1062
1063    In non-CPython implementations of Python, this is needed because timely
1064    deallocation is not guaranteed by the garbage collector.  (Even in CPython
1065    this can be the case in case of reference cycles.)  This means that __del__
1066    methods may be called later than expected and weakrefs may remain alive for
1067    longer than expected.  This function tries its best to force all garbage
1068    objects to disappear.
1069    """
1070    gc.collect()
1071    if is_jython:
1072        time.sleep(0.1)
1073    gc.collect()
1074    gc.collect()
1075
1076
1077_header = '2P'
1078if hasattr(sys, "gettotalrefcount"):
1079    _header = '2P' + _header
1080_vheader = _header + 'P'
1081
1082def calcobjsize(fmt):
1083    return struct.calcsize(_header + fmt + '0P')
1084
1085def calcvobjsize(fmt):
1086    return struct.calcsize(_vheader + fmt + '0P')
1087
1088
1089_TPFLAGS_HAVE_GC = 1<<14
1090_TPFLAGS_HEAPTYPE = 1<<9
1091
1092def check_sizeof(test, o, size):
1093    import _testcapi
1094    result = sys.getsizeof(o)
1095    # add GC header size
1096    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1097        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1098        size += _testcapi.SIZEOF_PYGC_HEAD
1099    msg = 'wrong size for %s: got %d, expected %d' \
1100            % (type(o), result, size)
1101    test.assertEqual(result, size, msg)
1102
1103
1104#=======================================================================
1105# Decorator for running a function in a different locale, correctly resetting
1106# it afterwards.
1107
1108def run_with_locale(catstr, *locales):
1109    def decorator(func):
1110        def inner(*args, **kwds):
1111            try:
1112                import locale
1113                category = getattr(locale, catstr)
1114                orig_locale = locale.setlocale(category)
1115            except AttributeError:
1116                # if the test author gives us an invalid category string
1117                raise
1118            except:
1119                # cannot retrieve original locale, so do nothing
1120                locale = orig_locale = None
1121            else:
1122                for loc in locales:
1123                    try:
1124                        locale.setlocale(category, loc)
1125                        break
1126                    except:
1127                        pass
1128
1129            # now run the function, resetting the locale on exceptions
1130            try:
1131                return func(*args, **kwds)
1132            finally:
1133                if locale and orig_locale:
1134                    locale.setlocale(category, orig_locale)
1135        inner.func_name = func.func_name
1136        inner.__doc__ = func.__doc__
1137        return inner
1138    return decorator
1139
1140#=======================================================================
1141# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
1142
1143# Some handy shorthands. Note that these are used for byte-limits as well
1144# as size-limits, in the various bigmem tests
1145_1M = 1024*1024
1146_1G = 1024 * _1M
1147_2G = 2 * _1G
1148_4G = 4 * _1G
1149
1150MAX_Py_ssize_t = sys.maxsize
1151
1152def set_memlimit(limit):
1153    global max_memuse
1154    global real_max_memuse
1155    sizes = {
1156        'k': 1024,
1157        'm': _1M,
1158        'g': _1G,
1159        't': 1024*_1G,
1160    }
1161    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1162                 re.IGNORECASE | re.VERBOSE)
1163    if m is None:
1164        raise ValueError('Invalid memory limit %r' % (limit,))
1165    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1166    real_max_memuse = memlimit
1167    if memlimit > MAX_Py_ssize_t:
1168        memlimit = MAX_Py_ssize_t
1169    if memlimit < _2G - 1:
1170        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1171    max_memuse = memlimit
1172
1173def bigmemtest(minsize, memuse, overhead=5*_1M):
1174    """Decorator for bigmem tests.
1175
1176    'minsize' is the minimum useful size for the test (in arbitrary,
1177    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
1178    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
1179    independent of the testsize, and defaults to 5Mb.
1180
1181    The decorator tries to guess a good value for 'size' and passes it to
1182    the decorated test function. If minsize * memuse is more than the
1183    allowed memory use (as defined by max_memuse), the test is skipped.
1184    Otherwise, minsize is adjusted upward to use up to max_memuse.
1185    """
1186    def decorator(f):
1187        def wrapper(self):
1188            if not max_memuse:
1189                # If max_memuse is 0 (the default),
1190                # we still want to run the tests with size set to a few kb,
1191                # to make sure they work. We still want to avoid using
1192                # too much memory, though, but we do that noisily.
1193                maxsize = 5147
1194                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
1195            else:
1196                maxsize = int((max_memuse - overhead) / memuse)
1197                if maxsize < minsize:
1198                    # Really ought to print 'test skipped' or something
1199                    if verbose:
1200                        sys.stderr.write("Skipping %s because of memory "
1201                                         "constraint\n" % (f.__name__,))
1202                    return
1203                # Try to keep some breathing room in memory use
1204                maxsize = max(maxsize - 50 * _1M, minsize)
1205            return f(self, maxsize)
1206        wrapper.minsize = minsize
1207        wrapper.memuse = memuse
1208        wrapper.overhead = overhead
1209        return wrapper
1210    return decorator
1211
1212def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
1213    def decorator(f):
1214        def wrapper(self):
1215            if not real_max_memuse:
1216                maxsize = 5147
1217            else:
1218                maxsize = size
1219
1220            if ((real_max_memuse or not dry_run)
1221                and real_max_memuse < maxsize * memuse):
1222                if verbose:
1223                    sys.stderr.write("Skipping %s because of memory "
1224                                     "constraint\n" % (f.__name__,))
1225                return
1226
1227            return f(self, maxsize)
1228        wrapper.size = size
1229        wrapper.memuse = memuse
1230        wrapper.overhead = overhead
1231        return wrapper
1232    return decorator
1233
1234def bigaddrspacetest(f):
1235    """Decorator for tests that fill the address space."""
1236    def wrapper(self):
1237        if max_memuse < MAX_Py_ssize_t:
1238            if verbose:
1239                sys.stderr.write("Skipping %s because of memory "
1240                                 "constraint\n" % (f.__name__,))
1241        else:
1242            return f(self)
1243    return wrapper
1244
1245#=======================================================================
1246# unittest integration.
1247
1248class BasicTestRunner:
1249    def run(self, test):
1250        result = unittest.TestResult()
1251        test(result)
1252        return result
1253
1254def _id(obj):
1255    return obj
1256
1257def requires_resource(resource):
1258    if resource == 'gui' and not _is_gui_available():
1259        return unittest.skip(_is_gui_available.reason)
1260    if is_resource_enabled(resource):
1261        return _id
1262    else:
1263        return unittest.skip("resource {0!r} is not enabled".format(resource))
1264
1265def cpython_only(test):
1266    """
1267    Decorator for tests only applicable on CPython.
1268    """
1269    return impl_detail(cpython=True)(test)
1270
1271def impl_detail(msg=None, **guards):
1272    if check_impl_detail(**guards):
1273        return _id
1274    if msg is None:
1275        guardnames, default = _parse_guards(guards)
1276        if default:
1277            msg = "implementation detail not available on {0}"
1278        else:
1279            msg = "implementation detail specific to {0}"
1280        guardnames = sorted(guardnames.keys())
1281        msg = msg.format(' or '.join(guardnames))
1282    return unittest.skip(msg)
1283
1284def _parse_guards(guards):
1285    # Returns a tuple ({platform_name: run_me}, default_value)
1286    if not guards:
1287        return ({'cpython': True}, False)
1288    is_true = guards.values()[0]
1289    assert guards.values() == [is_true] * len(guards)   # all True or all False
1290    return (guards, not is_true)
1291
1292# Use the following check to guard CPython's implementation-specific tests --
1293# or to run them only on the implementation(s) guarded by the arguments.
1294def check_impl_detail(**guards):
1295    """This function returns True or False depending on the host platform.
1296       Examples:
1297          if check_impl_detail():               # only on CPython (default)
1298          if check_impl_detail(jython=True):    # only on Jython
1299          if check_impl_detail(cpython=False):  # everywhere except on CPython
1300    """
1301    guards, default = _parse_guards(guards)
1302    return guards.get(platform.python_implementation().lower(), default)
1303
1304
1305
1306def _run_suite(suite):
1307    """Run tests from a unittest.TestSuite-derived class."""
1308    if verbose:
1309        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
1310    else:
1311        runner = BasicTestRunner()
1312
1313    result = runner.run(suite)
1314    if not result.wasSuccessful():
1315        if len(result.errors) == 1 and not result.failures:
1316            err = result.errors[0][1]
1317        elif len(result.failures) == 1 and not result.errors:
1318            err = result.failures[0][1]
1319        else:
1320            err = "multiple errors occurred"
1321            if not verbose:
1322                err += "; run in verbose mode for details"
1323        raise TestFailed(err)
1324
1325
1326def run_unittest(*classes):
1327    """Run tests from unittest.TestCase-derived classes."""
1328    valid_types = (unittest.TestSuite, unittest.TestCase)
1329    suite = unittest.TestSuite()
1330    for cls in classes:
1331        if isinstance(cls, str):
1332            if cls in sys.modules:
1333                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1334            else:
1335                raise ValueError("str arguments must be keys in sys.modules")
1336        elif isinstance(cls, valid_types):
1337            suite.addTest(cls)
1338        else:
1339            suite.addTest(unittest.makeSuite(cls))
1340    _run_suite(suite)
1341
1342#=======================================================================
1343# Check for the presence of docstrings.
1344
1345HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
1346                   sys.platform == 'win32' or
1347                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
1348
1349requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1350                                          "test requires docstrings")
1351
1352
1353#=======================================================================
1354# doctest driver.
1355
1356def run_doctest(module, verbosity=None):
1357    """Run doctest on the given module.  Return (#failures, #tests).
1358
1359    If optional argument verbosity is not specified (or is None), pass
1360    test_support's belief about verbosity on to doctest.  Else doctest's
1361    usual behavior is used (it searches sys.argv for -v).
1362    """
1363
1364    import doctest
1365
1366    if verbosity is None:
1367        verbosity = verbose
1368    else:
1369        verbosity = None
1370
1371    # Direct doctest output (normally just errors) to real stdout; doctest
1372    # output shouldn't be compared by regrtest.
1373    save_stdout = sys.stdout
1374    sys.stdout = get_original_stdout()
1375    try:
1376        f, t = doctest.testmod(module, verbose=verbosity)
1377        if f:
1378            raise TestFailed("%d of %d doctests failed" % (f, t))
1379    finally:
1380        sys.stdout = save_stdout
1381    if verbose:
1382        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1383    return f, t
1384
1385#=======================================================================
1386# Threading support to prevent reporting refleaks when running regrtest.py -R
1387
1388# NOTE: we use thread._count() rather than threading.enumerate() (or the
1389# moral equivalent thereof) because a threading.Thread object is still alive
1390# until its __bootstrap() method has returned, even after it has been
1391# unregistered from the threading module.
1392# thread._count(), on the other hand, only gets decremented *after* the
1393# __bootstrap() method has returned, which gives us reliable reference counts
1394# at the end of a test run.
1395
1396def threading_setup():
1397    if thread:
1398        return thread._count(),
1399    else:
1400        return 1,
1401
1402def threading_cleanup(nb_threads):
1403    if not thread:
1404        return
1405
1406    _MAX_COUNT = 10
1407    for count in range(_MAX_COUNT):
1408        n = thread._count()
1409        if n == nb_threads:
1410            break
1411        time.sleep(0.1)
1412    # XXX print a warning in case of failure?
1413
1414def reap_threads(func):
1415    """Use this function when threads are being used.  This will
1416    ensure that the threads are cleaned up even when the test fails.
1417    If threading is unavailable this function does nothing.
1418    """
1419    if not thread:
1420        return func
1421
1422    @functools.wraps(func)
1423    def decorator(*args):
1424        key = threading_setup()
1425        try:
1426            return func(*args)
1427        finally:
1428            threading_cleanup(*key)
1429    return decorator
1430
1431def reap_children():
1432    """Use this function at the end of test_main() whenever sub-processes
1433    are started.  This will help ensure that no extra children (zombies)
1434    stick around to hog resources and create problems when looking
1435    for refleaks.
1436    """
1437
1438    # Reap all our dead child processes so we don't leave zombies around.
1439    # These hog resources and might be causing some of the buildbots to die.
1440    if hasattr(os, 'waitpid'):
1441        any_process = -1
1442        while True:
1443            try:
1444                # This will raise an exception on Windows.  That's ok.
1445                pid, status = os.waitpid(any_process, os.WNOHANG)
1446                if pid == 0:
1447                    break
1448            except:
1449                break
1450
1451@contextlib.contextmanager
1452def swap_attr(obj, attr, new_val):
1453    """Temporary swap out an attribute with a new object.
1454
1455    Usage:
1456        with swap_attr(obj, "attr", 5):
1457            ...
1458
1459        This will set obj.attr to 5 for the duration of the with: block,
1460        restoring the old value at the end of the block. If `attr` doesn't
1461        exist on `obj`, it will be created and then deleted at the end of the
1462        block.
1463    """
1464    if hasattr(obj, attr):
1465        real_val = getattr(obj, attr)
1466        setattr(obj, attr, new_val)
1467        try:
1468            yield
1469        finally:
1470            setattr(obj, attr, real_val)
1471    else:
1472        setattr(obj, attr, new_val)
1473        try:
1474            yield
1475        finally:
1476            delattr(obj, attr)
1477
1478def py3k_bytes(b):
1479    """Emulate the py3k bytes() constructor.
1480
1481    NOTE: This is only a best effort function.
1482    """
1483    try:
1484        # memoryview?
1485        return b.tobytes()
1486    except AttributeError:
1487        try:
1488            # iterable of ints?
1489            return b"".join(chr(x) for x in b)
1490        except TypeError:
1491            return bytes(b)
1492
1493def args_from_interpreter_flags():
1494    """Return a list of command-line arguments reproducing the current
1495    settings in sys.flags."""
1496    import subprocess
1497    return subprocess._args_from_interpreter_flags()
1498
1499def strip_python_stderr(stderr):
1500    """Strip the stderr of a Python process from potential debug output
1501    emitted by the interpreter.
1502
1503    This will typically be run on the result of the communicate() method
1504    of a subprocess.Popen object.
1505    """
1506    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
1507    return stderr
1508