1import contextlib 2import errno 3import importlib 4import io 5import os 6import shutil 7import socket 8import stat 9import subprocess 10import sys 11import tempfile 12import textwrap 13import time 14import unittest 15from test import support 16from test.support import script_helper 17 18TESTFN = support.TESTFN 19 20 21class TestSupport(unittest.TestCase): 22 23 def test_import_module(self): 24 support.import_module("ftplib") 25 self.assertRaises(unittest.SkipTest, support.import_module, "foo") 26 27 def test_import_fresh_module(self): 28 support.import_fresh_module("ftplib") 29 30 def test_get_attribute(self): 31 self.assertEqual(support.get_attribute(self, "test_get_attribute"), 32 self.test_get_attribute) 33 self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo") 34 35 @unittest.skip("failing buildbots") 36 def test_get_original_stdout(self): 37 self.assertEqual(support.get_original_stdout(), sys.stdout) 38 39 def test_unload(self): 40 import sched 41 self.assertIn("sched", sys.modules) 42 support.unload("sched") 43 self.assertNotIn("sched", sys.modules) 44 45 def test_unlink(self): 46 with open(TESTFN, "w") as f: 47 pass 48 support.unlink(TESTFN) 49 self.assertFalse(os.path.exists(TESTFN)) 50 support.unlink(TESTFN) 51 52 def test_rmtree(self): 53 dirpath = support.TESTFN + 'd' 54 subdirpath = os.path.join(dirpath, 'subdir') 55 os.mkdir(dirpath) 56 os.mkdir(subdirpath) 57 support.rmtree(dirpath) 58 self.assertFalse(os.path.exists(dirpath)) 59 with support.swap_attr(support, 'verbose', 0): 60 support.rmtree(dirpath) 61 62 os.mkdir(dirpath) 63 os.mkdir(subdirpath) 64 os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR) 65 with support.swap_attr(support, 'verbose', 0): 66 support.rmtree(dirpath) 67 self.assertFalse(os.path.exists(dirpath)) 68 69 os.mkdir(dirpath) 70 os.mkdir(subdirpath) 71 os.chmod(dirpath, 0) 72 with support.swap_attr(support, 'verbose', 0): 73 support.rmtree(dirpath) 74 self.assertFalse(os.path.exists(dirpath)) 75 76 def test_forget(self): 77 mod_filename = TESTFN + '.py' 78 with open(mod_filename, 'w') as f: 79 print('foo = 1', file=f) 80 sys.path.insert(0, os.curdir) 81 importlib.invalidate_caches() 82 try: 83 mod = __import__(TESTFN) 84 self.assertIn(TESTFN, sys.modules) 85 86 support.forget(TESTFN) 87 self.assertNotIn(TESTFN, sys.modules) 88 finally: 89 del sys.path[0] 90 support.unlink(mod_filename) 91 support.rmtree('__pycache__') 92 93 def test_HOST(self): 94 s = socket.create_server((support.HOST, 0)) 95 s.close() 96 97 def test_find_unused_port(self): 98 port = support.find_unused_port() 99 s = socket.create_server((support.HOST, port)) 100 s.close() 101 102 def test_bind_port(self): 103 s = socket.socket() 104 support.bind_port(s) 105 s.listen() 106 s.close() 107 108 # Tests for temp_dir() 109 110 def test_temp_dir(self): 111 """Test that temp_dir() creates and destroys its directory.""" 112 parent_dir = tempfile.mkdtemp() 113 parent_dir = os.path.realpath(parent_dir) 114 115 try: 116 path = os.path.join(parent_dir, 'temp') 117 self.assertFalse(os.path.isdir(path)) 118 with support.temp_dir(path) as temp_path: 119 self.assertEqual(temp_path, path) 120 self.assertTrue(os.path.isdir(path)) 121 self.assertFalse(os.path.isdir(path)) 122 finally: 123 support.rmtree(parent_dir) 124 125 def test_temp_dir__path_none(self): 126 """Test passing no path.""" 127 with support.temp_dir() as temp_path: 128 self.assertTrue(os.path.isdir(temp_path)) 129 self.assertFalse(os.path.isdir(temp_path)) 130 131 def test_temp_dir__existing_dir__quiet_default(self): 132 """Test passing a directory that already exists.""" 133 def call_temp_dir(path): 134 with support.temp_dir(path) as temp_path: 135 raise Exception("should not get here") 136 137 path = tempfile.mkdtemp() 138 path = os.path.realpath(path) 139 try: 140 self.assertTrue(os.path.isdir(path)) 141 self.assertRaises(FileExistsError, call_temp_dir, path) 142 # Make sure temp_dir did not delete the original directory. 143 self.assertTrue(os.path.isdir(path)) 144 finally: 145 shutil.rmtree(path) 146 147 def test_temp_dir__existing_dir__quiet_true(self): 148 """Test passing a directory that already exists with quiet=True.""" 149 path = tempfile.mkdtemp() 150 path = os.path.realpath(path) 151 152 try: 153 with support.check_warnings() as recorder: 154 with support.temp_dir(path, quiet=True) as temp_path: 155 self.assertEqual(path, temp_path) 156 warnings = [str(w.message) for w in recorder.warnings] 157 # Make sure temp_dir did not delete the original directory. 158 self.assertTrue(os.path.isdir(path)) 159 finally: 160 shutil.rmtree(path) 161 162 self.assertEqual(len(warnings), 1, warnings) 163 warn = warnings[0] 164 self.assertTrue(warn.startswith(f'tests may fail, unable to create ' 165 f'temporary directory {path!r}: '), 166 warn) 167 168 @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork") 169 def test_temp_dir__forked_child(self): 170 """Test that a forked child process does not remove the directory.""" 171 # See bpo-30028 for details. 172 # Run the test as an external script, because it uses fork. 173 script_helper.assert_python_ok("-c", textwrap.dedent(""" 174 import os 175 from test import support 176 with support.temp_cwd() as temp_path: 177 pid = os.fork() 178 if pid != 0: 179 # parent process (child has pid == 0) 180 181 # wait for the child to terminate 182 (pid, status) = os.waitpid(pid, 0) 183 if status != 0: 184 raise AssertionError(f"Child process failed with exit " 185 f"status indication 0x{status:x}.") 186 187 # Make sure that temp_path is still present. When the child 188 # process leaves the 'temp_cwd'-context, the __exit__()- 189 # method of the context must not remove the temporary 190 # directory. 191 if not os.path.isdir(temp_path): 192 raise AssertionError("Child removed temp_path.") 193 """)) 194 195 # Tests for change_cwd() 196 197 def test_change_cwd(self): 198 original_cwd = os.getcwd() 199 200 with support.temp_dir() as temp_path: 201 with support.change_cwd(temp_path) as new_cwd: 202 self.assertEqual(new_cwd, temp_path) 203 self.assertEqual(os.getcwd(), new_cwd) 204 205 self.assertEqual(os.getcwd(), original_cwd) 206 207 def test_change_cwd__non_existent_dir(self): 208 """Test passing a non-existent directory.""" 209 original_cwd = os.getcwd() 210 211 def call_change_cwd(path): 212 with support.change_cwd(path) as new_cwd: 213 raise Exception("should not get here") 214 215 with support.temp_dir() as parent_dir: 216 non_existent_dir = os.path.join(parent_dir, 'does_not_exist') 217 self.assertRaises(FileNotFoundError, call_change_cwd, 218 non_existent_dir) 219 220 self.assertEqual(os.getcwd(), original_cwd) 221 222 def test_change_cwd__non_existent_dir__quiet_true(self): 223 """Test passing a non-existent directory with quiet=True.""" 224 original_cwd = os.getcwd() 225 226 with support.temp_dir() as parent_dir: 227 bad_dir = os.path.join(parent_dir, 'does_not_exist') 228 with support.check_warnings() as recorder: 229 with support.change_cwd(bad_dir, quiet=True) as new_cwd: 230 self.assertEqual(new_cwd, original_cwd) 231 self.assertEqual(os.getcwd(), new_cwd) 232 warnings = [str(w.message) for w in recorder.warnings] 233 234 self.assertEqual(len(warnings), 1, warnings) 235 warn = warnings[0] 236 self.assertTrue(warn.startswith(f'tests may fail, unable to change ' 237 f'the current working directory ' 238 f'to {bad_dir!r}: '), 239 warn) 240 241 # Tests for change_cwd() 242 243 def test_change_cwd__chdir_warning(self): 244 """Check the warning message when os.chdir() fails.""" 245 path = TESTFN + '_does_not_exist' 246 with support.check_warnings() as recorder: 247 with support.change_cwd(path=path, quiet=True): 248 pass 249 messages = [str(w.message) for w in recorder.warnings] 250 251 self.assertEqual(len(messages), 1, messages) 252 msg = messages[0] 253 self.assertTrue(msg.startswith(f'tests may fail, unable to change ' 254 f'the current working directory ' 255 f'to {path!r}: '), 256 msg) 257 258 # Tests for temp_cwd() 259 260 def test_temp_cwd(self): 261 here = os.getcwd() 262 with support.temp_cwd(name=TESTFN): 263 self.assertEqual(os.path.basename(os.getcwd()), TESTFN) 264 self.assertFalse(os.path.exists(TESTFN)) 265 self.assertEqual(os.getcwd(), here) 266 267 268 def test_temp_cwd__name_none(self): 269 """Test passing None to temp_cwd().""" 270 original_cwd = os.getcwd() 271 with support.temp_cwd(name=None) as new_cwd: 272 self.assertNotEqual(new_cwd, original_cwd) 273 self.assertTrue(os.path.isdir(new_cwd)) 274 self.assertEqual(os.getcwd(), new_cwd) 275 self.assertEqual(os.getcwd(), original_cwd) 276 277 def test_sortdict(self): 278 self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}") 279 280 def test_make_bad_fd(self): 281 fd = support.make_bad_fd() 282 with self.assertRaises(OSError) as cm: 283 os.write(fd, b"foo") 284 self.assertEqual(cm.exception.errno, errno.EBADF) 285 286 def test_check_syntax_error(self): 287 support.check_syntax_error(self, "def class", lineno=1, offset=5) 288 with self.assertRaises(AssertionError): 289 support.check_syntax_error(self, "x=1") 290 291 def test_CleanImport(self): 292 import importlib 293 with support.CleanImport("asyncore"): 294 importlib.import_module("asyncore") 295 296 def test_DirsOnSysPath(self): 297 with support.DirsOnSysPath('foo', 'bar'): 298 self.assertIn("foo", sys.path) 299 self.assertIn("bar", sys.path) 300 self.assertNotIn("foo", sys.path) 301 self.assertNotIn("bar", sys.path) 302 303 def test_captured_stdout(self): 304 with support.captured_stdout() as stdout: 305 print("hello") 306 self.assertEqual(stdout.getvalue(), "hello\n") 307 308 def test_captured_stderr(self): 309 with support.captured_stderr() as stderr: 310 print("hello", file=sys.stderr) 311 self.assertEqual(stderr.getvalue(), "hello\n") 312 313 def test_captured_stdin(self): 314 with support.captured_stdin() as stdin: 315 stdin.write('hello\n') 316 stdin.seek(0) 317 # call test code that consumes from sys.stdin 318 captured = input() 319 self.assertEqual(captured, "hello") 320 321 def test_gc_collect(self): 322 support.gc_collect() 323 324 def test_python_is_optimized(self): 325 self.assertIsInstance(support.python_is_optimized(), bool) 326 327 def test_swap_attr(self): 328 class Obj: 329 pass 330 obj = Obj() 331 obj.x = 1 332 with support.swap_attr(obj, "x", 5) as x: 333 self.assertEqual(obj.x, 5) 334 self.assertEqual(x, 1) 335 self.assertEqual(obj.x, 1) 336 with support.swap_attr(obj, "y", 5) as y: 337 self.assertEqual(obj.y, 5) 338 self.assertIsNone(y) 339 self.assertFalse(hasattr(obj, 'y')) 340 with support.swap_attr(obj, "y", 5): 341 del obj.y 342 self.assertFalse(hasattr(obj, 'y')) 343 344 def test_swap_item(self): 345 D = {"x":1} 346 with support.swap_item(D, "x", 5) as x: 347 self.assertEqual(D["x"], 5) 348 self.assertEqual(x, 1) 349 self.assertEqual(D["x"], 1) 350 with support.swap_item(D, "y", 5) as y: 351 self.assertEqual(D["y"], 5) 352 self.assertIsNone(y) 353 self.assertNotIn("y", D) 354 with support.swap_item(D, "y", 5): 355 del D["y"] 356 self.assertNotIn("y", D) 357 358 class RefClass: 359 attribute1 = None 360 attribute2 = None 361 _hidden_attribute1 = None 362 __magic_1__ = None 363 364 class OtherClass: 365 attribute2 = None 366 attribute3 = None 367 __magic_1__ = None 368 __magic_2__ = None 369 370 def test_detect_api_mismatch(self): 371 missing_items = support.detect_api_mismatch(self.RefClass, 372 self.OtherClass) 373 self.assertEqual({'attribute1'}, missing_items) 374 375 missing_items = support.detect_api_mismatch(self.OtherClass, 376 self.RefClass) 377 self.assertEqual({'attribute3', '__magic_2__'}, missing_items) 378 379 def test_detect_api_mismatch__ignore(self): 380 ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either'] 381 382 missing_items = support.detect_api_mismatch( 383 self.RefClass, self.OtherClass, ignore=ignore) 384 self.assertEqual(set(), missing_items) 385 386 missing_items = support.detect_api_mismatch( 387 self.OtherClass, self.RefClass, ignore=ignore) 388 self.assertEqual(set(), missing_items) 389 390 def test_check__all__(self): 391 extra = {'tempdir'} 392 blacklist = {'template'} 393 support.check__all__(self, 394 tempfile, 395 extra=extra, 396 blacklist=blacklist) 397 398 extra = {'TextTestResult', 'installHandler'} 399 blacklist = {'load_tests', "TestProgram", "BaseTestSuite"} 400 401 support.check__all__(self, 402 unittest, 403 ("unittest.result", "unittest.case", 404 "unittest.suite", "unittest.loader", 405 "unittest.main", "unittest.runner", 406 "unittest.signals", "unittest.async_case"), 407 extra=extra, 408 blacklist=blacklist) 409 410 self.assertRaises(AssertionError, support.check__all__, self, unittest) 411 412 @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'), 413 'need os.waitpid() and os.WNOHANG') 414 def test_reap_children(self): 415 # Make sure that there is no other pending child process 416 support.reap_children() 417 418 # Create a child process 419 pid = os.fork() 420 if pid == 0: 421 # child process: do nothing, just exit 422 os._exit(0) 423 424 t0 = time.monotonic() 425 deadline = time.monotonic() + 60.0 426 427 was_altered = support.environment_altered 428 try: 429 support.environment_altered = False 430 stderr = io.StringIO() 431 432 while True: 433 if time.monotonic() > deadline: 434 self.fail("timeout") 435 436 old_stderr = sys.__stderr__ 437 try: 438 sys.__stderr__ = stderr 439 support.reap_children() 440 finally: 441 sys.__stderr__ = old_stderr 442 443 # Use environment_altered to check if reap_children() found 444 # the child process 445 if support.environment_altered: 446 break 447 448 # loop until the child process completed 449 time.sleep(0.100) 450 451 msg = "Warning -- reap_children() reaped child process %s" % pid 452 self.assertIn(msg, stderr.getvalue()) 453 self.assertTrue(support.environment_altered) 454 finally: 455 support.environment_altered = was_altered 456 457 # Just in case, check again that there is no other 458 # pending child process 459 support.reap_children() 460 461 def check_options(self, args, func, expected=None): 462 code = f'from test.support import {func}; print(repr({func}()))' 463 cmd = [sys.executable, *args, '-c', code] 464 env = {key: value for key, value in os.environ.items() 465 if not key.startswith('PYTHON')} 466 proc = subprocess.run(cmd, 467 stdout=subprocess.PIPE, 468 stderr=subprocess.DEVNULL, 469 universal_newlines=True, 470 env=env) 471 if expected is None: 472 expected = args 473 self.assertEqual(proc.stdout.rstrip(), repr(expected)) 474 self.assertEqual(proc.returncode, 0) 475 476 def test_args_from_interpreter_flags(self): 477 # Test test.support.args_from_interpreter_flags() 478 for opts in ( 479 # no option 480 [], 481 # single option 482 ['-B'], 483 ['-s'], 484 ['-S'], 485 ['-E'], 486 ['-v'], 487 ['-b'], 488 ['-q'], 489 ['-I'], 490 # same option multiple times 491 ['-bb'], 492 ['-vvv'], 493 # -W options 494 ['-Wignore'], 495 # -X options 496 ['-X', 'dev'], 497 ['-Wignore', '-X', 'dev'], 498 ['-X', 'faulthandler'], 499 ['-X', 'importtime'], 500 ['-X', 'showalloccount'], 501 ['-X', 'showrefcount'], 502 ['-X', 'tracemalloc'], 503 ['-X', 'tracemalloc=3'], 504 ): 505 with self.subTest(opts=opts): 506 self.check_options(opts, 'args_from_interpreter_flags') 507 508 self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags', 509 ['-I']) 510 511 def test_optim_args_from_interpreter_flags(self): 512 # Test test.support.optim_args_from_interpreter_flags() 513 for opts in ( 514 # no option 515 [], 516 ['-O'], 517 ['-OO'], 518 ['-OOOO'], 519 ): 520 with self.subTest(opts=opts): 521 self.check_options(opts, 'optim_args_from_interpreter_flags') 522 523 def test_match_test(self): 524 class Test: 525 def __init__(self, test_id): 526 self.test_id = test_id 527 528 def id(self): 529 return self.test_id 530 531 test_access = Test('test.test_os.FileTests.test_access') 532 test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir') 533 534 # Test acceptance 535 with support.swap_attr(support, '_match_test_func', None): 536 # match all 537 support.set_match_tests([]) 538 self.assertTrue(support.match_test(test_access)) 539 self.assertTrue(support.match_test(test_chdir)) 540 541 # match all using None 542 support.set_match_tests(None, None) 543 self.assertTrue(support.match_test(test_access)) 544 self.assertTrue(support.match_test(test_chdir)) 545 546 # match the full test identifier 547 support.set_match_tests([test_access.id()], None) 548 self.assertTrue(support.match_test(test_access)) 549 self.assertFalse(support.match_test(test_chdir)) 550 551 # match the module name 552 support.set_match_tests(['test_os'], None) 553 self.assertTrue(support.match_test(test_access)) 554 self.assertTrue(support.match_test(test_chdir)) 555 556 # Test '*' pattern 557 support.set_match_tests(['test_*'], None) 558 self.assertTrue(support.match_test(test_access)) 559 self.assertTrue(support.match_test(test_chdir)) 560 561 # Test case sensitivity 562 support.set_match_tests(['filetests'], None) 563 self.assertFalse(support.match_test(test_access)) 564 support.set_match_tests(['FileTests'], None) 565 self.assertTrue(support.match_test(test_access)) 566 567 # Test pattern containing '.' and a '*' metacharacter 568 support.set_match_tests(['*test_os.*.test_*'], None) 569 self.assertTrue(support.match_test(test_access)) 570 self.assertTrue(support.match_test(test_chdir)) 571 572 # Multiple patterns 573 support.set_match_tests([test_access.id(), test_chdir.id()], None) 574 self.assertTrue(support.match_test(test_access)) 575 self.assertTrue(support.match_test(test_chdir)) 576 577 support.set_match_tests(['test_access', 'DONTMATCH'], None) 578 self.assertTrue(support.match_test(test_access)) 579 self.assertFalse(support.match_test(test_chdir)) 580 581 # Test rejection 582 with support.swap_attr(support, '_match_test_func', None): 583 # match all 584 support.set_match_tests(ignore_patterns=[]) 585 self.assertTrue(support.match_test(test_access)) 586 self.assertTrue(support.match_test(test_chdir)) 587 588 # match all using None 589 support.set_match_tests(None, None) 590 self.assertTrue(support.match_test(test_access)) 591 self.assertTrue(support.match_test(test_chdir)) 592 593 # match the full test identifier 594 support.set_match_tests(None, [test_access.id()]) 595 self.assertFalse(support.match_test(test_access)) 596 self.assertTrue(support.match_test(test_chdir)) 597 598 # match the module name 599 support.set_match_tests(None, ['test_os']) 600 self.assertFalse(support.match_test(test_access)) 601 self.assertFalse(support.match_test(test_chdir)) 602 603 # Test '*' pattern 604 support.set_match_tests(None, ['test_*']) 605 self.assertFalse(support.match_test(test_access)) 606 self.assertFalse(support.match_test(test_chdir)) 607 608 # Test case sensitivity 609 support.set_match_tests(None, ['filetests']) 610 self.assertTrue(support.match_test(test_access)) 611 support.set_match_tests(None, ['FileTests']) 612 self.assertFalse(support.match_test(test_access)) 613 614 # Test pattern containing '.' and a '*' metacharacter 615 support.set_match_tests(None, ['*test_os.*.test_*']) 616 self.assertFalse(support.match_test(test_access)) 617 self.assertFalse(support.match_test(test_chdir)) 618 619 # Multiple patterns 620 support.set_match_tests(None, [test_access.id(), test_chdir.id()]) 621 self.assertFalse(support.match_test(test_access)) 622 self.assertFalse(support.match_test(test_chdir)) 623 624 support.set_match_tests(None, ['test_access', 'DONTMATCH']) 625 self.assertFalse(support.match_test(test_access)) 626 self.assertTrue(support.match_test(test_chdir)) 627 628 def test_fd_count(self): 629 # We cannot test the absolute value of fd_count(): on old Linux 630 # kernel or glibc versions, os.urandom() keeps a FD open on 631 # /dev/urandom device and Python has 4 FD opens instead of 3. 632 start = support.fd_count() 633 fd = os.open(__file__, os.O_RDONLY) 634 try: 635 more = support.fd_count() 636 finally: 637 os.close(fd) 638 self.assertEqual(more - start, 1) 639 640 def check_print_warning(self, msg, expected): 641 stderr = io.StringIO() 642 643 old_stderr = sys.__stderr__ 644 try: 645 sys.__stderr__ = stderr 646 support.print_warning(msg) 647 finally: 648 sys.__stderr__ = old_stderr 649 650 self.assertEqual(stderr.getvalue(), expected) 651 652 def test_print_warning(self): 653 self.check_print_warning("msg", 654 "Warning -- msg\n") 655 self.check_print_warning("a\nb", 656 'Warning -- a\nWarning -- b\n') 657 658 # XXX -follows a list of untested API 659 # make_legacy_pyc 660 # is_resource_enabled 661 # requires 662 # fcmp 663 # umaks 664 # findfile 665 # check_warnings 666 # EnvironmentVarGuard 667 # TransientResource 668 # transient_internet 669 # run_with_locale 670 # set_memlimit 671 # bigmemtest 672 # precisionbigmemtest 673 # bigaddrspacetest 674 # requires_resource 675 # run_doctest 676 # threading_cleanup 677 # reap_threads 678 # strip_python_stderr 679 # can_symlink 680 # skip_unless_symlink 681 # SuppressCrashReport 682 683 684def test_main(): 685 tests = [TestSupport] 686 support.run_unittest(*tests) 687 688if __name__ == '__main__': 689 test_main() 690