1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.test_support': 4 raise ImportError('test_support must be imported from the test package') 5 6import contextlib 7import errno 8import functools 9import gc 10import socket 11import sys 12import os 13import platform 14import shutil 15import warnings 16import unittest 17import importlib 18import UserDict 19import re 20import time 21import struct 22import sysconfig 23try: 24 import thread 25except ImportError: 26 thread = None 27 28__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module", 29 "verbose", "use_resources", "max_memuse", "record_original_stdout", 30 "get_original_stdout", "unload", "unlink", "rmtree", "forget", 31 "is_resource_enabled", "requires", "find_unused_port", "bind_port", 32 "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ", 33 "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error", 34 "open_urlresource", "check_warnings", "check_py3k_warnings", 35 "CleanImport", "EnvironmentVarGuard", "captured_output", 36 "captured_stdout", "TransientResource", "transient_internet", 37 "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", 38 "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", 39 "threading_cleanup", "reap_children", "cpython_only", 40 "check_impl_detail", "get_attribute", "py3k_bytes", 41 "import_fresh_module", "threading_cleanup", "reap_children", 42 "strip_python_stderr"] 43 44class Error(Exception): 45 """Base class for regression test exceptions.""" 46 47class TestFailed(Error): 48 """Test failed.""" 49 50class ResourceDenied(unittest.SkipTest): 51 """Test skipped because it requested a disallowed resource. 52 53 This is raised when a test calls requires() for a resource that 54 has not been enabled. It is used to distinguish between expected 55 and unexpected skips. 56 """ 57 58@contextlib.contextmanager 59def _ignore_deprecated_imports(ignore=True): 60 """Context manager to suppress package and module deprecation 61 warnings when importing them. 62 63 If ignore is False, this context manager has no effect.""" 64 if ignore: 65 with warnings.catch_warnings(): 66 warnings.filterwarnings("ignore", ".+ (module|package)", 67 DeprecationWarning) 68 yield 69 else: 70 yield 71 72 73def import_module(name, deprecated=False): 74 """Import and return the module to be tested, raising SkipTest if 75 it is not available. 76 77 If deprecated is True, any module or package deprecation messages 78 will be suppressed.""" 79 with _ignore_deprecated_imports(deprecated): 80 try: 81 return importlib.import_module(name) 82 except ImportError, msg: 83 raise unittest.SkipTest(str(msg)) 84 85 86def _save_and_remove_module(name, orig_modules): 87 """Helper function to save and remove a module from sys.modules 88 89 Raise ImportError if the module can't be imported.""" 90 # try to import the module and raise an error if it can't be imported 91 if name not in sys.modules: 92 __import__(name) 93 del sys.modules[name] 94 for modname in list(sys.modules): 95 if modname == name or modname.startswith(name + '.'): 96 orig_modules[modname] = sys.modules[modname] 97 del sys.modules[modname] 98 99def _save_and_block_module(name, orig_modules): 100 """Helper function to save and block a module in sys.modules 101 102 Return True if the module was in sys.modules, False otherwise.""" 103 saved = True 104 try: 105 orig_modules[name] = sys.modules[name] 106 except KeyError: 107 saved = False 108 sys.modules[name] = None 109 return saved 110 111 112def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): 113 """Imports and returns a module, deliberately bypassing the sys.modules cache 114 and importing a fresh copy of the module. Once the import is complete, 115 the sys.modules cache is restored to its original state. 116 117 Modules named in fresh are also imported anew if needed by the import. 118 If one of these modules can't be imported, None is returned. 119 120 Importing of modules named in blocked is prevented while the fresh import 121 takes place. 122 123 If deprecated is True, any module or package deprecation messages 124 will be suppressed.""" 125 # NOTE: test_heapq, test_json, and test_warnings include extra sanity 126 # checks to make sure that this utility function is working as expected 127 with _ignore_deprecated_imports(deprecated): 128 # Keep track of modules saved for later restoration as well 129 # as those which just need a blocking entry removed 130 orig_modules = {} 131 names_to_remove = [] 132 _save_and_remove_module(name, orig_modules) 133 try: 134 for fresh_name in fresh: 135 _save_and_remove_module(fresh_name, orig_modules) 136 for blocked_name in blocked: 137 if not _save_and_block_module(blocked_name, orig_modules): 138 names_to_remove.append(blocked_name) 139 fresh_module = importlib.import_module(name) 140 except ImportError: 141 fresh_module = None 142 finally: 143 for orig_name, module in orig_modules.items(): 144 sys.modules[orig_name] = module 145 for name_to_remove in names_to_remove: 146 del sys.modules[name_to_remove] 147 return fresh_module 148 149 150def get_attribute(obj, name): 151 """Get an attribute, raising SkipTest if AttributeError is raised.""" 152 try: 153 attribute = getattr(obj, name) 154 except AttributeError: 155 raise unittest.SkipTest("module %s has no attribute %s" % ( 156 obj.__name__, name)) 157 else: 158 return attribute 159 160 161verbose = 1 # Flag set to 0 by regrtest.py 162use_resources = None # Flag set to [] by regrtest.py 163max_memuse = 0 # Disable bigmem tests (they will still be run with 164 # small sizes, to make sure they work.) 165real_max_memuse = 0 166 167# _original_stdout is meant to hold stdout at the time regrtest began. 168# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 169# The point is to have some flavor of stdout the user can actually see. 170_original_stdout = None 171def record_original_stdout(stdout): 172 global _original_stdout 173 _original_stdout = stdout 174 175def get_original_stdout(): 176 return _original_stdout or sys.stdout 177 178def unload(name): 179 try: 180 del sys.modules[name] 181 except KeyError: 182 pass 183 184if sys.platform.startswith("win"): 185 def _waitfor(func, pathname, waitall=False): 186 # Perform the operation 187 func(pathname) 188 # Now setup the wait loop 189 if waitall: 190 dirname = pathname 191 else: 192 dirname, name = os.path.split(pathname) 193 dirname = dirname or '.' 194 # Check for `pathname` to be removed from the filesystem. 195 # The exponential backoff of the timeout amounts to a total 196 # of ~1 second after which the deletion is probably an error 197 # anyway. 198 # Testing on a i7@4.3GHz shows that usually only 1 iteration is 199 # required when contention occurs. 200 timeout = 0.001 201 while timeout < 1.0: 202 # Note we are only testing for the existence of the file(s) in 203 # the contents of the directory regardless of any security or 204 # access rights. If we have made it this far, we have sufficient 205 # permissions to do that much using Python's equivalent of the 206 # Windows API FindFirstFile. 207 # Other Windows APIs can fail or give incorrect results when 208 # dealing with files that are pending deletion. 209 L = os.listdir(dirname) 210 if not (L if waitall else name in L): 211 return 212 # Increase the timeout and try again 213 time.sleep(timeout) 214 timeout *= 2 215 warnings.warn('tests may fail, delete still pending for ' + pathname, 216 RuntimeWarning, stacklevel=4) 217 218 def _unlink(filename): 219 _waitfor(os.unlink, filename) 220 221 def _rmdir(dirname): 222 _waitfor(os.rmdir, dirname) 223 224 def _rmtree(path): 225 def _rmtree_inner(path): 226 for name in os.listdir(path): 227 fullname = os.path.join(path, name) 228 if os.path.isdir(fullname): 229 _waitfor(_rmtree_inner, fullname, waitall=True) 230 os.rmdir(fullname) 231 else: 232 os.unlink(fullname) 233 _waitfor(_rmtree_inner, path, waitall=True) 234 _waitfor(os.rmdir, path) 235else: 236 _unlink = os.unlink 237 _rmdir = os.rmdir 238 _rmtree = shutil.rmtree 239 240def unlink(filename): 241 try: 242 _unlink(filename) 243 except OSError: 244 pass 245 246def rmdir(dirname): 247 try: 248 _rmdir(dirname) 249 except OSError as error: 250 # The directory need not exist. 251 if error.errno != errno.ENOENT: 252 raise 253 254def rmtree(path): 255 try: 256 _rmtree(path) 257 except OSError, e: 258 # Unix returns ENOENT, Windows returns ESRCH. 259 if e.errno not in (errno.ENOENT, errno.ESRCH): 260 raise 261 262def forget(modname): 263 '''"Forget" a module was ever imported by removing it from sys.modules and 264 deleting any .pyc and .pyo files.''' 265 unload(modname) 266 for dirname in sys.path: 267 unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) 268 # Deleting the .pyo file cannot be within the 'try' for the .pyc since 269 # the chance exists that there is no .pyc (and thus the 'try' statement 270 # is exited) but there is a .pyo file. 271 unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) 272 273# Check whether a gui is actually available 274def _is_gui_available(): 275 if hasattr(_is_gui_available, 'result'): 276 return _is_gui_available.result 277 reason = None 278 if sys.platform.startswith('win'): 279 # if Python is running as a service (such as the buildbot service), 280 # gui interaction may be disallowed 281 import ctypes 282 import ctypes.wintypes 283 UOI_FLAGS = 1 284 WSF_VISIBLE = 0x0001 285 class USEROBJECTFLAGS(ctypes.Structure): 286 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 287 ("fReserved", ctypes.wintypes.BOOL), 288 ("dwFlags", ctypes.wintypes.DWORD)] 289 dll = ctypes.windll.user32 290 h = dll.GetProcessWindowStation() 291 if not h: 292 raise ctypes.WinError() 293 uof = USEROBJECTFLAGS() 294 needed = ctypes.wintypes.DWORD() 295 res = dll.GetUserObjectInformationW(h, 296 UOI_FLAGS, 297 ctypes.byref(uof), 298 ctypes.sizeof(uof), 299 ctypes.byref(needed)) 300 if not res: 301 raise ctypes.WinError() 302 if not bool(uof.dwFlags & WSF_VISIBLE): 303 reason = "gui not available (WSF_VISIBLE flag not set)" 304 elif sys.platform == 'darwin': 305 # The Aqua Tk implementations on OS X can abort the process if 306 # being called in an environment where a window server connection 307 # cannot be made, for instance when invoked by a buildbot or ssh 308 # process not running under the same user id as the current console 309 # user. To avoid that, raise an exception if the window manager 310 # connection is not available. 311 from ctypes import cdll, c_int, pointer, Structure 312 from ctypes.util import find_library 313 314 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 315 316 if app_services.CGMainDisplayID() == 0: 317 reason = "gui tests cannot run without OS X window manager" 318 else: 319 class ProcessSerialNumber(Structure): 320 _fields_ = [("highLongOfPSN", c_int), 321 ("lowLongOfPSN", c_int)] 322 psn = ProcessSerialNumber() 323 psn_p = pointer(psn) 324 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 325 (app_services.SetFrontProcess(psn_p) < 0) ): 326 reason = "cannot run without OS X gui process" 327 328 # check on every platform whether tkinter can actually do anything 329 if not reason: 330 try: 331 from Tkinter import Tk 332 root = Tk() 333 root.destroy() 334 except Exception as e: 335 err_string = str(e) 336 if len(err_string) > 50: 337 err_string = err_string[:50] + ' [...]' 338 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 339 err_string) 340 341 _is_gui_available.reason = reason 342 _is_gui_available.result = not reason 343 344 return _is_gui_available.result 345 346def is_resource_enabled(resource): 347 """Test whether a resource is enabled. Known resources are set by 348 regrtest.py.""" 349 return use_resources is not None and resource in use_resources 350 351def requires(resource, msg=None): 352 """Raise ResourceDenied if the specified resource is not available. 353 354 If the caller's module is __main__ then automatically return True. The 355 possibility of False being returned occurs when regrtest.py is executing.""" 356 if resource == 'gui' and not _is_gui_available(): 357 raise ResourceDenied(_is_gui_available.reason) 358 # see if the caller's module is __main__ - if so, treat as if 359 # the resource was set 360 if sys._getframe(1).f_globals.get("__name__") == "__main__": 361 return 362 if not is_resource_enabled(resource): 363 if msg is None: 364 msg = "Use of the `%s' resource not enabled" % resource 365 raise ResourceDenied(msg) 366 367 368# Don't use "localhost", since resolving it uses the DNS under recent 369# Windows versions (see issue #18792). 370HOST = "127.0.0.1" 371HOSTv6 = "::1" 372 373 374def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 375 """Returns an unused port that should be suitable for binding. This is 376 achieved by creating a temporary socket with the same family and type as 377 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 378 the specified host address (defaults to 0.0.0.0) with the port set to 0, 379 eliciting an unused ephemeral port from the OS. The temporary socket is 380 then closed and deleted, and the ephemeral port is returned. 381 382 Either this method or bind_port() should be used for any tests where a 383 server socket needs to be bound to a particular port for the duration of 384 the test. Which one to use depends on whether the calling code is creating 385 a python socket, or if an unused port needs to be provided in a constructor 386 or passed to an external program (i.e. the -accept argument to openssl's 387 s_server mode). Always prefer bind_port() over find_unused_port() where 388 possible. Hard coded ports should *NEVER* be used. As soon as a server 389 socket is bound to a hard coded port, the ability to run multiple instances 390 of the test simultaneously on the same host is compromised, which makes the 391 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 392 may simply manifest as a failed test, which can be recovered from without 393 intervention in most cases, but on Windows, the entire python process can 394 completely and utterly wedge, requiring someone to log in to the buildbot 395 and manually kill the affected process. 396 397 (This is easy to reproduce on Windows, unfortunately, and can be traced to 398 the SO_REUSEADDR socket option having different semantics on Windows versus 399 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 400 listen and then accept connections on identical host/ports. An EADDRINUSE 401 socket.error will be raised at some point (depending on the platform and 402 the order bind and listen were called on each socket). 403 404 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 405 will ever be raised when attempting to bind two identical host/ports. When 406 accept() is called on each socket, the second caller's process will steal 407 the port from the first caller, leaving them both in an awkwardly wedged 408 state where they'll no longer respond to any signals or graceful kills, and 409 must be forcibly killed via OpenProcess()/TerminateProcess(). 410 411 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 412 instead of SO_REUSEADDR, which effectively affords the same semantics as 413 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 414 Source world compared to Windows ones, this is a common mistake. A quick 415 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 416 openssl.exe is called with the 's_server' option, for example. See 417 http://bugs.python.org/issue2550 for more info. The following site also 418 has a very thorough description about the implications of both REUSEADDR 419 and EXCLUSIVEADDRUSE on Windows: 420 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 421 422 XXX: although this approach is a vast improvement on previous attempts to 423 elicit unused ports, it rests heavily on the assumption that the ephemeral 424 port returned to us by the OS won't immediately be dished back out to some 425 other process when we close and delete our temporary socket but before our 426 calling code has a chance to bind the returned port. We can deal with this 427 issue if/when we come across it.""" 428 tempsock = socket.socket(family, socktype) 429 port = bind_port(tempsock) 430 tempsock.close() 431 del tempsock 432 return port 433 434def bind_port(sock, host=HOST): 435 """Bind the socket to a free port and return the port number. Relies on 436 ephemeral ports in order to ensure we are using an unbound port. This is 437 important as many tests may be running simultaneously, especially in a 438 buildbot environment. This method raises an exception if the sock.family 439 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 440 or SO_REUSEPORT set on it. Tests should *never* set these socket options 441 for TCP/IP sockets. The only case for setting these options is testing 442 multicasting via multiple UDP sockets. 443 444 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 445 on Windows), it will be set on the socket. This will prevent anyone else 446 from bind()'ing to our host/port for the duration of the test. 447 """ 448 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 449 if hasattr(socket, 'SO_REUSEADDR'): 450 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 451 raise TestFailed("tests should never set the SO_REUSEADDR " \ 452 "socket option on TCP/IP sockets!") 453 if hasattr(socket, 'SO_REUSEPORT'): 454 try: 455 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 456 raise TestFailed("tests should never set the SO_REUSEPORT " \ 457 "socket option on TCP/IP sockets!") 458 except EnvironmentError: 459 # Python's socket module was compiled using modern headers 460 # thus defining SO_REUSEPORT but this process is running 461 # under an older kernel that does not support SO_REUSEPORT. 462 pass 463 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 464 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 465 466 sock.bind((host, 0)) 467 port = sock.getsockname()[1] 468 return port 469 470FUZZ = 1e-6 471 472def fcmp(x, y): # fuzzy comparison function 473 if isinstance(x, float) or isinstance(y, float): 474 try: 475 fuzz = (abs(x) + abs(y)) * FUZZ 476 if abs(x-y) <= fuzz: 477 return 0 478 except: 479 pass 480 elif type(x) == type(y) and isinstance(x, (tuple, list)): 481 for i in range(min(len(x), len(y))): 482 outcome = fcmp(x[i], y[i]) 483 if outcome != 0: 484 return outcome 485 return (len(x) > len(y)) - (len(x) < len(y)) 486 return (x > y) - (x < y) 487 488 489# A constant likely larger than the underlying OS pipe buffer size, to 490# make writes blocking. 491# Windows limit seems to be around 512 B, and many Unix kernels have a 492# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 493# (see issue #17835 for a discussion of this number). 494PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 495 496# A constant likely larger than the underlying OS socket buffer size, to make 497# writes blocking. 498# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 499# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 500# for a discussion of this number). 501SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 502 503try: 504 unicode 505 have_unicode = True 506except NameError: 507 have_unicode = False 508 509is_jython = sys.platform.startswith('java') 510 511# FS_NONASCII: non-ASCII Unicode character encodable by 512# sys.getfilesystemencoding(), or None if there is no such character. 513FS_NONASCII = None 514if have_unicode: 515 for character in ( 516 # First try printable and common characters to have a readable filename. 517 # For each character, the encoding list are just example of encodings able 518 # to encode the character (the list is not exhaustive). 519 520 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 521 unichr(0x00E6), 522 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 523 unichr(0x0130), 524 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 525 unichr(0x0141), 526 # U+03C6 (Greek Small Letter Phi): cp1253 527 unichr(0x03C6), 528 # U+041A (Cyrillic Capital Letter Ka): cp1251 529 unichr(0x041A), 530 # U+05D0 (Hebrew Letter Alef): Encodable to cp424 531 unichr(0x05D0), 532 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic 533 unichr(0x060C), 534 # U+062A (Arabic Letter Teh): cp720 535 unichr(0x062A), 536 # U+0E01 (Thai Character Ko Kai): cp874 537 unichr(0x0E01), 538 539 # Then try more "special" characters. "special" because they may be 540 # interpreted or displayed differently depending on the exact locale 541 # encoding and the font. 542 543 # U+00A0 (No-Break Space) 544 unichr(0x00A0), 545 # U+20AC (Euro Sign) 546 unichr(0x20AC), 547 ): 548 try: 549 character.encode(sys.getfilesystemencoding())\ 550 .decode(sys.getfilesystemencoding()) 551 except UnicodeError: 552 pass 553 else: 554 FS_NONASCII = character 555 break 556 557# Filename used for testing 558if os.name == 'java': 559 # Jython disallows @ in module names 560 TESTFN = '$test' 561elif os.name == 'riscos': 562 TESTFN = 'testfile' 563else: 564 TESTFN = '@test' 565 # Unicode name only used if TEST_FN_ENCODING exists for the platform. 566 if have_unicode: 567 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() 568 # TESTFN_UNICODE is a filename that can be encoded using the 569 # file system encoding, but *not* with the default (ascii) encoding 570 if isinstance('', unicode): 571 # python -U 572 # XXX perhaps unicode() should accept Unicode strings? 573 TESTFN_UNICODE = "@test-\xe0\xf2" 574 else: 575 # 2 latin characters. 576 TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") 577 TESTFN_ENCODING = sys.getfilesystemencoding() 578 # TESTFN_UNENCODABLE is a filename that should *not* be 579 # able to be encoded by *either* the default or filesystem encoding. 580 # This test really only makes sense on Windows NT platforms 581 # which have special Unicode support in posixmodule. 582 if (not hasattr(sys, "getwindowsversion") or 583 sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME 584 TESTFN_UNENCODABLE = None 585 else: 586 # Japanese characters (I think - from bug 846133) 587 TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') 588 try: 589 # XXX - Note - should be using TESTFN_ENCODING here - but for 590 # Windows, "mbcs" currently always operates as if in 591 # errors=ignore' mode - hence we get '?' characters rather than 592 # the exception. 'Latin1' operates as we expect - ie, fails. 593 # See [ 850997 ] mbcs encoding ignores errors 594 TESTFN_UNENCODABLE.encode("Latin1") 595 except UnicodeEncodeError: 596 pass 597 else: 598 print \ 599 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 600 'Unicode filename tests may not be effective' \ 601 % TESTFN_UNENCODABLE 602 603 604# Disambiguate TESTFN for parallel testing, while letting it remain a valid 605# module name. 606TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 607 608# Save the initial cwd 609SAVEDCWD = os.getcwd() 610 611@contextlib.contextmanager 612def temp_cwd(name='tempcwd', quiet=False): 613 """ 614 Context manager that creates a temporary directory and set it as CWD. 615 616 The new CWD is created in the current directory and it's named *name*. 617 If *quiet* is False (default) and it's not possible to create or change 618 the CWD, an error is raised. If it's True, only a warning is raised 619 and the original CWD is used. 620 """ 621 if have_unicode and isinstance(name, unicode): 622 try: 623 name = name.encode(sys.getfilesystemencoding() or 'ascii') 624 except UnicodeEncodeError: 625 if not quiet: 626 raise unittest.SkipTest('unable to encode the cwd name with ' 627 'the filesystem encoding.') 628 saved_dir = os.getcwd() 629 is_temporary = False 630 try: 631 os.mkdir(name) 632 os.chdir(name) 633 is_temporary = True 634 except OSError: 635 if not quiet: 636 raise 637 warnings.warn('tests may fail, unable to change the CWD to ' + name, 638 RuntimeWarning, stacklevel=3) 639 try: 640 yield os.getcwd() 641 finally: 642 os.chdir(saved_dir) 643 if is_temporary: 644 rmtree(name) 645 646 647def findfile(file, here=__file__, subdir=None): 648 """Try to find a file on sys.path and the working directory. If it is not 649 found the argument passed to the function is returned (this does not 650 necessarily signal failure; could still be the legitimate path).""" 651 if os.path.isabs(file): 652 return file 653 if subdir is not None: 654 file = os.path.join(subdir, file) 655 path = sys.path 656 path = [os.path.dirname(here)] + path 657 for dn in path: 658 fn = os.path.join(dn, file) 659 if os.path.exists(fn): return fn 660 return file 661 662def sortdict(dict): 663 "Like repr(dict), but in sorted order." 664 items = dict.items() 665 items.sort() 666 reprpairs = ["%r: %r" % pair for pair in items] 667 withcommas = ", ".join(reprpairs) 668 return "{%s}" % withcommas 669 670def make_bad_fd(): 671 """ 672 Create an invalid file descriptor by opening and closing a file and return 673 its fd. 674 """ 675 file = open(TESTFN, "wb") 676 try: 677 return file.fileno() 678 finally: 679 file.close() 680 unlink(TESTFN) 681 682def check_syntax_error(testcase, statement): 683 testcase.assertRaises(SyntaxError, compile, statement, 684 '<test string>', 'exec') 685 686def open_urlresource(url, check=None): 687 import urlparse, urllib2 688 689 filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 690 691 fn = os.path.join(os.path.dirname(__file__), "data", filename) 692 693 def check_valid_file(fn): 694 f = open(fn) 695 if check is None: 696 return f 697 elif check(f): 698 f.seek(0) 699 return f 700 f.close() 701 702 if os.path.exists(fn): 703 f = check_valid_file(fn) 704 if f is not None: 705 return f 706 unlink(fn) 707 708 # Verify the requirement before downloading the file 709 requires('urlfetch') 710 711 print >> get_original_stdout(), '\tfetching %s ...' % url 712 f = urllib2.urlopen(url, timeout=15) 713 try: 714 with open(fn, "wb") as out: 715 s = f.read() 716 while s: 717 out.write(s) 718 s = f.read() 719 finally: 720 f.close() 721 722 f = check_valid_file(fn) 723 if f is not None: 724 return f 725 raise TestFailed('invalid resource "%s"' % fn) 726 727 728class WarningsRecorder(object): 729 """Convenience wrapper for the warnings list returned on 730 entry to the warnings.catch_warnings() context manager. 731 """ 732 def __init__(self, warnings_list): 733 self._warnings = warnings_list 734 self._last = 0 735 736 def __getattr__(self, attr): 737 if len(self._warnings) > self._last: 738 return getattr(self._warnings[-1], attr) 739 elif attr in warnings.WarningMessage._WARNING_DETAILS: 740 return None 741 raise AttributeError("%r has no attribute %r" % (self, attr)) 742 743 @property 744 def warnings(self): 745 return self._warnings[self._last:] 746 747 def reset(self): 748 self._last = len(self._warnings) 749 750 751def _filterwarnings(filters, quiet=False): 752 """Catch the warnings, then check if all the expected 753 warnings have been raised and re-raise unexpected warnings. 754 If 'quiet' is True, only re-raise the unexpected warnings. 755 """ 756 # Clear the warning registry of the calling module 757 # in order to re-raise the warnings. 758 frame = sys._getframe(2) 759 registry = frame.f_globals.get('__warningregistry__') 760 if registry: 761 registry.clear() 762 with warnings.catch_warnings(record=True) as w: 763 # Set filter "always" to record all warnings. Because 764 # test_warnings swap the module, we need to look up in 765 # the sys.modules dictionary. 766 sys.modules['warnings'].simplefilter("always") 767 yield WarningsRecorder(w) 768 # Filter the recorded warnings 769 reraise = [warning.message for warning in w] 770 missing = [] 771 for msg, cat in filters: 772 seen = False 773 for exc in reraise[:]: 774 message = str(exc) 775 # Filter out the matching messages 776 if (re.match(msg, message, re.I) and 777 issubclass(exc.__class__, cat)): 778 seen = True 779 reraise.remove(exc) 780 if not seen and not quiet: 781 # This filter caught nothing 782 missing.append((msg, cat.__name__)) 783 if reraise: 784 raise AssertionError("unhandled warning %r" % reraise[0]) 785 if missing: 786 raise AssertionError("filter (%r, %s) did not catch any warning" % 787 missing[0]) 788 789 790@contextlib.contextmanager 791def check_warnings(*filters, **kwargs): 792 """Context manager to silence warnings. 793 794 Accept 2-tuples as positional arguments: 795 ("message regexp", WarningCategory) 796 797 Optional argument: 798 - if 'quiet' is True, it does not fail if a filter catches nothing 799 (default True without argument, 800 default False if some filters are defined) 801 802 Without argument, it defaults to: 803 check_warnings(("", Warning), quiet=True) 804 """ 805 quiet = kwargs.get('quiet') 806 if not filters: 807 filters = (("", Warning),) 808 # Preserve backward compatibility 809 if quiet is None: 810 quiet = True 811 return _filterwarnings(filters, quiet) 812 813 814@contextlib.contextmanager 815def check_py3k_warnings(*filters, **kwargs): 816 """Context manager to silence py3k warnings. 817 818 Accept 2-tuples as positional arguments: 819 ("message regexp", WarningCategory) 820 821 Optional argument: 822 - if 'quiet' is True, it does not fail if a filter catches nothing 823 (default False) 824 825 Without argument, it defaults to: 826 check_py3k_warnings(("", DeprecationWarning), quiet=False) 827 """ 828 if sys.py3kwarning: 829 if not filters: 830 filters = (("", DeprecationWarning),) 831 else: 832 # It should not raise any py3k warning 833 filters = () 834 return _filterwarnings(filters, kwargs.get('quiet')) 835 836 837class CleanImport(object): 838 """Context manager to force import to return a new module reference. 839 840 This is useful for testing module-level behaviours, such as 841 the emission of a DeprecationWarning on import. 842 843 Use like this: 844 845 with CleanImport("foo"): 846 importlib.import_module("foo") # new reference 847 """ 848 849 def __init__(self, *module_names): 850 self.original_modules = sys.modules.copy() 851 for module_name in module_names: 852 if module_name in sys.modules: 853 module = sys.modules[module_name] 854 # It is possible that module_name is just an alias for 855 # another module (e.g. stub for modules renamed in 3.x). 856 # In that case, we also need delete the real module to clear 857 # the import cache. 858 if module.__name__ != module_name: 859 del sys.modules[module.__name__] 860 del sys.modules[module_name] 861 862 def __enter__(self): 863 return self 864 865 def __exit__(self, *ignore_exc): 866 sys.modules.update(self.original_modules) 867 868 869class EnvironmentVarGuard(UserDict.DictMixin): 870 871 """Class to help protect the environment variable properly. Can be used as 872 a context manager.""" 873 874 def __init__(self): 875 self._environ = os.environ 876 self._changed = {} 877 878 def __getitem__(self, envvar): 879 return self._environ[envvar] 880 881 def __setitem__(self, envvar, value): 882 # Remember the initial value on the first access 883 if envvar not in self._changed: 884 self._changed[envvar] = self._environ.get(envvar) 885 self._environ[envvar] = value 886 887 def __delitem__(self, envvar): 888 # Remember the initial value on the first access 889 if envvar not in self._changed: 890 self._changed[envvar] = self._environ.get(envvar) 891 if envvar in self._environ: 892 del self._environ[envvar] 893 894 def keys(self): 895 return self._environ.keys() 896 897 def set(self, envvar, value): 898 self[envvar] = value 899 900 def unset(self, envvar): 901 del self[envvar] 902 903 def __enter__(self): 904 return self 905 906 def __exit__(self, *ignore_exc): 907 for (k, v) in self._changed.items(): 908 if v is None: 909 if k in self._environ: 910 del self._environ[k] 911 else: 912 self._environ[k] = v 913 os.environ = self._environ 914 915 916class DirsOnSysPath(object): 917 """Context manager to temporarily add directories to sys.path. 918 919 This makes a copy of sys.path, appends any directories given 920 as positional arguments, then reverts sys.path to the copied 921 settings when the context ends. 922 923 Note that *all* sys.path modifications in the body of the 924 context manager, including replacement of the object, 925 will be reverted at the end of the block. 926 """ 927 928 def __init__(self, *paths): 929 self.original_value = sys.path[:] 930 self.original_object = sys.path 931 sys.path.extend(paths) 932 933 def __enter__(self): 934 return self 935 936 def __exit__(self, *ignore_exc): 937 sys.path = self.original_object 938 sys.path[:] = self.original_value 939 940 941class TransientResource(object): 942 943 """Raise ResourceDenied if an exception is raised while the context manager 944 is in effect that matches the specified exception and attributes.""" 945 946 def __init__(self, exc, **kwargs): 947 self.exc = exc 948 self.attrs = kwargs 949 950 def __enter__(self): 951 return self 952 953 def __exit__(self, type_=None, value=None, traceback=None): 954 """If type_ is a subclass of self.exc and value has attributes matching 955 self.attrs, raise ResourceDenied. Otherwise let the exception 956 propagate (if any).""" 957 if type_ is not None and issubclass(self.exc, type_): 958 for attr, attr_value in self.attrs.iteritems(): 959 if not hasattr(value, attr): 960 break 961 if getattr(value, attr) != attr_value: 962 break 963 else: 964 raise ResourceDenied("an optional resource is not available") 965 966 967@contextlib.contextmanager 968def transient_internet(resource_name, timeout=30.0, errnos=()): 969 """Return a context manager that raises ResourceDenied when various issues 970 with the Internet connection manifest themselves as exceptions.""" 971 default_errnos = [ 972 ('ECONNREFUSED', 111), 973 ('ECONNRESET', 104), 974 ('EHOSTUNREACH', 113), 975 ('ENETUNREACH', 101), 976 ('ETIMEDOUT', 110), 977 ] 978 default_gai_errnos = [ 979 ('EAI_AGAIN', -3), 980 ('EAI_FAIL', -4), 981 ('EAI_NONAME', -2), 982 ('EAI_NODATA', -5), 983 # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo() 984 # implementation actually returns WSANO_DATA i.e. 11004. 985 ('WSANO_DATA', 11004), 986 ] 987 988 denied = ResourceDenied("Resource '%s' is not available" % resource_name) 989 captured_errnos = errnos 990 gai_errnos = [] 991 if not captured_errnos: 992 captured_errnos = [getattr(errno, name, num) 993 for (name, num) in default_errnos] 994 gai_errnos = [getattr(socket, name, num) 995 for (name, num) in default_gai_errnos] 996 997 def filter_error(err): 998 n = getattr(err, 'errno', None) 999 if (isinstance(err, socket.timeout) or 1000 (isinstance(err, socket.gaierror) and n in gai_errnos) or 1001 n in captured_errnos): 1002 if not verbose: 1003 sys.stderr.write(denied.args[0] + "\n") 1004 raise denied 1005 1006 old_timeout = socket.getdefaulttimeout() 1007 try: 1008 if timeout is not None: 1009 socket.setdefaulttimeout(timeout) 1010 yield 1011 except IOError as err: 1012 # urllib can wrap original socket errors multiple times (!), we must 1013 # unwrap to get at the original error. 1014 while True: 1015 a = err.args 1016 if len(a) >= 1 and isinstance(a[0], IOError): 1017 err = a[0] 1018 # The error can also be wrapped as args[1]: 1019 # except socket.error as msg: 1020 # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) 1021 elif len(a) >= 2 and isinstance(a[1], IOError): 1022 err = a[1] 1023 else: 1024 break 1025 filter_error(err) 1026 raise 1027 # XXX should we catch generic exceptions and look for their 1028 # __cause__ or __context__? 1029 finally: 1030 socket.setdefaulttimeout(old_timeout) 1031 1032 1033@contextlib.contextmanager 1034def captured_output(stream_name): 1035 """Return a context manager used by captured_stdout and captured_stdin 1036 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 1037 import StringIO 1038 orig_stdout = getattr(sys, stream_name) 1039 setattr(sys, stream_name, StringIO.StringIO()) 1040 try: 1041 yield getattr(sys, stream_name) 1042 finally: 1043 setattr(sys, stream_name, orig_stdout) 1044 1045def captured_stdout(): 1046 """Capture the output of sys.stdout: 1047 1048 with captured_stdout() as s: 1049 print "hello" 1050 self.assertEqual(s.getvalue(), "hello") 1051 """ 1052 return captured_output("stdout") 1053 1054def captured_stderr(): 1055 return captured_output("stderr") 1056 1057def captured_stdin(): 1058 return captured_output("stdin") 1059 1060def gc_collect(): 1061 """Force as many objects as possible to be collected. 1062 1063 In non-CPython implementations of Python, this is needed because timely 1064 deallocation is not guaranteed by the garbage collector. (Even in CPython 1065 this can be the case in case of reference cycles.) This means that __del__ 1066 methods may be called later than expected and weakrefs may remain alive for 1067 longer than expected. This function tries its best to force all garbage 1068 objects to disappear. 1069 """ 1070 gc.collect() 1071 if is_jython: 1072 time.sleep(0.1) 1073 gc.collect() 1074 gc.collect() 1075 1076 1077_header = '2P' 1078if hasattr(sys, "gettotalrefcount"): 1079 _header = '2P' + _header 1080_vheader = _header + 'P' 1081 1082def calcobjsize(fmt): 1083 return struct.calcsize(_header + fmt + '0P') 1084 1085def calcvobjsize(fmt): 1086 return struct.calcsize(_vheader + fmt + '0P') 1087 1088 1089_TPFLAGS_HAVE_GC = 1<<14 1090_TPFLAGS_HEAPTYPE = 1<<9 1091 1092def check_sizeof(test, o, size): 1093 import _testcapi 1094 result = sys.getsizeof(o) 1095 # add GC header size 1096 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1097 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1098 size += _testcapi.SIZEOF_PYGC_HEAD 1099 msg = 'wrong size for %s: got %d, expected %d' \ 1100 % (type(o), result, size) 1101 test.assertEqual(result, size, msg) 1102 1103 1104#======================================================================= 1105# Decorator for running a function in a different locale, correctly resetting 1106# it afterwards. 1107 1108def run_with_locale(catstr, *locales): 1109 def decorator(func): 1110 def inner(*args, **kwds): 1111 try: 1112 import locale 1113 category = getattr(locale, catstr) 1114 orig_locale = locale.setlocale(category) 1115 except AttributeError: 1116 # if the test author gives us an invalid category string 1117 raise 1118 except: 1119 # cannot retrieve original locale, so do nothing 1120 locale = orig_locale = None 1121 else: 1122 for loc in locales: 1123 try: 1124 locale.setlocale(category, loc) 1125 break 1126 except: 1127 pass 1128 1129 # now run the function, resetting the locale on exceptions 1130 try: 1131 return func(*args, **kwds) 1132 finally: 1133 if locale and orig_locale: 1134 locale.setlocale(category, orig_locale) 1135 inner.func_name = func.func_name 1136 inner.__doc__ = func.__doc__ 1137 return inner 1138 return decorator 1139 1140#======================================================================= 1141# Big-memory-test support. Separate from 'resources' because memory use should be configurable. 1142 1143# Some handy shorthands. Note that these are used for byte-limits as well 1144# as size-limits, in the various bigmem tests 1145_1M = 1024*1024 1146_1G = 1024 * _1M 1147_2G = 2 * _1G 1148_4G = 4 * _1G 1149 1150MAX_Py_ssize_t = sys.maxsize 1151 1152def set_memlimit(limit): 1153 global max_memuse 1154 global real_max_memuse 1155 sizes = { 1156 'k': 1024, 1157 'm': _1M, 1158 'g': _1G, 1159 't': 1024*_1G, 1160 } 1161 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 1162 re.IGNORECASE | re.VERBOSE) 1163 if m is None: 1164 raise ValueError('Invalid memory limit %r' % (limit,)) 1165 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 1166 real_max_memuse = memlimit 1167 if memlimit > MAX_Py_ssize_t: 1168 memlimit = MAX_Py_ssize_t 1169 if memlimit < _2G - 1: 1170 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 1171 max_memuse = memlimit 1172 1173def bigmemtest(minsize, memuse, overhead=5*_1M): 1174 """Decorator for bigmem tests. 1175 1176 'minsize' is the minimum useful size for the test (in arbitrary, 1177 test-interpreted units.) 'memuse' is the number of 'bytes per size' for 1178 the test, or a good estimate of it. 'overhead' specifies fixed overhead, 1179 independent of the testsize, and defaults to 5Mb. 1180 1181 The decorator tries to guess a good value for 'size' and passes it to 1182 the decorated test function. If minsize * memuse is more than the 1183 allowed memory use (as defined by max_memuse), the test is skipped. 1184 Otherwise, minsize is adjusted upward to use up to max_memuse. 1185 """ 1186 def decorator(f): 1187 def wrapper(self): 1188 if not max_memuse: 1189 # If max_memuse is 0 (the default), 1190 # we still want to run the tests with size set to a few kb, 1191 # to make sure they work. We still want to avoid using 1192 # too much memory, though, but we do that noisily. 1193 maxsize = 5147 1194 self.assertFalse(maxsize * memuse + overhead > 20 * _1M) 1195 else: 1196 maxsize = int((max_memuse - overhead) / memuse) 1197 if maxsize < minsize: 1198 # Really ought to print 'test skipped' or something 1199 if verbose: 1200 sys.stderr.write("Skipping %s because of memory " 1201 "constraint\n" % (f.__name__,)) 1202 return 1203 # Try to keep some breathing room in memory use 1204 maxsize = max(maxsize - 50 * _1M, minsize) 1205 return f(self, maxsize) 1206 wrapper.minsize = minsize 1207 wrapper.memuse = memuse 1208 wrapper.overhead = overhead 1209 return wrapper 1210 return decorator 1211 1212def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True): 1213 def decorator(f): 1214 def wrapper(self): 1215 if not real_max_memuse: 1216 maxsize = 5147 1217 else: 1218 maxsize = size 1219 1220 if ((real_max_memuse or not dry_run) 1221 and real_max_memuse < maxsize * memuse): 1222 if verbose: 1223 sys.stderr.write("Skipping %s because of memory " 1224 "constraint\n" % (f.__name__,)) 1225 return 1226 1227 return f(self, maxsize) 1228 wrapper.size = size 1229 wrapper.memuse = memuse 1230 wrapper.overhead = overhead 1231 return wrapper 1232 return decorator 1233 1234def bigaddrspacetest(f): 1235 """Decorator for tests that fill the address space.""" 1236 def wrapper(self): 1237 if max_memuse < MAX_Py_ssize_t: 1238 if verbose: 1239 sys.stderr.write("Skipping %s because of memory " 1240 "constraint\n" % (f.__name__,)) 1241 else: 1242 return f(self) 1243 return wrapper 1244 1245#======================================================================= 1246# unittest integration. 1247 1248class BasicTestRunner: 1249 def run(self, test): 1250 result = unittest.TestResult() 1251 test(result) 1252 return result 1253 1254def _id(obj): 1255 return obj 1256 1257def requires_resource(resource): 1258 if resource == 'gui' and not _is_gui_available(): 1259 return unittest.skip(_is_gui_available.reason) 1260 if is_resource_enabled(resource): 1261 return _id 1262 else: 1263 return unittest.skip("resource {0!r} is not enabled".format(resource)) 1264 1265def cpython_only(test): 1266 """ 1267 Decorator for tests only applicable on CPython. 1268 """ 1269 return impl_detail(cpython=True)(test) 1270 1271def impl_detail(msg=None, **guards): 1272 if check_impl_detail(**guards): 1273 return _id 1274 if msg is None: 1275 guardnames, default = _parse_guards(guards) 1276 if default: 1277 msg = "implementation detail not available on {0}" 1278 else: 1279 msg = "implementation detail specific to {0}" 1280 guardnames = sorted(guardnames.keys()) 1281 msg = msg.format(' or '.join(guardnames)) 1282 return unittest.skip(msg) 1283 1284def _parse_guards(guards): 1285 # Returns a tuple ({platform_name: run_me}, default_value) 1286 if not guards: 1287 return ({'cpython': True}, False) 1288 is_true = guards.values()[0] 1289 assert guards.values() == [is_true] * len(guards) # all True or all False 1290 return (guards, not is_true) 1291 1292# Use the following check to guard CPython's implementation-specific tests -- 1293# or to run them only on the implementation(s) guarded by the arguments. 1294def check_impl_detail(**guards): 1295 """This function returns True or False depending on the host platform. 1296 Examples: 1297 if check_impl_detail(): # only on CPython (default) 1298 if check_impl_detail(jython=True): # only on Jython 1299 if check_impl_detail(cpython=False): # everywhere except on CPython 1300 """ 1301 guards, default = _parse_guards(guards) 1302 return guards.get(platform.python_implementation().lower(), default) 1303 1304 1305 1306def _run_suite(suite): 1307 """Run tests from a unittest.TestSuite-derived class.""" 1308 if verbose: 1309 runner = unittest.TextTestRunner(sys.stdout, verbosity=2) 1310 else: 1311 runner = BasicTestRunner() 1312 1313 result = runner.run(suite) 1314 if not result.wasSuccessful(): 1315 if len(result.errors) == 1 and not result.failures: 1316 err = result.errors[0][1] 1317 elif len(result.failures) == 1 and not result.errors: 1318 err = result.failures[0][1] 1319 else: 1320 err = "multiple errors occurred" 1321 if not verbose: 1322 err += "; run in verbose mode for details" 1323 raise TestFailed(err) 1324 1325 1326def run_unittest(*classes): 1327 """Run tests from unittest.TestCase-derived classes.""" 1328 valid_types = (unittest.TestSuite, unittest.TestCase) 1329 suite = unittest.TestSuite() 1330 for cls in classes: 1331 if isinstance(cls, str): 1332 if cls in sys.modules: 1333 suite.addTest(unittest.findTestCases(sys.modules[cls])) 1334 else: 1335 raise ValueError("str arguments must be keys in sys.modules") 1336 elif isinstance(cls, valid_types): 1337 suite.addTest(cls) 1338 else: 1339 suite.addTest(unittest.makeSuite(cls)) 1340 _run_suite(suite) 1341 1342#======================================================================= 1343# Check for the presence of docstrings. 1344 1345HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or 1346 sys.platform == 'win32' or 1347 sysconfig.get_config_var('WITH_DOC_STRINGS')) 1348 1349requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 1350 "test requires docstrings") 1351 1352 1353#======================================================================= 1354# doctest driver. 1355 1356def run_doctest(module, verbosity=None): 1357 """Run doctest on the given module. Return (#failures, #tests). 1358 1359 If optional argument verbosity is not specified (or is None), pass 1360 test_support's belief about verbosity on to doctest. Else doctest's 1361 usual behavior is used (it searches sys.argv for -v). 1362 """ 1363 1364 import doctest 1365 1366 if verbosity is None: 1367 verbosity = verbose 1368 else: 1369 verbosity = None 1370 1371 # Direct doctest output (normally just errors) to real stdout; doctest 1372 # output shouldn't be compared by regrtest. 1373 save_stdout = sys.stdout 1374 sys.stdout = get_original_stdout() 1375 try: 1376 f, t = doctest.testmod(module, verbose=verbosity) 1377 if f: 1378 raise TestFailed("%d of %d doctests failed" % (f, t)) 1379 finally: 1380 sys.stdout = save_stdout 1381 if verbose: 1382 print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) 1383 return f, t 1384 1385#======================================================================= 1386# Threading support to prevent reporting refleaks when running regrtest.py -R 1387 1388# NOTE: we use thread._count() rather than threading.enumerate() (or the 1389# moral equivalent thereof) because a threading.Thread object is still alive 1390# until its __bootstrap() method has returned, even after it has been 1391# unregistered from the threading module. 1392# thread._count(), on the other hand, only gets decremented *after* the 1393# __bootstrap() method has returned, which gives us reliable reference counts 1394# at the end of a test run. 1395 1396def threading_setup(): 1397 if thread: 1398 return thread._count(), 1399 else: 1400 return 1, 1401 1402def threading_cleanup(nb_threads): 1403 if not thread: 1404 return 1405 1406 _MAX_COUNT = 10 1407 for count in range(_MAX_COUNT): 1408 n = thread._count() 1409 if n == nb_threads: 1410 break 1411 time.sleep(0.1) 1412 # XXX print a warning in case of failure? 1413 1414def reap_threads(func): 1415 """Use this function when threads are being used. This will 1416 ensure that the threads are cleaned up even when the test fails. 1417 If threading is unavailable this function does nothing. 1418 """ 1419 if not thread: 1420 return func 1421 1422 @functools.wraps(func) 1423 def decorator(*args): 1424 key = threading_setup() 1425 try: 1426 return func(*args) 1427 finally: 1428 threading_cleanup(*key) 1429 return decorator 1430 1431def reap_children(): 1432 """Use this function at the end of test_main() whenever sub-processes 1433 are started. This will help ensure that no extra children (zombies) 1434 stick around to hog resources and create problems when looking 1435 for refleaks. 1436 """ 1437 1438 # Reap all our dead child processes so we don't leave zombies around. 1439 # These hog resources and might be causing some of the buildbots to die. 1440 if hasattr(os, 'waitpid'): 1441 any_process = -1 1442 while True: 1443 try: 1444 # This will raise an exception on Windows. That's ok. 1445 pid, status = os.waitpid(any_process, os.WNOHANG) 1446 if pid == 0: 1447 break 1448 except: 1449 break 1450 1451@contextlib.contextmanager 1452def swap_attr(obj, attr, new_val): 1453 """Temporary swap out an attribute with a new object. 1454 1455 Usage: 1456 with swap_attr(obj, "attr", 5): 1457 ... 1458 1459 This will set obj.attr to 5 for the duration of the with: block, 1460 restoring the old value at the end of the block. If `attr` doesn't 1461 exist on `obj`, it will be created and then deleted at the end of the 1462 block. 1463 """ 1464 if hasattr(obj, attr): 1465 real_val = getattr(obj, attr) 1466 setattr(obj, attr, new_val) 1467 try: 1468 yield 1469 finally: 1470 setattr(obj, attr, real_val) 1471 else: 1472 setattr(obj, attr, new_val) 1473 try: 1474 yield 1475 finally: 1476 delattr(obj, attr) 1477 1478def py3k_bytes(b): 1479 """Emulate the py3k bytes() constructor. 1480 1481 NOTE: This is only a best effort function. 1482 """ 1483 try: 1484 # memoryview? 1485 return b.tobytes() 1486 except AttributeError: 1487 try: 1488 # iterable of ints? 1489 return b"".join(chr(x) for x in b) 1490 except TypeError: 1491 return bytes(b) 1492 1493def args_from_interpreter_flags(): 1494 """Return a list of command-line arguments reproducing the current 1495 settings in sys.flags.""" 1496 import subprocess 1497 return subprocess._args_from_interpreter_flags() 1498 1499def strip_python_stderr(stderr): 1500 """Strip the stderr of a Python process from potential debug output 1501 emitted by the interpreter. 1502 1503 This will typically be run on the result of the communicate() method 1504 of a subprocess.Popen object. 1505 """ 1506 stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() 1507 return stderr 1508