1import contextlib 2import errno 3import importlib 4import io 5import os 6import shutil 7import socket 8import stat 9import subprocess 10import sys 11import tempfile 12import textwrap 13import time 14import unittest 15from test import support 16from test.support import script_helper 17 18TESTFN = support.TESTFN 19 20 21class TestSupport(unittest.TestCase): 22 23 def test_import_module(self): 24 support.import_module("ftplib") 25 self.assertRaises(unittest.SkipTest, support.import_module, "foo") 26 27 def test_import_fresh_module(self): 28 support.import_fresh_module("ftplib") 29 30 def test_get_attribute(self): 31 self.assertEqual(support.get_attribute(self, "test_get_attribute"), 32 self.test_get_attribute) 33 self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo") 34 35 @unittest.skip("failing buildbots") 36 def test_get_original_stdout(self): 37 self.assertEqual(support.get_original_stdout(), sys.stdout) 38 39 def test_unload(self): 40 import sched 41 self.assertIn("sched", sys.modules) 42 support.unload("sched") 43 self.assertNotIn("sched", sys.modules) 44 45 def test_unlink(self): 46 with open(TESTFN, "w") as f: 47 pass 48 support.unlink(TESTFN) 49 self.assertFalse(os.path.exists(TESTFN)) 50 support.unlink(TESTFN) 51 52 def test_rmtree(self): 53 dirpath = support.TESTFN + 'd' 54 subdirpath = os.path.join(dirpath, 'subdir') 55 os.mkdir(dirpath) 56 os.mkdir(subdirpath) 57 support.rmtree(dirpath) 58 self.assertFalse(os.path.exists(dirpath)) 59 with support.swap_attr(support, 'verbose', 0): 60 support.rmtree(dirpath) 61 62 os.mkdir(dirpath) 63 os.mkdir(subdirpath) 64 os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR) 65 with support.swap_attr(support, 'verbose', 0): 66 support.rmtree(dirpath) 67 self.assertFalse(os.path.exists(dirpath)) 68 69 os.mkdir(dirpath) 70 os.mkdir(subdirpath) 71 os.chmod(dirpath, 0) 72 with support.swap_attr(support, 'verbose', 0): 73 support.rmtree(dirpath) 74 self.assertFalse(os.path.exists(dirpath)) 75 76 def test_forget(self): 77 mod_filename = TESTFN + '.py' 78 with open(mod_filename, 'w') as f: 79 print('foo = 1', file=f) 80 sys.path.insert(0, os.curdir) 81 importlib.invalidate_caches() 82 try: 83 mod = __import__(TESTFN) 84 self.assertIn(TESTFN, sys.modules) 85 86 support.forget(TESTFN) 87 self.assertNotIn(TESTFN, sys.modules) 88 finally: 89 del sys.path[0] 90 support.unlink(mod_filename) 91 support.rmtree('__pycache__') 92 93 def test_HOST(self): 94 s = socket.socket() 95 s.bind((support.HOST, 0)) 96 s.close() 97 98 def test_find_unused_port(self): 99 port = support.find_unused_port() 100 s = socket.socket() 101 s.bind((support.HOST, port)) 102 s.close() 103 104 def test_bind_port(self): 105 s = socket.socket() 106 support.bind_port(s) 107 s.listen() 108 s.close() 109 110 # Tests for temp_dir() 111 112 def test_temp_dir(self): 113 """Test that temp_dir() creates and destroys its directory.""" 114 parent_dir = tempfile.mkdtemp() 115 parent_dir = os.path.realpath(parent_dir) 116 117 try: 118 path = os.path.join(parent_dir, 'temp') 119 self.assertFalse(os.path.isdir(path)) 120 with support.temp_dir(path) as temp_path: 121 self.assertEqual(temp_path, path) 122 self.assertTrue(os.path.isdir(path)) 123 self.assertFalse(os.path.isdir(path)) 124 finally: 125 support.rmtree(parent_dir) 126 127 def test_temp_dir__path_none(self): 128 """Test passing no path.""" 129 with support.temp_dir() as temp_path: 130 self.assertTrue(os.path.isdir(temp_path)) 131 self.assertFalse(os.path.isdir(temp_path)) 132 133 def test_temp_dir__existing_dir__quiet_default(self): 134 """Test passing a directory that already exists.""" 135 def call_temp_dir(path): 136 with support.temp_dir(path) as temp_path: 137 raise Exception("should not get here") 138 139 path = tempfile.mkdtemp() 140 path = os.path.realpath(path) 141 try: 142 self.assertTrue(os.path.isdir(path)) 143 self.assertRaises(FileExistsError, call_temp_dir, path) 144 # Make sure temp_dir did not delete the original directory. 145 self.assertTrue(os.path.isdir(path)) 146 finally: 147 shutil.rmtree(path) 148 149 def test_temp_dir__existing_dir__quiet_true(self): 150 """Test passing a directory that already exists with quiet=True.""" 151 path = tempfile.mkdtemp() 152 path = os.path.realpath(path) 153 154 try: 155 with support.check_warnings() as recorder: 156 with support.temp_dir(path, quiet=True) as temp_path: 157 self.assertEqual(path, temp_path) 158 warnings = [str(w.message) for w in recorder.warnings] 159 # Make sure temp_dir did not delete the original directory. 160 self.assertTrue(os.path.isdir(path)) 161 finally: 162 shutil.rmtree(path) 163 164 self.assertEqual(len(warnings), 1, warnings) 165 warn = warnings[0] 166 self.assertTrue(warn.startswith(f'tests may fail, unable to create ' 167 f'temporary directory {path!r}: '), 168 warn) 169 170 @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork") 171 def test_temp_dir__forked_child(self): 172 """Test that a forked child process does not remove the directory.""" 173 # See bpo-30028 for details. 174 # Run the test as an external script, because it uses fork. 175 script_helper.assert_python_ok("-c", textwrap.dedent(""" 176 import os 177 from test import support 178 with support.temp_cwd() as temp_path: 179 pid = os.fork() 180 if pid != 0: 181 # parent process (child has pid == 0) 182 183 # wait for the child to terminate 184 (pid, status) = os.waitpid(pid, 0) 185 if status != 0: 186 raise AssertionError(f"Child process failed with exit " 187 f"status indication 0x{status:x}.") 188 189 # Make sure that temp_path is still present. When the child 190 # process leaves the 'temp_cwd'-context, the __exit__()- 191 # method of the context must not remove the temporary 192 # directory. 193 if not os.path.isdir(temp_path): 194 raise AssertionError("Child removed temp_path.") 195 """)) 196 197 # Tests for change_cwd() 198 199 def test_change_cwd(self): 200 original_cwd = os.getcwd() 201 202 with support.temp_dir() as temp_path: 203 with support.change_cwd(temp_path) as new_cwd: 204 self.assertEqual(new_cwd, temp_path) 205 self.assertEqual(os.getcwd(), new_cwd) 206 207 self.assertEqual(os.getcwd(), original_cwd) 208 209 def test_change_cwd__non_existent_dir(self): 210 """Test passing a non-existent directory.""" 211 original_cwd = os.getcwd() 212 213 def call_change_cwd(path): 214 with support.change_cwd(path) as new_cwd: 215 raise Exception("should not get here") 216 217 with support.temp_dir() as parent_dir: 218 non_existent_dir = os.path.join(parent_dir, 'does_not_exist') 219 self.assertRaises(FileNotFoundError, call_change_cwd, 220 non_existent_dir) 221 222 self.assertEqual(os.getcwd(), original_cwd) 223 224 def test_change_cwd__non_existent_dir__quiet_true(self): 225 """Test passing a non-existent directory with quiet=True.""" 226 original_cwd = os.getcwd() 227 228 with support.temp_dir() as parent_dir: 229 bad_dir = os.path.join(parent_dir, 'does_not_exist') 230 with support.check_warnings() as recorder: 231 with support.change_cwd(bad_dir, quiet=True) as new_cwd: 232 self.assertEqual(new_cwd, original_cwd) 233 self.assertEqual(os.getcwd(), new_cwd) 234 warnings = [str(w.message) for w in recorder.warnings] 235 236 self.assertEqual(len(warnings), 1, warnings) 237 warn = warnings[0] 238 self.assertTrue(warn.startswith(f'tests may fail, unable to change ' 239 f'the current working directory ' 240 f'to {bad_dir!r}: '), 241 warn) 242 243 # Tests for change_cwd() 244 245 def test_change_cwd__chdir_warning(self): 246 """Check the warning message when os.chdir() fails.""" 247 path = TESTFN + '_does_not_exist' 248 with support.check_warnings() as recorder: 249 with support.change_cwd(path=path, quiet=True): 250 pass 251 messages = [str(w.message) for w in recorder.warnings] 252 253 self.assertEqual(len(messages), 1, messages) 254 msg = messages[0] 255 self.assertTrue(msg.startswith(f'tests may fail, unable to change ' 256 f'the current working directory ' 257 f'to {path!r}: '), 258 msg) 259 260 # Tests for temp_cwd() 261 262 def test_temp_cwd(self): 263 here = os.getcwd() 264 with support.temp_cwd(name=TESTFN): 265 self.assertEqual(os.path.basename(os.getcwd()), TESTFN) 266 self.assertFalse(os.path.exists(TESTFN)) 267 self.assertEqual(os.getcwd(), here) 268 269 270 def test_temp_cwd__name_none(self): 271 """Test passing None to temp_cwd().""" 272 original_cwd = os.getcwd() 273 with support.temp_cwd(name=None) as new_cwd: 274 self.assertNotEqual(new_cwd, original_cwd) 275 self.assertTrue(os.path.isdir(new_cwd)) 276 self.assertEqual(os.getcwd(), new_cwd) 277 self.assertEqual(os.getcwd(), original_cwd) 278 279 def test_sortdict(self): 280 self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}") 281 282 def test_make_bad_fd(self): 283 fd = support.make_bad_fd() 284 with self.assertRaises(OSError) as cm: 285 os.write(fd, b"foo") 286 self.assertEqual(cm.exception.errno, errno.EBADF) 287 288 def test_check_syntax_error(self): 289 support.check_syntax_error(self, "def class", lineno=1, offset=9) 290 with self.assertRaises(AssertionError): 291 support.check_syntax_error(self, "x=1") 292 293 def test_CleanImport(self): 294 import importlib 295 with support.CleanImport("asyncore"): 296 importlib.import_module("asyncore") 297 298 def test_DirsOnSysPath(self): 299 with support.DirsOnSysPath('foo', 'bar'): 300 self.assertIn("foo", sys.path) 301 self.assertIn("bar", sys.path) 302 self.assertNotIn("foo", sys.path) 303 self.assertNotIn("bar", sys.path) 304 305 def test_captured_stdout(self): 306 with support.captured_stdout() as stdout: 307 print("hello") 308 self.assertEqual(stdout.getvalue(), "hello\n") 309 310 def test_captured_stderr(self): 311 with support.captured_stderr() as stderr: 312 print("hello", file=sys.stderr) 313 self.assertEqual(stderr.getvalue(), "hello\n") 314 315 def test_captured_stdin(self): 316 with support.captured_stdin() as stdin: 317 stdin.write('hello\n') 318 stdin.seek(0) 319 # call test code that consumes from sys.stdin 320 captured = input() 321 self.assertEqual(captured, "hello") 322 323 def test_gc_collect(self): 324 support.gc_collect() 325 326 def test_python_is_optimized(self): 327 self.assertIsInstance(support.python_is_optimized(), bool) 328 329 def test_swap_attr(self): 330 class Obj: 331 pass 332 obj = Obj() 333 obj.x = 1 334 with support.swap_attr(obj, "x", 5) as x: 335 self.assertEqual(obj.x, 5) 336 self.assertEqual(x, 1) 337 self.assertEqual(obj.x, 1) 338 with support.swap_attr(obj, "y", 5) as y: 339 self.assertEqual(obj.y, 5) 340 self.assertIsNone(y) 341 self.assertFalse(hasattr(obj, 'y')) 342 with support.swap_attr(obj, "y", 5): 343 del obj.y 344 self.assertFalse(hasattr(obj, 'y')) 345 346 def test_swap_item(self): 347 D = {"x":1} 348 with support.swap_item(D, "x", 5) as x: 349 self.assertEqual(D["x"], 5) 350 self.assertEqual(x, 1) 351 self.assertEqual(D["x"], 1) 352 with support.swap_item(D, "y", 5) as y: 353 self.assertEqual(D["y"], 5) 354 self.assertIsNone(y) 355 self.assertNotIn("y", D) 356 with support.swap_item(D, "y", 5): 357 del D["y"] 358 self.assertNotIn("y", D) 359 360 class RefClass: 361 attribute1 = None 362 attribute2 = None 363 _hidden_attribute1 = None 364 __magic_1__ = None 365 366 class OtherClass: 367 attribute2 = None 368 attribute3 = None 369 __magic_1__ = None 370 __magic_2__ = None 371 372 def test_detect_api_mismatch(self): 373 missing_items = support.detect_api_mismatch(self.RefClass, 374 self.OtherClass) 375 self.assertEqual({'attribute1'}, missing_items) 376 377 missing_items = support.detect_api_mismatch(self.OtherClass, 378 self.RefClass) 379 self.assertEqual({'attribute3', '__magic_2__'}, missing_items) 380 381 def test_detect_api_mismatch__ignore(self): 382 ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either'] 383 384 missing_items = support.detect_api_mismatch( 385 self.RefClass, self.OtherClass, ignore=ignore) 386 self.assertEqual(set(), missing_items) 387 388 missing_items = support.detect_api_mismatch( 389 self.OtherClass, self.RefClass, ignore=ignore) 390 self.assertEqual(set(), missing_items) 391 392 def test_check__all__(self): 393 extra = {'tempdir'} 394 blacklist = {'template'} 395 support.check__all__(self, 396 tempfile, 397 extra=extra, 398 blacklist=blacklist) 399 400 extra = {'TextTestResult', 'installHandler'} 401 blacklist = {'load_tests', "TestProgram", "BaseTestSuite"} 402 403 support.check__all__(self, 404 unittest, 405 ("unittest.result", "unittest.case", 406 "unittest.suite", "unittest.loader", 407 "unittest.main", "unittest.runner", 408 "unittest.signals"), 409 extra=extra, 410 blacklist=blacklist) 411 412 self.assertRaises(AssertionError, support.check__all__, self, unittest) 413 414 @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'), 415 'need os.waitpid() and os.WNOHANG') 416 def test_reap_children(self): 417 # Make sure that there is no other pending child process 418 support.reap_children() 419 420 # Create a child process 421 pid = os.fork() 422 if pid == 0: 423 # child process: do nothing, just exit 424 os._exit(0) 425 426 t0 = time.monotonic() 427 deadline = time.monotonic() + 60.0 428 429 was_altered = support.environment_altered 430 try: 431 support.environment_altered = False 432 stderr = io.StringIO() 433 434 while True: 435 if time.monotonic() > deadline: 436 self.fail("timeout") 437 438 with contextlib.redirect_stderr(stderr): 439 support.reap_children() 440 441 # Use environment_altered to check if reap_children() found 442 # the child process 443 if support.environment_altered: 444 break 445 446 # loop until the child process completed 447 time.sleep(0.100) 448 449 msg = "Warning -- reap_children() reaped child process %s" % pid 450 self.assertIn(msg, stderr.getvalue()) 451 self.assertTrue(support.environment_altered) 452 finally: 453 support.environment_altered = was_altered 454 455 # Just in case, check again that there is no other 456 # pending child process 457 support.reap_children() 458 459 def check_options(self, args, func, expected=None): 460 code = f'from test.support import {func}; print(repr({func}()))' 461 cmd = [sys.executable, *args, '-c', code] 462 env = {key: value for key, value in os.environ.items() 463 if not key.startswith('PYTHON')} 464 proc = subprocess.run(cmd, 465 stdout=subprocess.PIPE, 466 stderr=subprocess.DEVNULL, 467 universal_newlines=True, 468 env=env) 469 if expected is None: 470 expected = args 471 self.assertEqual(proc.stdout.rstrip(), repr(expected)) 472 self.assertEqual(proc.returncode, 0) 473 474 def test_args_from_interpreter_flags(self): 475 # Test test.support.args_from_interpreter_flags() 476 for opts in ( 477 # no option 478 [], 479 # single option 480 ['-B'], 481 ['-s'], 482 ['-S'], 483 ['-E'], 484 ['-v'], 485 ['-b'], 486 ['-q'], 487 ['-I'], 488 # same option multiple times 489 ['-bb'], 490 ['-vvv'], 491 # -W options 492 ['-Wignore'], 493 # -X options 494 ['-X', 'dev'], 495 ['-Wignore', '-X', 'dev'], 496 ['-X', 'faulthandler'], 497 ['-X', 'importtime'], 498 ['-X', 'showalloccount'], 499 ['-X', 'showrefcount'], 500 ['-X', 'tracemalloc'], 501 ['-X', 'tracemalloc=3'], 502 ): 503 with self.subTest(opts=opts): 504 self.check_options(opts, 'args_from_interpreter_flags') 505 506 self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags', 507 ['-I']) 508 509 def test_optim_args_from_interpreter_flags(self): 510 # Test test.support.optim_args_from_interpreter_flags() 511 for opts in ( 512 # no option 513 [], 514 ['-O'], 515 ['-OO'], 516 ['-OOOO'], 517 ): 518 with self.subTest(opts=opts): 519 self.check_options(opts, 'optim_args_from_interpreter_flags') 520 521 def test_match_test(self): 522 class Test: 523 def __init__(self, test_id): 524 self.test_id = test_id 525 526 def id(self): 527 return self.test_id 528 529 test_access = Test('test.test_os.FileTests.test_access') 530 test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir') 531 532 # Test acceptance 533 with support.swap_attr(support, '_match_test_func', None): 534 # match all 535 support.set_match_tests([]) 536 self.assertTrue(support.match_test(test_access)) 537 self.assertTrue(support.match_test(test_chdir)) 538 539 # match all using None 540 support.set_match_tests(None, None) 541 self.assertTrue(support.match_test(test_access)) 542 self.assertTrue(support.match_test(test_chdir)) 543 544 # match the full test identifier 545 support.set_match_tests([test_access.id()], None) 546 self.assertTrue(support.match_test(test_access)) 547 self.assertFalse(support.match_test(test_chdir)) 548 549 # match the module name 550 support.set_match_tests(['test_os'], None) 551 self.assertTrue(support.match_test(test_access)) 552 self.assertTrue(support.match_test(test_chdir)) 553 554 # Test '*' pattern 555 support.set_match_tests(['test_*'], None) 556 self.assertTrue(support.match_test(test_access)) 557 self.assertTrue(support.match_test(test_chdir)) 558 559 # Test case sensitivity 560 support.set_match_tests(['filetests'], None) 561 self.assertFalse(support.match_test(test_access)) 562 support.set_match_tests(['FileTests'], None) 563 self.assertTrue(support.match_test(test_access)) 564 565 # Test pattern containing '.' and a '*' metacharacter 566 support.set_match_tests(['*test_os.*.test_*'], None) 567 self.assertTrue(support.match_test(test_access)) 568 self.assertTrue(support.match_test(test_chdir)) 569 570 # Multiple patterns 571 support.set_match_tests([test_access.id(), test_chdir.id()], None) 572 self.assertTrue(support.match_test(test_access)) 573 self.assertTrue(support.match_test(test_chdir)) 574 575 support.set_match_tests(['test_access', 'DONTMATCH'], None) 576 self.assertTrue(support.match_test(test_access)) 577 self.assertFalse(support.match_test(test_chdir)) 578 579 # Test rejection 580 with support.swap_attr(support, '_match_test_func', None): 581 # match all 582 support.set_match_tests(ignore_patterns=[]) 583 self.assertTrue(support.match_test(test_access)) 584 self.assertTrue(support.match_test(test_chdir)) 585 586 # match all using None 587 support.set_match_tests(None, None) 588 self.assertTrue(support.match_test(test_access)) 589 self.assertTrue(support.match_test(test_chdir)) 590 591 # match the full test identifier 592 support.set_match_tests(None, [test_access.id()]) 593 self.assertFalse(support.match_test(test_access)) 594 self.assertTrue(support.match_test(test_chdir)) 595 596 # match the module name 597 support.set_match_tests(None, ['test_os']) 598 self.assertFalse(support.match_test(test_access)) 599 self.assertFalse(support.match_test(test_chdir)) 600 601 # Test '*' pattern 602 support.set_match_tests(None, ['test_*']) 603 self.assertFalse(support.match_test(test_access)) 604 self.assertFalse(support.match_test(test_chdir)) 605 606 # Test case sensitivity 607 support.set_match_tests(None, ['filetests']) 608 self.assertTrue(support.match_test(test_access)) 609 support.set_match_tests(None, ['FileTests']) 610 self.assertFalse(support.match_test(test_access)) 611 612 # Test pattern containing '.' and a '*' metacharacter 613 support.set_match_tests(None, ['*test_os.*.test_*']) 614 self.assertFalse(support.match_test(test_access)) 615 self.assertFalse(support.match_test(test_chdir)) 616 617 # Multiple patterns 618 support.set_match_tests(None, [test_access.id(), test_chdir.id()]) 619 self.assertFalse(support.match_test(test_access)) 620 self.assertFalse(support.match_test(test_chdir)) 621 622 support.set_match_tests(None, ['test_access', 'DONTMATCH']) 623 self.assertFalse(support.match_test(test_access)) 624 self.assertTrue(support.match_test(test_chdir)) 625 626 def test_fd_count(self): 627 # We cannot test the absolute value of fd_count(): on old Linux 628 # kernel or glibc versions, os.urandom() keeps a FD open on 629 # /dev/urandom device and Python has 4 FD opens instead of 3. 630 start = support.fd_count() 631 fd = os.open(__file__, os.O_RDONLY) 632 try: 633 more = support.fd_count() 634 finally: 635 os.close(fd) 636 self.assertEqual(more - start, 1) 637 638 # XXX -follows a list of untested API 639 # make_legacy_pyc 640 # is_resource_enabled 641 # requires 642 # fcmp 643 # umaks 644 # findfile 645 # check_warnings 646 # EnvironmentVarGuard 647 # TransientResource 648 # transient_internet 649 # run_with_locale 650 # set_memlimit 651 # bigmemtest 652 # precisionbigmemtest 653 # bigaddrspacetest 654 # requires_resource 655 # run_doctest 656 # threading_cleanup 657 # reap_threads 658 # strip_python_stderr 659 # can_symlink 660 # skip_unless_symlink 661 # SuppressCrashReport 662 663 664def test_main(): 665 tests = [TestSupport] 666 support.run_unittest(*tests) 667 668if __name__ == '__main__': 669 test_main() 670