1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import signal 8import sys 9import re 10import warnings 11import contextlib 12import weakref 13from unittest import mock 14 15import unittest 16from test import support 17from test.support import script_helper 18 19 20if hasattr(os, 'stat'): 21 import stat 22 has_stat = 1 23else: 24 has_stat = 0 25 26has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 27has_spawnl = hasattr(os, 'spawnl') 28 29# TEST_FILES may need to be tweaked for systems depending on the maximum 30# number of files that can be opened at one time (see ulimit -n) 31if sys.platform.startswith('openbsd'): 32 TEST_FILES = 48 33else: 34 TEST_FILES = 100 35 36# This is organized as one test for each chunk of code in tempfile.py, 37# in order of their appearance in the file. Testing which requires 38# threads is not done here. 39 40class TestLowLevelInternals(unittest.TestCase): 41 def test_infer_return_type_singles(self): 42 self.assertIs(str, tempfile._infer_return_type('')) 43 self.assertIs(bytes, tempfile._infer_return_type(b'')) 44 self.assertIs(str, tempfile._infer_return_type(None)) 45 46 def test_infer_return_type_multiples(self): 47 self.assertIs(str, tempfile._infer_return_type('', '')) 48 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 49 with self.assertRaises(TypeError): 50 tempfile._infer_return_type('', b'') 51 with self.assertRaises(TypeError): 52 tempfile._infer_return_type(b'', '') 53 54 def test_infer_return_type_multiples_and_none(self): 55 self.assertIs(str, tempfile._infer_return_type(None, '')) 56 self.assertIs(str, tempfile._infer_return_type('', None)) 57 self.assertIs(str, tempfile._infer_return_type(None, None)) 58 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 59 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 60 with self.assertRaises(TypeError): 61 tempfile._infer_return_type('', None, b'') 62 with self.assertRaises(TypeError): 63 tempfile._infer_return_type(b'', None, '') 64 65 def test_infer_return_type_pathlib(self): 66 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) 67 68 69# Common functionality. 70 71class BaseTestCase(unittest.TestCase): 72 73 str_check = re.compile(r"^[a-z0-9_-]{8}$") 74 b_check = re.compile(br"^[a-z0-9_-]{8}$") 75 76 def setUp(self): 77 self._warnings_manager = support.check_warnings() 78 self._warnings_manager.__enter__() 79 warnings.filterwarnings("ignore", category=RuntimeWarning, 80 message="mktemp", module=__name__) 81 82 def tearDown(self): 83 self._warnings_manager.__exit__(None, None, None) 84 85 def nameCheck(self, name, dir, pre, suf): 86 (ndir, nbase) = os.path.split(name) 87 npre = nbase[:len(pre)] 88 nsuf = nbase[len(nbase)-len(suf):] 89 90 if dir is not None: 91 self.assertIs( 92 type(name), 93 str 94 if type(dir) is str or isinstance(dir, os.PathLike) else 95 bytes, 96 "unexpected return type", 97 ) 98 if pre is not None: 99 self.assertIs(type(name), str if type(pre) is str else bytes, 100 "unexpected return type") 101 if suf is not None: 102 self.assertIs(type(name), str if type(suf) is str else bytes, 103 "unexpected return type") 104 if (dir, pre, suf) == (None, None, None): 105 self.assertIs(type(name), str, "default return type must be str") 106 107 # check for equality of the absolute paths! 108 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 109 "file %r not in directory %r" % (name, dir)) 110 self.assertEqual(npre, pre, 111 "file %r does not begin with %r" % (nbase, pre)) 112 self.assertEqual(nsuf, suf, 113 "file %r does not end with %r" % (nbase, suf)) 114 115 nbase = nbase[len(pre):len(nbase)-len(suf)] 116 check = self.str_check if isinstance(nbase, str) else self.b_check 117 self.assertTrue(check.match(nbase), 118 "random characters %r do not match %r" 119 % (nbase, check.pattern)) 120 121 122class TestExports(BaseTestCase): 123 def test_exports(self): 124 # There are no surprising symbols in the tempfile module 125 dict = tempfile.__dict__ 126 127 expected = { 128 "NamedTemporaryFile" : 1, 129 "TemporaryFile" : 1, 130 "mkstemp" : 1, 131 "mkdtemp" : 1, 132 "mktemp" : 1, 133 "TMP_MAX" : 1, 134 "gettempprefix" : 1, 135 "gettempprefixb" : 1, 136 "gettempdir" : 1, 137 "gettempdirb" : 1, 138 "tempdir" : 1, 139 "template" : 1, 140 "SpooledTemporaryFile" : 1, 141 "TemporaryDirectory" : 1, 142 } 143 144 unexp = [] 145 for key in dict: 146 if key[0] != '_' and key not in expected: 147 unexp.append(key) 148 self.assertTrue(len(unexp) == 0, 149 "unexpected keys: %s" % unexp) 150 151 152class TestRandomNameSequence(BaseTestCase): 153 """Test the internal iterator object _RandomNameSequence.""" 154 155 def setUp(self): 156 self.r = tempfile._RandomNameSequence() 157 super().setUp() 158 159 def test_get_six_char_str(self): 160 # _RandomNameSequence returns a six-character string 161 s = next(self.r) 162 self.nameCheck(s, '', '', '') 163 164 def test_many(self): 165 # _RandomNameSequence returns no duplicate strings (stochastic) 166 167 dict = {} 168 r = self.r 169 for i in range(TEST_FILES): 170 s = next(r) 171 self.nameCheck(s, '', '', '') 172 self.assertNotIn(s, dict) 173 dict[s] = 1 174 175 def supports_iter(self): 176 # _RandomNameSequence supports the iterator protocol 177 178 i = 0 179 r = self.r 180 for s in r: 181 i += 1 182 if i == 20: 183 break 184 185 @unittest.skipUnless(hasattr(os, 'fork'), 186 "os.fork is required for this test") 187 def test_process_awareness(self): 188 # ensure that the random source differs between 189 # child and parent. 190 read_fd, write_fd = os.pipe() 191 pid = None 192 try: 193 pid = os.fork() 194 if not pid: 195 # child process 196 os.close(read_fd) 197 os.write(write_fd, next(self.r).encode("ascii")) 198 os.close(write_fd) 199 # bypass the normal exit handlers- leave those to 200 # the parent. 201 os._exit(0) 202 203 # parent process 204 parent_value = next(self.r) 205 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 206 finally: 207 if pid: 208 # best effort to ensure the process can't bleed out 209 # via any bugs above 210 try: 211 os.kill(pid, signal.SIGKILL) 212 except OSError: 213 pass 214 215 # Read the process exit status to avoid zombie process 216 os.waitpid(pid, 0) 217 218 os.close(read_fd) 219 os.close(write_fd) 220 self.assertNotEqual(child_value, parent_value) 221 222 223 224class TestCandidateTempdirList(BaseTestCase): 225 """Test the internal function _candidate_tempdir_list.""" 226 227 def test_nonempty_list(self): 228 # _candidate_tempdir_list returns a nonempty list of strings 229 230 cand = tempfile._candidate_tempdir_list() 231 232 self.assertFalse(len(cand) == 0) 233 for c in cand: 234 self.assertIsInstance(c, str) 235 236 def test_wanted_dirs(self): 237 # _candidate_tempdir_list contains the expected directories 238 239 # Make sure the interesting environment variables are all set. 240 with support.EnvironmentVarGuard() as env: 241 for envname in 'TMPDIR', 'TEMP', 'TMP': 242 dirname = os.getenv(envname) 243 if not dirname: 244 env[envname] = os.path.abspath(envname) 245 246 cand = tempfile._candidate_tempdir_list() 247 248 for envname in 'TMPDIR', 'TEMP', 'TMP': 249 dirname = os.getenv(envname) 250 if not dirname: raise ValueError 251 self.assertIn(dirname, cand) 252 253 try: 254 dirname = os.getcwd() 255 except (AttributeError, OSError): 256 dirname = os.curdir 257 258 self.assertIn(dirname, cand) 259 260 # Not practical to try to verify the presence of OS-specific 261 # paths in this list. 262 263 264# We test _get_default_tempdir some more by testing gettempdir. 265 266class TestGetDefaultTempdir(BaseTestCase): 267 """Test _get_default_tempdir().""" 268 269 def test_no_files_left_behind(self): 270 # use a private empty directory 271 with tempfile.TemporaryDirectory() as our_temp_directory: 272 # force _get_default_tempdir() to consider our empty directory 273 def our_candidate_list(): 274 return [our_temp_directory] 275 276 with support.swap_attr(tempfile, "_candidate_tempdir_list", 277 our_candidate_list): 278 # verify our directory is empty after _get_default_tempdir() 279 tempfile._get_default_tempdir() 280 self.assertEqual(os.listdir(our_temp_directory), []) 281 282 def raise_OSError(*args, **kwargs): 283 raise OSError() 284 285 with support.swap_attr(io, "open", raise_OSError): 286 # test again with failing io.open() 287 with self.assertRaises(FileNotFoundError): 288 tempfile._get_default_tempdir() 289 self.assertEqual(os.listdir(our_temp_directory), []) 290 291 def bad_writer(*args, **kwargs): 292 fp = orig_open(*args, **kwargs) 293 fp.write = raise_OSError 294 return fp 295 296 with support.swap_attr(io, "open", bad_writer) as orig_open: 297 # test again with failing write() 298 with self.assertRaises(FileNotFoundError): 299 tempfile._get_default_tempdir() 300 self.assertEqual(os.listdir(our_temp_directory), []) 301 302 303class TestGetCandidateNames(BaseTestCase): 304 """Test the internal function _get_candidate_names.""" 305 306 def test_retval(self): 307 # _get_candidate_names returns a _RandomNameSequence object 308 obj = tempfile._get_candidate_names() 309 self.assertIsInstance(obj, tempfile._RandomNameSequence) 310 311 def test_same_thing(self): 312 # _get_candidate_names always returns the same object 313 a = tempfile._get_candidate_names() 314 b = tempfile._get_candidate_names() 315 316 self.assertTrue(a is b) 317 318 319@contextlib.contextmanager 320def _inside_empty_temp_dir(): 321 dir = tempfile.mkdtemp() 322 try: 323 with support.swap_attr(tempfile, 'tempdir', dir): 324 yield 325 finally: 326 support.rmtree(dir) 327 328 329def _mock_candidate_names(*names): 330 return support.swap_attr(tempfile, 331 '_get_candidate_names', 332 lambda: iter(names)) 333 334 335class TestBadTempdir: 336 337 def test_read_only_directory(self): 338 with _inside_empty_temp_dir(): 339 oldmode = mode = os.stat(tempfile.tempdir).st_mode 340 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 341 os.chmod(tempfile.tempdir, mode) 342 try: 343 if os.access(tempfile.tempdir, os.W_OK): 344 self.skipTest("can't set the directory read-only") 345 with self.assertRaises(PermissionError): 346 self.make_temp() 347 self.assertEqual(os.listdir(tempfile.tempdir), []) 348 finally: 349 os.chmod(tempfile.tempdir, oldmode) 350 351 def test_nonexisting_directory(self): 352 with _inside_empty_temp_dir(): 353 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 354 with support.swap_attr(tempfile, 'tempdir', tempdir): 355 with self.assertRaises(FileNotFoundError): 356 self.make_temp() 357 358 def test_non_directory(self): 359 with _inside_empty_temp_dir(): 360 tempdir = os.path.join(tempfile.tempdir, 'file') 361 open(tempdir, 'wb').close() 362 with support.swap_attr(tempfile, 'tempdir', tempdir): 363 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 364 self.make_temp() 365 366 367class TestMkstempInner(TestBadTempdir, BaseTestCase): 368 """Test the internal function _mkstemp_inner.""" 369 370 class mkstemped: 371 _bflags = tempfile._bin_openflags 372 _tflags = tempfile._text_openflags 373 _close = os.close 374 _unlink = os.unlink 375 376 def __init__(self, dir, pre, suf, bin): 377 if bin: flags = self._bflags 378 else: flags = self._tflags 379 380 output_type = tempfile._infer_return_type(dir, pre, suf) 381 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 382 383 def write(self, str): 384 os.write(self.fd, str) 385 386 def __del__(self): 387 self._close(self.fd) 388 self._unlink(self.name) 389 390 def do_create(self, dir=None, pre=None, suf=None, bin=1): 391 output_type = tempfile._infer_return_type(dir, pre, suf) 392 if dir is None: 393 if output_type is str: 394 dir = tempfile.gettempdir() 395 else: 396 dir = tempfile.gettempdirb() 397 if pre is None: 398 pre = output_type() 399 if suf is None: 400 suf = output_type() 401 file = self.mkstemped(dir, pre, suf, bin) 402 403 self.nameCheck(file.name, dir, pre, suf) 404 return file 405 406 def test_basic(self): 407 # _mkstemp_inner can create files 408 self.do_create().write(b"blat") 409 self.do_create(pre="a").write(b"blat") 410 self.do_create(suf="b").write(b"blat") 411 self.do_create(pre="a", suf="b").write(b"blat") 412 self.do_create(pre="aa", suf=".txt").write(b"blat") 413 414 def test_basic_with_bytes_names(self): 415 # _mkstemp_inner can create files when given name parts all 416 # specified as bytes. 417 dir_b = tempfile.gettempdirb() 418 self.do_create(dir=dir_b, suf=b"").write(b"blat") 419 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 420 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 421 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 422 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 423 # Can't mix str & binary types in the args. 424 with self.assertRaises(TypeError): 425 self.do_create(dir="", suf=b"").write(b"blat") 426 with self.assertRaises(TypeError): 427 self.do_create(dir=dir_b, pre="").write(b"blat") 428 with self.assertRaises(TypeError): 429 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 430 431 def test_basic_many(self): 432 # _mkstemp_inner can create many files (stochastic) 433 extant = list(range(TEST_FILES)) 434 for i in extant: 435 extant[i] = self.do_create(pre="aa") 436 437 def test_choose_directory(self): 438 # _mkstemp_inner can create files in a user-selected directory 439 dir = tempfile.mkdtemp() 440 try: 441 self.do_create(dir=dir).write(b"blat") 442 self.do_create(dir=pathlib.Path(dir)).write(b"blat") 443 finally: 444 os.rmdir(dir) 445 446 @unittest.skipUnless(has_stat, 'os.stat not available') 447 def test_file_mode(self): 448 # _mkstemp_inner creates files with the proper mode 449 450 file = self.do_create() 451 mode = stat.S_IMODE(os.stat(file.name).st_mode) 452 expected = 0o600 453 if sys.platform == 'win32': 454 # There's no distinction among 'user', 'group' and 'world'; 455 # replicate the 'user' bits. 456 user = expected >> 6 457 expected = user * (1 + 8 + 64) 458 self.assertEqual(mode, expected) 459 460 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 461 def test_noinherit(self): 462 # _mkstemp_inner file handles are not inherited by child processes 463 464 if support.verbose: 465 v="v" 466 else: 467 v="q" 468 469 file = self.do_create() 470 self.assertEqual(os.get_inheritable(file.fd), False) 471 fd = "%d" % file.fd 472 473 try: 474 me = __file__ 475 except NameError: 476 me = sys.argv[0] 477 478 # We have to exec something, so that FD_CLOEXEC will take 479 # effect. The core of this test is therefore in 480 # tf_inherit_check.py, which see. 481 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 482 "tf_inherit_check.py") 483 484 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 485 # but an arg with embedded spaces should be decorated with double 486 # quotes on each end 487 if sys.platform == 'win32': 488 decorated = '"%s"' % sys.executable 489 tester = '"%s"' % tester 490 else: 491 decorated = sys.executable 492 493 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 494 self.assertFalse(retval < 0, 495 "child process caught fatal signal %d" % -retval) 496 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 497 498 @unittest.skipUnless(has_textmode, "text mode not available") 499 def test_textmode(self): 500 # _mkstemp_inner can create files in text mode 501 502 # A text file is truncated at the first Ctrl+Z byte 503 f = self.do_create(bin=0) 504 f.write(b"blat\x1a") 505 f.write(b"extra\n") 506 os.lseek(f.fd, 0, os.SEEK_SET) 507 self.assertEqual(os.read(f.fd, 20), b"blat") 508 509 def make_temp(self): 510 return tempfile._mkstemp_inner(tempfile.gettempdir(), 511 tempfile.gettempprefix(), 512 '', 513 tempfile._bin_openflags, 514 str) 515 516 def test_collision_with_existing_file(self): 517 # _mkstemp_inner tries another name when a file with 518 # the chosen name already exists 519 with _inside_empty_temp_dir(), \ 520 _mock_candidate_names('aaa', 'aaa', 'bbb'): 521 (fd1, name1) = self.make_temp() 522 os.close(fd1) 523 self.assertTrue(name1.endswith('aaa')) 524 525 (fd2, name2) = self.make_temp() 526 os.close(fd2) 527 self.assertTrue(name2.endswith('bbb')) 528 529 def test_collision_with_existing_directory(self): 530 # _mkstemp_inner tries another name when a directory with 531 # the chosen name already exists 532 with _inside_empty_temp_dir(), \ 533 _mock_candidate_names('aaa', 'aaa', 'bbb'): 534 dir = tempfile.mkdtemp() 535 self.assertTrue(dir.endswith('aaa')) 536 537 (fd, name) = self.make_temp() 538 os.close(fd) 539 self.assertTrue(name.endswith('bbb')) 540 541 542class TestGetTempPrefix(BaseTestCase): 543 """Test gettempprefix().""" 544 545 def test_sane_template(self): 546 # gettempprefix returns a nonempty prefix string 547 p = tempfile.gettempprefix() 548 549 self.assertIsInstance(p, str) 550 self.assertGreater(len(p), 0) 551 552 pb = tempfile.gettempprefixb() 553 554 self.assertIsInstance(pb, bytes) 555 self.assertGreater(len(pb), 0) 556 557 def test_usable_template(self): 558 # gettempprefix returns a usable prefix string 559 560 # Create a temp directory, avoiding use of the prefix. 561 # Then attempt to create a file whose name is 562 # prefix + 'xxxxxx.xxx' in that directory. 563 p = tempfile.gettempprefix() + "xxxxxx.xxx" 564 d = tempfile.mkdtemp(prefix="") 565 try: 566 p = os.path.join(d, p) 567 fd = os.open(p, os.O_RDWR | os.O_CREAT) 568 os.close(fd) 569 os.unlink(p) 570 finally: 571 os.rmdir(d) 572 573 574class TestGetTempDir(BaseTestCase): 575 """Test gettempdir().""" 576 577 def test_directory_exists(self): 578 # gettempdir returns a directory which exists 579 580 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 581 self.assertTrue(os.path.isabs(d) or d == os.curdir, 582 "%r is not an absolute path" % d) 583 self.assertTrue(os.path.isdir(d), 584 "%r is not a directory" % d) 585 586 def test_directory_writable(self): 587 # gettempdir returns a directory writable by the user 588 589 # sneaky: just instantiate a NamedTemporaryFile, which 590 # defaults to writing into the directory returned by 591 # gettempdir. 592 file = tempfile.NamedTemporaryFile() 593 file.write(b"blat") 594 file.close() 595 596 def test_same_thing(self): 597 # gettempdir always returns the same object 598 a = tempfile.gettempdir() 599 b = tempfile.gettempdir() 600 c = tempfile.gettempdirb() 601 602 self.assertTrue(a is b) 603 self.assertNotEqual(type(a), type(c)) 604 self.assertEqual(a, os.fsdecode(c)) 605 606 def test_case_sensitive(self): 607 # gettempdir should not flatten its case 608 # even on a case-insensitive file system 609 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 610 _tempdir, tempfile.tempdir = tempfile.tempdir, None 611 try: 612 with support.EnvironmentVarGuard() as env: 613 # Fake the first env var which is checked as a candidate 614 env["TMPDIR"] = case_sensitive_tempdir 615 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 616 finally: 617 tempfile.tempdir = _tempdir 618 support.rmdir(case_sensitive_tempdir) 619 620 621class TestMkstemp(BaseTestCase): 622 """Test mkstemp().""" 623 624 def do_create(self, dir=None, pre=None, suf=None): 625 output_type = tempfile._infer_return_type(dir, pre, suf) 626 if dir is None: 627 if output_type is str: 628 dir = tempfile.gettempdir() 629 else: 630 dir = tempfile.gettempdirb() 631 if pre is None: 632 pre = output_type() 633 if suf is None: 634 suf = output_type() 635 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 636 (ndir, nbase) = os.path.split(name) 637 adir = os.path.abspath(dir) 638 self.assertEqual(adir, ndir, 639 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 640 641 try: 642 self.nameCheck(name, dir, pre, suf) 643 finally: 644 os.close(fd) 645 os.unlink(name) 646 647 def test_basic(self): 648 # mkstemp can create files 649 self.do_create() 650 self.do_create(pre="a") 651 self.do_create(suf="b") 652 self.do_create(pre="a", suf="b") 653 self.do_create(pre="aa", suf=".txt") 654 self.do_create(dir=".") 655 656 def test_basic_with_bytes_names(self): 657 # mkstemp can create files when given name parts all 658 # specified as bytes. 659 d = tempfile.gettempdirb() 660 self.do_create(dir=d, suf=b"") 661 self.do_create(dir=d, pre=b"a") 662 self.do_create(dir=d, suf=b"b") 663 self.do_create(dir=d, pre=b"a", suf=b"b") 664 self.do_create(dir=d, pre=b"aa", suf=b".txt") 665 self.do_create(dir=b".") 666 with self.assertRaises(TypeError): 667 self.do_create(dir=".", pre=b"aa", suf=b".txt") 668 with self.assertRaises(TypeError): 669 self.do_create(dir=b".", pre="aa", suf=b".txt") 670 with self.assertRaises(TypeError): 671 self.do_create(dir=b".", pre=b"aa", suf=".txt") 672 673 674 def test_choose_directory(self): 675 # mkstemp can create directories in a user-selected directory 676 dir = tempfile.mkdtemp() 677 try: 678 self.do_create(dir=dir) 679 self.do_create(dir=pathlib.Path(dir)) 680 finally: 681 os.rmdir(dir) 682 683 684class TestMkdtemp(TestBadTempdir, BaseTestCase): 685 """Test mkdtemp().""" 686 687 def make_temp(self): 688 return tempfile.mkdtemp() 689 690 def do_create(self, dir=None, pre=None, suf=None): 691 output_type = tempfile._infer_return_type(dir, pre, suf) 692 if dir is None: 693 if output_type is str: 694 dir = tempfile.gettempdir() 695 else: 696 dir = tempfile.gettempdirb() 697 if pre is None: 698 pre = output_type() 699 if suf is None: 700 suf = output_type() 701 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 702 703 try: 704 self.nameCheck(name, dir, pre, suf) 705 return name 706 except: 707 os.rmdir(name) 708 raise 709 710 def test_basic(self): 711 # mkdtemp can create directories 712 os.rmdir(self.do_create()) 713 os.rmdir(self.do_create(pre="a")) 714 os.rmdir(self.do_create(suf="b")) 715 os.rmdir(self.do_create(pre="a", suf="b")) 716 os.rmdir(self.do_create(pre="aa", suf=".txt")) 717 718 def test_basic_with_bytes_names(self): 719 # mkdtemp can create directories when given all binary parts 720 d = tempfile.gettempdirb() 721 os.rmdir(self.do_create(dir=d)) 722 os.rmdir(self.do_create(dir=d, pre=b"a")) 723 os.rmdir(self.do_create(dir=d, suf=b"b")) 724 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 725 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 726 with self.assertRaises(TypeError): 727 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 728 with self.assertRaises(TypeError): 729 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 730 with self.assertRaises(TypeError): 731 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 732 733 def test_basic_many(self): 734 # mkdtemp can create many directories (stochastic) 735 extant = list(range(TEST_FILES)) 736 try: 737 for i in extant: 738 extant[i] = self.do_create(pre="aa") 739 finally: 740 for i in extant: 741 if(isinstance(i, str)): 742 os.rmdir(i) 743 744 def test_choose_directory(self): 745 # mkdtemp can create directories in a user-selected directory 746 dir = tempfile.mkdtemp() 747 try: 748 os.rmdir(self.do_create(dir=dir)) 749 os.rmdir(self.do_create(dir=pathlib.Path(dir))) 750 finally: 751 os.rmdir(dir) 752 753 @unittest.skipUnless(has_stat, 'os.stat not available') 754 def test_mode(self): 755 # mkdtemp creates directories with the proper mode 756 757 dir = self.do_create() 758 try: 759 mode = stat.S_IMODE(os.stat(dir).st_mode) 760 mode &= 0o777 # Mask off sticky bits inherited from /tmp 761 expected = 0o700 762 if sys.platform == 'win32': 763 # There's no distinction among 'user', 'group' and 'world'; 764 # replicate the 'user' bits. 765 user = expected >> 6 766 expected = user * (1 + 8 + 64) 767 self.assertEqual(mode, expected) 768 finally: 769 os.rmdir(dir) 770 771 def test_collision_with_existing_file(self): 772 # mkdtemp tries another name when a file with 773 # the chosen name already exists 774 with _inside_empty_temp_dir(), \ 775 _mock_candidate_names('aaa', 'aaa', 'bbb'): 776 file = tempfile.NamedTemporaryFile(delete=False) 777 file.close() 778 self.assertTrue(file.name.endswith('aaa')) 779 dir = tempfile.mkdtemp() 780 self.assertTrue(dir.endswith('bbb')) 781 782 def test_collision_with_existing_directory(self): 783 # mkdtemp tries another name when a directory with 784 # the chosen name already exists 785 with _inside_empty_temp_dir(), \ 786 _mock_candidate_names('aaa', 'aaa', 'bbb'): 787 dir1 = tempfile.mkdtemp() 788 self.assertTrue(dir1.endswith('aaa')) 789 dir2 = tempfile.mkdtemp() 790 self.assertTrue(dir2.endswith('bbb')) 791 792 793class TestMktemp(BaseTestCase): 794 """Test mktemp().""" 795 796 # For safety, all use of mktemp must occur in a private directory. 797 # We must also suppress the RuntimeWarning it generates. 798 def setUp(self): 799 self.dir = tempfile.mkdtemp() 800 super().setUp() 801 802 def tearDown(self): 803 if self.dir: 804 os.rmdir(self.dir) 805 self.dir = None 806 super().tearDown() 807 808 class mktemped: 809 _unlink = os.unlink 810 _bflags = tempfile._bin_openflags 811 812 def __init__(self, dir, pre, suf): 813 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 814 # Create the file. This will raise an exception if it's 815 # mysteriously appeared in the meanwhile. 816 os.close(os.open(self.name, self._bflags, 0o600)) 817 818 def __del__(self): 819 self._unlink(self.name) 820 821 def do_create(self, pre="", suf=""): 822 file = self.mktemped(self.dir, pre, suf) 823 824 self.nameCheck(file.name, self.dir, pre, suf) 825 return file 826 827 def test_basic(self): 828 # mktemp can choose usable file names 829 self.do_create() 830 self.do_create(pre="a") 831 self.do_create(suf="b") 832 self.do_create(pre="a", suf="b") 833 self.do_create(pre="aa", suf=".txt") 834 835 def test_many(self): 836 # mktemp can choose many usable file names (stochastic) 837 extant = list(range(TEST_FILES)) 838 for i in extant: 839 extant[i] = self.do_create(pre="aa") 840 841## def test_warning(self): 842## # mktemp issues a warning when used 843## warnings.filterwarnings("error", 844## category=RuntimeWarning, 845## message="mktemp") 846## self.assertRaises(RuntimeWarning, 847## tempfile.mktemp, dir=self.dir) 848 849 850# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 851 852 853class TestNamedTemporaryFile(BaseTestCase): 854 """Test NamedTemporaryFile().""" 855 856 def do_create(self, dir=None, pre="", suf="", delete=True): 857 if dir is None: 858 dir = tempfile.gettempdir() 859 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 860 delete=delete) 861 862 self.nameCheck(file.name, dir, pre, suf) 863 return file 864 865 866 def test_basic(self): 867 # NamedTemporaryFile can create files 868 self.do_create() 869 self.do_create(pre="a") 870 self.do_create(suf="b") 871 self.do_create(pre="a", suf="b") 872 self.do_create(pre="aa", suf=".txt") 873 874 def test_method_lookup(self): 875 # Issue #18879: Looking up a temporary file method should keep it 876 # alive long enough. 877 f = self.do_create() 878 wr = weakref.ref(f) 879 write = f.write 880 write2 = f.write 881 del f 882 write(b'foo') 883 del write 884 write2(b'bar') 885 del write2 886 if support.check_impl_detail(cpython=True): 887 # No reference cycle was created. 888 self.assertIsNone(wr()) 889 890 def test_iter(self): 891 # Issue #23700: getting iterator from a temporary file should keep 892 # it alive as long as it's being iterated over 893 lines = [b'spam\n', b'eggs\n', b'beans\n'] 894 def make_file(): 895 f = tempfile.NamedTemporaryFile(mode='w+b') 896 f.write(b''.join(lines)) 897 f.seek(0) 898 return f 899 for i, l in enumerate(make_file()): 900 self.assertEqual(l, lines[i]) 901 self.assertEqual(i, len(lines) - 1) 902 903 def test_creates_named(self): 904 # NamedTemporaryFile creates files with names 905 f = tempfile.NamedTemporaryFile() 906 self.assertTrue(os.path.exists(f.name), 907 "NamedTemporaryFile %s does not exist" % f.name) 908 909 def test_del_on_close(self): 910 # A NamedTemporaryFile is deleted when closed 911 dir = tempfile.mkdtemp() 912 try: 913 f = tempfile.NamedTemporaryFile(dir=dir) 914 f.write(b'blat') 915 f.close() 916 self.assertFalse(os.path.exists(f.name), 917 "NamedTemporaryFile %s exists after close" % f.name) 918 finally: 919 os.rmdir(dir) 920 921 def test_dis_del_on_close(self): 922 # Tests that delete-on-close can be disabled 923 dir = tempfile.mkdtemp() 924 tmp = None 925 try: 926 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 927 tmp = f.name 928 f.write(b'blat') 929 f.close() 930 self.assertTrue(os.path.exists(f.name), 931 "NamedTemporaryFile %s missing after close" % f.name) 932 finally: 933 if tmp is not None: 934 os.unlink(tmp) 935 os.rmdir(dir) 936 937 def test_multiple_close(self): 938 # A NamedTemporaryFile can be closed many times without error 939 f = tempfile.NamedTemporaryFile() 940 f.write(b'abc\n') 941 f.close() 942 f.close() 943 f.close() 944 945 def test_context_manager(self): 946 # A NamedTemporaryFile can be used as a context manager 947 with tempfile.NamedTemporaryFile() as f: 948 self.assertTrue(os.path.exists(f.name)) 949 self.assertFalse(os.path.exists(f.name)) 950 def use_closed(): 951 with f: 952 pass 953 self.assertRaises(ValueError, use_closed) 954 955 def test_no_leak_fd(self): 956 # Issue #21058: don't leak file descriptor when io.open() fails 957 closed = [] 958 os_close = os.close 959 def close(fd): 960 closed.append(fd) 961 os_close(fd) 962 963 with mock.patch('os.close', side_effect=close): 964 with mock.patch('io.open', side_effect=ValueError): 965 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 966 self.assertEqual(len(closed), 1) 967 968 def test_bad_mode(self): 969 dir = tempfile.mkdtemp() 970 self.addCleanup(support.rmtree, dir) 971 with self.assertRaises(ValueError): 972 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 973 with self.assertRaises(TypeError): 974 tempfile.NamedTemporaryFile(mode=2, dir=dir) 975 self.assertEqual(os.listdir(dir), []) 976 977 # How to test the mode and bufsize parameters? 978 979class TestSpooledTemporaryFile(BaseTestCase): 980 """Test SpooledTemporaryFile().""" 981 982 def do_create(self, max_size=0, dir=None, pre="", suf=""): 983 if dir is None: 984 dir = tempfile.gettempdir() 985 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 986 987 return file 988 989 990 def test_basic(self): 991 # SpooledTemporaryFile can create files 992 f = self.do_create() 993 self.assertFalse(f._rolled) 994 f = self.do_create(max_size=100, pre="a", suf=".txt") 995 self.assertFalse(f._rolled) 996 997 def test_del_on_close(self): 998 # A SpooledTemporaryFile is deleted when closed 999 dir = tempfile.mkdtemp() 1000 try: 1001 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 1002 self.assertFalse(f._rolled) 1003 f.write(b'blat ' * 5) 1004 self.assertTrue(f._rolled) 1005 filename = f.name 1006 f.close() 1007 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 1008 "SpooledTemporaryFile %s exists after close" % filename) 1009 finally: 1010 os.rmdir(dir) 1011 1012 def test_rewrite_small(self): 1013 # A SpooledTemporaryFile can be written to multiple within the max_size 1014 f = self.do_create(max_size=30) 1015 self.assertFalse(f._rolled) 1016 for i in range(5): 1017 f.seek(0, 0) 1018 f.write(b'x' * 20) 1019 self.assertFalse(f._rolled) 1020 1021 def test_write_sequential(self): 1022 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1023 # over afterward 1024 f = self.do_create(max_size=30) 1025 self.assertFalse(f._rolled) 1026 f.write(b'x' * 20) 1027 self.assertFalse(f._rolled) 1028 f.write(b'x' * 10) 1029 self.assertFalse(f._rolled) 1030 f.write(b'x') 1031 self.assertTrue(f._rolled) 1032 1033 def test_writelines(self): 1034 # Verify writelines with a SpooledTemporaryFile 1035 f = self.do_create() 1036 f.writelines((b'x', b'y', b'z')) 1037 pos = f.seek(0) 1038 self.assertEqual(pos, 0) 1039 buf = f.read() 1040 self.assertEqual(buf, b'xyz') 1041 1042 def test_writelines_sequential(self): 1043 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1044 # over afterward 1045 f = self.do_create(max_size=35) 1046 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1047 self.assertFalse(f._rolled) 1048 f.write(b'x') 1049 self.assertTrue(f._rolled) 1050 1051 def test_sparse(self): 1052 # A SpooledTemporaryFile that is written late in the file will extend 1053 # when that occurs 1054 f = self.do_create(max_size=30) 1055 self.assertFalse(f._rolled) 1056 pos = f.seek(100, 0) 1057 self.assertEqual(pos, 100) 1058 self.assertFalse(f._rolled) 1059 f.write(b'x') 1060 self.assertTrue(f._rolled) 1061 1062 def test_fileno(self): 1063 # A SpooledTemporaryFile should roll over to a real file on fileno() 1064 f = self.do_create(max_size=30) 1065 self.assertFalse(f._rolled) 1066 self.assertTrue(f.fileno() > 0) 1067 self.assertTrue(f._rolled) 1068 1069 def test_multiple_close_before_rollover(self): 1070 # A SpooledTemporaryFile can be closed many times without error 1071 f = tempfile.SpooledTemporaryFile() 1072 f.write(b'abc\n') 1073 self.assertFalse(f._rolled) 1074 f.close() 1075 f.close() 1076 f.close() 1077 1078 def test_multiple_close_after_rollover(self): 1079 # A SpooledTemporaryFile can be closed many times without error 1080 f = tempfile.SpooledTemporaryFile(max_size=1) 1081 f.write(b'abc\n') 1082 self.assertTrue(f._rolled) 1083 f.close() 1084 f.close() 1085 f.close() 1086 1087 def test_bound_methods(self): 1088 # It should be OK to steal a bound method from a SpooledTemporaryFile 1089 # and use it independently; when the file rolls over, those bound 1090 # methods should continue to function 1091 f = self.do_create(max_size=30) 1092 read = f.read 1093 write = f.write 1094 seek = f.seek 1095 1096 write(b"a" * 35) 1097 write(b"b" * 35) 1098 seek(0, 0) 1099 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1100 1101 def test_properties(self): 1102 f = tempfile.SpooledTemporaryFile(max_size=10) 1103 f.write(b'x' * 10) 1104 self.assertFalse(f._rolled) 1105 self.assertEqual(f.mode, 'w+b') 1106 self.assertIsNone(f.name) 1107 with self.assertRaises(AttributeError): 1108 f.newlines 1109 with self.assertRaises(AttributeError): 1110 f.encoding 1111 1112 f.write(b'x') 1113 self.assertTrue(f._rolled) 1114 self.assertEqual(f.mode, 'rb+') 1115 self.assertIsNotNone(f.name) 1116 with self.assertRaises(AttributeError): 1117 f.newlines 1118 with self.assertRaises(AttributeError): 1119 f.encoding 1120 1121 def test_text_mode(self): 1122 # Creating a SpooledTemporaryFile with a text mode should produce 1123 # a file object reading and writing (Unicode) text strings. 1124 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1125 encoding="utf-8") 1126 f.write("abc\n") 1127 f.seek(0) 1128 self.assertEqual(f.read(), "abc\n") 1129 f.write("def\n") 1130 f.seek(0) 1131 self.assertEqual(f.read(), "abc\ndef\n") 1132 self.assertFalse(f._rolled) 1133 self.assertEqual(f.mode, 'w+') 1134 self.assertIsNone(f.name) 1135 self.assertEqual(f.newlines, os.linesep) 1136 self.assertEqual(f.encoding, "utf-8") 1137 1138 f.write("xyzzy\n") 1139 f.seek(0) 1140 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1141 # Check that Ctrl+Z doesn't truncate the file 1142 f.write("foo\x1abar\n") 1143 f.seek(0) 1144 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1145 self.assertTrue(f._rolled) 1146 self.assertEqual(f.mode, 'w+') 1147 self.assertIsNotNone(f.name) 1148 self.assertEqual(f.newlines, os.linesep) 1149 self.assertEqual(f.encoding, "utf-8") 1150 1151 def test_text_newline_and_encoding(self): 1152 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1153 newline='', encoding='utf-8') 1154 f.write("\u039B\r\n") 1155 f.seek(0) 1156 self.assertEqual(f.read(), "\u039B\r\n") 1157 self.assertFalse(f._rolled) 1158 self.assertEqual(f.mode, 'w+') 1159 self.assertIsNone(f.name) 1160 self.assertIsNotNone(f.newlines) 1161 self.assertEqual(f.encoding, "utf-8") 1162 1163 f.write("\u039C" * 10 + "\r\n") 1164 f.write("\u039D" * 20) 1165 f.seek(0) 1166 self.assertEqual(f.read(), 1167 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1168 self.assertTrue(f._rolled) 1169 self.assertEqual(f.mode, 'w+') 1170 self.assertIsNotNone(f.name) 1171 self.assertIsNotNone(f.newlines) 1172 self.assertEqual(f.encoding, 'utf-8') 1173 1174 def test_context_manager_before_rollover(self): 1175 # A SpooledTemporaryFile can be used as a context manager 1176 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1177 self.assertFalse(f._rolled) 1178 self.assertFalse(f.closed) 1179 self.assertTrue(f.closed) 1180 def use_closed(): 1181 with f: 1182 pass 1183 self.assertRaises(ValueError, use_closed) 1184 1185 def test_context_manager_during_rollover(self): 1186 # A SpooledTemporaryFile can be used as a context manager 1187 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1188 self.assertFalse(f._rolled) 1189 f.write(b'abc\n') 1190 f.flush() 1191 self.assertTrue(f._rolled) 1192 self.assertFalse(f.closed) 1193 self.assertTrue(f.closed) 1194 def use_closed(): 1195 with f: 1196 pass 1197 self.assertRaises(ValueError, use_closed) 1198 1199 def test_context_manager_after_rollover(self): 1200 # A SpooledTemporaryFile can be used as a context manager 1201 f = tempfile.SpooledTemporaryFile(max_size=1) 1202 f.write(b'abc\n') 1203 f.flush() 1204 self.assertTrue(f._rolled) 1205 with f: 1206 self.assertFalse(f.closed) 1207 self.assertTrue(f.closed) 1208 def use_closed(): 1209 with f: 1210 pass 1211 self.assertRaises(ValueError, use_closed) 1212 1213 def test_truncate_with_size_parameter(self): 1214 # A SpooledTemporaryFile can be truncated to zero size 1215 f = tempfile.SpooledTemporaryFile(max_size=10) 1216 f.write(b'abcdefg\n') 1217 f.seek(0) 1218 f.truncate() 1219 self.assertFalse(f._rolled) 1220 self.assertEqual(f._file.getvalue(), b'') 1221 # A SpooledTemporaryFile can be truncated to a specific size 1222 f = tempfile.SpooledTemporaryFile(max_size=10) 1223 f.write(b'abcdefg\n') 1224 f.truncate(4) 1225 self.assertFalse(f._rolled) 1226 self.assertEqual(f._file.getvalue(), b'abcd') 1227 # A SpooledTemporaryFile rolls over if truncated to large size 1228 f = tempfile.SpooledTemporaryFile(max_size=10) 1229 f.write(b'abcdefg\n') 1230 f.truncate(20) 1231 self.assertTrue(f._rolled) 1232 if has_stat: 1233 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1234 1235 1236if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1237 1238 class TestTemporaryFile(BaseTestCase): 1239 """Test TemporaryFile().""" 1240 1241 def test_basic(self): 1242 # TemporaryFile can create files 1243 # No point in testing the name params - the file has no name. 1244 tempfile.TemporaryFile() 1245 1246 def test_has_no_name(self): 1247 # TemporaryFile creates files with no names (on this system) 1248 dir = tempfile.mkdtemp() 1249 f = tempfile.TemporaryFile(dir=dir) 1250 f.write(b'blat') 1251 1252 # Sneaky: because this file has no name, it should not prevent 1253 # us from removing the directory it was created in. 1254 try: 1255 os.rmdir(dir) 1256 except: 1257 # cleanup 1258 f.close() 1259 os.rmdir(dir) 1260 raise 1261 1262 def test_multiple_close(self): 1263 # A TemporaryFile can be closed many times without error 1264 f = tempfile.TemporaryFile() 1265 f.write(b'abc\n') 1266 f.close() 1267 f.close() 1268 f.close() 1269 1270 # How to test the mode and bufsize parameters? 1271 def test_mode_and_encoding(self): 1272 1273 def roundtrip(input, *args, **kwargs): 1274 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1275 fileobj.write(input) 1276 fileobj.seek(0) 1277 self.assertEqual(input, fileobj.read()) 1278 1279 roundtrip(b"1234", "w+b") 1280 roundtrip("abdc\n", "w+") 1281 roundtrip("\u039B", "w+", encoding="utf-16") 1282 roundtrip("foo\r\n", "w+", newline="") 1283 1284 def test_no_leak_fd(self): 1285 # Issue #21058: don't leak file descriptor when io.open() fails 1286 closed = [] 1287 os_close = os.close 1288 def close(fd): 1289 closed.append(fd) 1290 os_close(fd) 1291 1292 with mock.patch('os.close', side_effect=close): 1293 with mock.patch('io.open', side_effect=ValueError): 1294 self.assertRaises(ValueError, tempfile.TemporaryFile) 1295 self.assertEqual(len(closed), 1) 1296 1297 1298 1299# Helper for test_del_on_shutdown 1300class NulledModules: 1301 def __init__(self, *modules): 1302 self.refs = [mod.__dict__ for mod in modules] 1303 self.contents = [ref.copy() for ref in self.refs] 1304 1305 def __enter__(self): 1306 for d in self.refs: 1307 for key in d: 1308 d[key] = None 1309 1310 def __exit__(self, *exc_info): 1311 for d, c in zip(self.refs, self.contents): 1312 d.clear() 1313 d.update(c) 1314 1315class TestTemporaryDirectory(BaseTestCase): 1316 """Test TemporaryDirectory().""" 1317 1318 def do_create(self, dir=None, pre="", suf="", recurse=1): 1319 if dir is None: 1320 dir = tempfile.gettempdir() 1321 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1322 self.nameCheck(tmp.name, dir, pre, suf) 1323 # Create a subdirectory and some files 1324 if recurse: 1325 d1 = self.do_create(tmp.name, pre, suf, recurse-1) 1326 d1.name = None 1327 with open(os.path.join(tmp.name, "test.txt"), "wb") as f: 1328 f.write(b"Hello world!") 1329 return tmp 1330 1331 def test_mkdtemp_failure(self): 1332 # Check no additional exception if mkdtemp fails 1333 # Previously would raise AttributeError instead 1334 # (noted as part of Issue #10188) 1335 with tempfile.TemporaryDirectory() as nonexistent: 1336 pass 1337 with self.assertRaises(FileNotFoundError) as cm: 1338 tempfile.TemporaryDirectory(dir=nonexistent) 1339 self.assertEqual(cm.exception.errno, errno.ENOENT) 1340 1341 def test_explicit_cleanup(self): 1342 # A TemporaryDirectory is deleted when cleaned up 1343 dir = tempfile.mkdtemp() 1344 try: 1345 d = self.do_create(dir=dir) 1346 self.assertTrue(os.path.exists(d.name), 1347 "TemporaryDirectory %s does not exist" % d.name) 1348 d.cleanup() 1349 self.assertFalse(os.path.exists(d.name), 1350 "TemporaryDirectory %s exists after cleanup" % d.name) 1351 finally: 1352 os.rmdir(dir) 1353 1354 @support.skip_unless_symlink 1355 def test_cleanup_with_symlink_to_a_directory(self): 1356 # cleanup() should not follow symlinks to directories (issue #12464) 1357 d1 = self.do_create() 1358 d2 = self.do_create(recurse=0) 1359 1360 # Symlink d1/foo -> d2 1361 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1362 1363 # This call to cleanup() should not follow the "foo" symlink 1364 d1.cleanup() 1365 1366 self.assertFalse(os.path.exists(d1.name), 1367 "TemporaryDirectory %s exists after cleanup" % d1.name) 1368 self.assertTrue(os.path.exists(d2.name), 1369 "Directory pointed to by a symlink was deleted") 1370 self.assertEqual(os.listdir(d2.name), ['test.txt'], 1371 "Contents of the directory pointed to by a symlink " 1372 "were deleted") 1373 d2.cleanup() 1374 1375 @support.cpython_only 1376 def test_del_on_collection(self): 1377 # A TemporaryDirectory is deleted when garbage collected 1378 dir = tempfile.mkdtemp() 1379 try: 1380 d = self.do_create(dir=dir) 1381 name = d.name 1382 del d # Rely on refcounting to invoke __del__ 1383 self.assertFalse(os.path.exists(name), 1384 "TemporaryDirectory %s exists after __del__" % name) 1385 finally: 1386 os.rmdir(dir) 1387 1388 def test_del_on_shutdown(self): 1389 # A TemporaryDirectory may be cleaned up during shutdown 1390 with self.do_create() as dir: 1391 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1392 code = """if True: 1393 import builtins 1394 import os 1395 import shutil 1396 import sys 1397 import tempfile 1398 import warnings 1399 1400 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1401 sys.stdout.buffer.write(tmp.name.encode()) 1402 1403 tmp2 = os.path.join(tmp.name, 'test_dir') 1404 os.mkdir(tmp2) 1405 with open(os.path.join(tmp2, "test.txt"), "w") as f: 1406 f.write("Hello world!") 1407 1408 {mod}.tmp = tmp 1409 1410 warnings.filterwarnings("always", category=ResourceWarning) 1411 """.format(dir=dir, mod=mod) 1412 rc, out, err = script_helper.assert_python_ok("-c", code) 1413 tmp_name = out.decode().strip() 1414 self.assertFalse(os.path.exists(tmp_name), 1415 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1416 err = err.decode('utf-8', 'backslashreplace') 1417 self.assertNotIn("Exception ", err) 1418 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1419 1420 def test_exit_on_shutdown(self): 1421 # Issue #22427 1422 with self.do_create() as dir: 1423 code = """if True: 1424 import sys 1425 import tempfile 1426 import warnings 1427 1428 def generator(): 1429 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1430 yield tmp 1431 g = generator() 1432 sys.stdout.buffer.write(next(g).encode()) 1433 1434 warnings.filterwarnings("always", category=ResourceWarning) 1435 """.format(dir=dir) 1436 rc, out, err = script_helper.assert_python_ok("-c", code) 1437 tmp_name = out.decode().strip() 1438 self.assertFalse(os.path.exists(tmp_name), 1439 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1440 err = err.decode('utf-8', 'backslashreplace') 1441 self.assertNotIn("Exception ", err) 1442 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1443 1444 def test_warnings_on_cleanup(self): 1445 # ResourceWarning will be triggered by __del__ 1446 with self.do_create() as dir: 1447 d = self.do_create(dir=dir, recurse=3) 1448 name = d.name 1449 1450 # Check for the resource warning 1451 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1452 warnings.filterwarnings("always", category=ResourceWarning) 1453 del d 1454 support.gc_collect() 1455 self.assertFalse(os.path.exists(name), 1456 "TemporaryDirectory %s exists after __del__" % name) 1457 1458 def test_multiple_close(self): 1459 # Can be cleaned-up many times without error 1460 d = self.do_create() 1461 d.cleanup() 1462 d.cleanup() 1463 d.cleanup() 1464 1465 def test_context_manager(self): 1466 # Can be used as a context manager 1467 d = self.do_create() 1468 with d as name: 1469 self.assertTrue(os.path.exists(name)) 1470 self.assertEqual(name, d.name) 1471 self.assertFalse(os.path.exists(name)) 1472 1473 1474if __name__ == "__main__": 1475 unittest.main() 1476