1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import signal 8import sys 9import re 10import warnings 11import contextlib 12import stat 13import weakref 14from unittest import mock 15 16import unittest 17from test import support 18from test.support import script_helper 19 20 21has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 22has_spawnl = hasattr(os, 'spawnl') 23 24# TEST_FILES may need to be tweaked for systems depending on the maximum 25# number of files that can be opened at one time (see ulimit -n) 26if sys.platform.startswith('openbsd'): 27 TEST_FILES = 48 28else: 29 TEST_FILES = 100 30 31# This is organized as one test for each chunk of code in tempfile.py, 32# in order of their appearance in the file. Testing which requires 33# threads is not done here. 34 35class TestLowLevelInternals(unittest.TestCase): 36 def test_infer_return_type_singles(self): 37 self.assertIs(str, tempfile._infer_return_type('')) 38 self.assertIs(bytes, tempfile._infer_return_type(b'')) 39 self.assertIs(str, tempfile._infer_return_type(None)) 40 41 def test_infer_return_type_multiples(self): 42 self.assertIs(str, tempfile._infer_return_type('', '')) 43 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 44 with self.assertRaises(TypeError): 45 tempfile._infer_return_type('', b'') 46 with self.assertRaises(TypeError): 47 tempfile._infer_return_type(b'', '') 48 49 def test_infer_return_type_multiples_and_none(self): 50 self.assertIs(str, tempfile._infer_return_type(None, '')) 51 self.assertIs(str, tempfile._infer_return_type('', None)) 52 self.assertIs(str, tempfile._infer_return_type(None, None)) 53 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 54 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 55 with self.assertRaises(TypeError): 56 tempfile._infer_return_type('', None, b'') 57 with self.assertRaises(TypeError): 58 tempfile._infer_return_type(b'', None, '') 59 60 def test_infer_return_type_pathlib(self): 61 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) 62 63 64# Common functionality. 65 66class BaseTestCase(unittest.TestCase): 67 68 str_check = re.compile(r"^[a-z0-9_-]{8}$") 69 b_check = re.compile(br"^[a-z0-9_-]{8}$") 70 71 def setUp(self): 72 self._warnings_manager = support.check_warnings() 73 self._warnings_manager.__enter__() 74 warnings.filterwarnings("ignore", category=RuntimeWarning, 75 message="mktemp", module=__name__) 76 77 def tearDown(self): 78 self._warnings_manager.__exit__(None, None, None) 79 80 def nameCheck(self, name, dir, pre, suf): 81 (ndir, nbase) = os.path.split(name) 82 npre = nbase[:len(pre)] 83 nsuf = nbase[len(nbase)-len(suf):] 84 85 if dir is not None: 86 self.assertIs( 87 type(name), 88 str 89 if type(dir) is str or isinstance(dir, os.PathLike) else 90 bytes, 91 "unexpected return type", 92 ) 93 if pre is not None: 94 self.assertIs(type(name), str if type(pre) is str else bytes, 95 "unexpected return type") 96 if suf is not None: 97 self.assertIs(type(name), str if type(suf) is str else bytes, 98 "unexpected return type") 99 if (dir, pre, suf) == (None, None, None): 100 self.assertIs(type(name), str, "default return type must be str") 101 102 # check for equality of the absolute paths! 103 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 104 "file %r not in directory %r" % (name, dir)) 105 self.assertEqual(npre, pre, 106 "file %r does not begin with %r" % (nbase, pre)) 107 self.assertEqual(nsuf, suf, 108 "file %r does not end with %r" % (nbase, suf)) 109 110 nbase = nbase[len(pre):len(nbase)-len(suf)] 111 check = self.str_check if isinstance(nbase, str) else self.b_check 112 self.assertTrue(check.match(nbase), 113 "random characters %r do not match %r" 114 % (nbase, check.pattern)) 115 116 117class TestExports(BaseTestCase): 118 def test_exports(self): 119 # There are no surprising symbols in the tempfile module 120 dict = tempfile.__dict__ 121 122 expected = { 123 "NamedTemporaryFile" : 1, 124 "TemporaryFile" : 1, 125 "mkstemp" : 1, 126 "mkdtemp" : 1, 127 "mktemp" : 1, 128 "TMP_MAX" : 1, 129 "gettempprefix" : 1, 130 "gettempprefixb" : 1, 131 "gettempdir" : 1, 132 "gettempdirb" : 1, 133 "tempdir" : 1, 134 "template" : 1, 135 "SpooledTemporaryFile" : 1, 136 "TemporaryDirectory" : 1, 137 } 138 139 unexp = [] 140 for key in dict: 141 if key[0] != '_' and key not in expected: 142 unexp.append(key) 143 self.assertTrue(len(unexp) == 0, 144 "unexpected keys: %s" % unexp) 145 146 147class TestRandomNameSequence(BaseTestCase): 148 """Test the internal iterator object _RandomNameSequence.""" 149 150 def setUp(self): 151 self.r = tempfile._RandomNameSequence() 152 super().setUp() 153 154 def test_get_six_char_str(self): 155 # _RandomNameSequence returns a six-character string 156 s = next(self.r) 157 self.nameCheck(s, '', '', '') 158 159 def test_many(self): 160 # _RandomNameSequence returns no duplicate strings (stochastic) 161 162 dict = {} 163 r = self.r 164 for i in range(TEST_FILES): 165 s = next(r) 166 self.nameCheck(s, '', '', '') 167 self.assertNotIn(s, dict) 168 dict[s] = 1 169 170 def supports_iter(self): 171 # _RandomNameSequence supports the iterator protocol 172 173 i = 0 174 r = self.r 175 for s in r: 176 i += 1 177 if i == 20: 178 break 179 180 @unittest.skipUnless(hasattr(os, 'fork'), 181 "os.fork is required for this test") 182 def test_process_awareness(self): 183 # ensure that the random source differs between 184 # child and parent. 185 read_fd, write_fd = os.pipe() 186 pid = None 187 try: 188 pid = os.fork() 189 if not pid: 190 # child process 191 os.close(read_fd) 192 os.write(write_fd, next(self.r).encode("ascii")) 193 os.close(write_fd) 194 # bypass the normal exit handlers- leave those to 195 # the parent. 196 os._exit(0) 197 198 # parent process 199 parent_value = next(self.r) 200 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 201 finally: 202 if pid: 203 # best effort to ensure the process can't bleed out 204 # via any bugs above 205 try: 206 os.kill(pid, signal.SIGKILL) 207 except OSError: 208 pass 209 210 # Read the process exit status to avoid zombie process 211 os.waitpid(pid, 0) 212 213 os.close(read_fd) 214 os.close(write_fd) 215 self.assertNotEqual(child_value, parent_value) 216 217 218 219class TestCandidateTempdirList(BaseTestCase): 220 """Test the internal function _candidate_tempdir_list.""" 221 222 def test_nonempty_list(self): 223 # _candidate_tempdir_list returns a nonempty list of strings 224 225 cand = tempfile._candidate_tempdir_list() 226 227 self.assertFalse(len(cand) == 0) 228 for c in cand: 229 self.assertIsInstance(c, str) 230 231 def test_wanted_dirs(self): 232 # _candidate_tempdir_list contains the expected directories 233 234 # Make sure the interesting environment variables are all set. 235 with support.EnvironmentVarGuard() as env: 236 for envname in 'TMPDIR', 'TEMP', 'TMP': 237 dirname = os.getenv(envname) 238 if not dirname: 239 env[envname] = os.path.abspath(envname) 240 241 cand = tempfile._candidate_tempdir_list() 242 243 for envname in 'TMPDIR', 'TEMP', 'TMP': 244 dirname = os.getenv(envname) 245 if not dirname: raise ValueError 246 self.assertIn(dirname, cand) 247 248 try: 249 dirname = os.getcwd() 250 except (AttributeError, OSError): 251 dirname = os.curdir 252 253 self.assertIn(dirname, cand) 254 255 # Not practical to try to verify the presence of OS-specific 256 # paths in this list. 257 258 259# We test _get_default_tempdir some more by testing gettempdir. 260 261class TestGetDefaultTempdir(BaseTestCase): 262 """Test _get_default_tempdir().""" 263 264 def test_no_files_left_behind(self): 265 # use a private empty directory 266 with tempfile.TemporaryDirectory() as our_temp_directory: 267 # force _get_default_tempdir() to consider our empty directory 268 def our_candidate_list(): 269 return [our_temp_directory] 270 271 with support.swap_attr(tempfile, "_candidate_tempdir_list", 272 our_candidate_list): 273 # verify our directory is empty after _get_default_tempdir() 274 tempfile._get_default_tempdir() 275 self.assertEqual(os.listdir(our_temp_directory), []) 276 277 def raise_OSError(*args, **kwargs): 278 raise OSError() 279 280 with support.swap_attr(io, "open", raise_OSError): 281 # test again with failing io.open() 282 with self.assertRaises(FileNotFoundError): 283 tempfile._get_default_tempdir() 284 self.assertEqual(os.listdir(our_temp_directory), []) 285 286 def bad_writer(*args, **kwargs): 287 fp = orig_open(*args, **kwargs) 288 fp.write = raise_OSError 289 return fp 290 291 with support.swap_attr(io, "open", bad_writer) as orig_open: 292 # test again with failing write() 293 with self.assertRaises(FileNotFoundError): 294 tempfile._get_default_tempdir() 295 self.assertEqual(os.listdir(our_temp_directory), []) 296 297 298class TestGetCandidateNames(BaseTestCase): 299 """Test the internal function _get_candidate_names.""" 300 301 def test_retval(self): 302 # _get_candidate_names returns a _RandomNameSequence object 303 obj = tempfile._get_candidate_names() 304 self.assertIsInstance(obj, tempfile._RandomNameSequence) 305 306 def test_same_thing(self): 307 # _get_candidate_names always returns the same object 308 a = tempfile._get_candidate_names() 309 b = tempfile._get_candidate_names() 310 311 self.assertTrue(a is b) 312 313 314@contextlib.contextmanager 315def _inside_empty_temp_dir(): 316 dir = tempfile.mkdtemp() 317 try: 318 with support.swap_attr(tempfile, 'tempdir', dir): 319 yield 320 finally: 321 support.rmtree(dir) 322 323 324def _mock_candidate_names(*names): 325 return support.swap_attr(tempfile, 326 '_get_candidate_names', 327 lambda: iter(names)) 328 329 330class TestBadTempdir: 331 332 def test_read_only_directory(self): 333 with _inside_empty_temp_dir(): 334 oldmode = mode = os.stat(tempfile.tempdir).st_mode 335 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 336 os.chmod(tempfile.tempdir, mode) 337 try: 338 if os.access(tempfile.tempdir, os.W_OK): 339 self.skipTest("can't set the directory read-only") 340 with self.assertRaises(PermissionError): 341 self.make_temp() 342 self.assertEqual(os.listdir(tempfile.tempdir), []) 343 finally: 344 os.chmod(tempfile.tempdir, oldmode) 345 346 def test_nonexisting_directory(self): 347 with _inside_empty_temp_dir(): 348 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 349 with support.swap_attr(tempfile, 'tempdir', tempdir): 350 with self.assertRaises(FileNotFoundError): 351 self.make_temp() 352 353 def test_non_directory(self): 354 with _inside_empty_temp_dir(): 355 tempdir = os.path.join(tempfile.tempdir, 'file') 356 open(tempdir, 'wb').close() 357 with support.swap_attr(tempfile, 'tempdir', tempdir): 358 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 359 self.make_temp() 360 361 362class TestMkstempInner(TestBadTempdir, BaseTestCase): 363 """Test the internal function _mkstemp_inner.""" 364 365 class mkstemped: 366 _bflags = tempfile._bin_openflags 367 _tflags = tempfile._text_openflags 368 _close = os.close 369 _unlink = os.unlink 370 371 def __init__(self, dir, pre, suf, bin): 372 if bin: flags = self._bflags 373 else: flags = self._tflags 374 375 output_type = tempfile._infer_return_type(dir, pre, suf) 376 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 377 378 def write(self, str): 379 os.write(self.fd, str) 380 381 def __del__(self): 382 self._close(self.fd) 383 self._unlink(self.name) 384 385 def do_create(self, dir=None, pre=None, suf=None, bin=1): 386 output_type = tempfile._infer_return_type(dir, pre, suf) 387 if dir is None: 388 if output_type is str: 389 dir = tempfile.gettempdir() 390 else: 391 dir = tempfile.gettempdirb() 392 if pre is None: 393 pre = output_type() 394 if suf is None: 395 suf = output_type() 396 file = self.mkstemped(dir, pre, suf, bin) 397 398 self.nameCheck(file.name, dir, pre, suf) 399 return file 400 401 def test_basic(self): 402 # _mkstemp_inner can create files 403 self.do_create().write(b"blat") 404 self.do_create(pre="a").write(b"blat") 405 self.do_create(suf="b").write(b"blat") 406 self.do_create(pre="a", suf="b").write(b"blat") 407 self.do_create(pre="aa", suf=".txt").write(b"blat") 408 409 def test_basic_with_bytes_names(self): 410 # _mkstemp_inner can create files when given name parts all 411 # specified as bytes. 412 dir_b = tempfile.gettempdirb() 413 self.do_create(dir=dir_b, suf=b"").write(b"blat") 414 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 415 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 416 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 417 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 418 # Can't mix str & binary types in the args. 419 with self.assertRaises(TypeError): 420 self.do_create(dir="", suf=b"").write(b"blat") 421 with self.assertRaises(TypeError): 422 self.do_create(dir=dir_b, pre="").write(b"blat") 423 with self.assertRaises(TypeError): 424 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 425 426 def test_basic_many(self): 427 # _mkstemp_inner can create many files (stochastic) 428 extant = list(range(TEST_FILES)) 429 for i in extant: 430 extant[i] = self.do_create(pre="aa") 431 432 def test_choose_directory(self): 433 # _mkstemp_inner can create files in a user-selected directory 434 dir = tempfile.mkdtemp() 435 try: 436 self.do_create(dir=dir).write(b"blat") 437 self.do_create(dir=pathlib.Path(dir)).write(b"blat") 438 finally: 439 os.rmdir(dir) 440 441 def test_file_mode(self): 442 # _mkstemp_inner creates files with the proper mode 443 444 file = self.do_create() 445 mode = stat.S_IMODE(os.stat(file.name).st_mode) 446 expected = 0o600 447 if sys.platform == 'win32': 448 # There's no distinction among 'user', 'group' and 'world'; 449 # replicate the 'user' bits. 450 user = expected >> 6 451 expected = user * (1 + 8 + 64) 452 self.assertEqual(mode, expected) 453 454 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 455 def test_noinherit(self): 456 # _mkstemp_inner file handles are not inherited by child processes 457 458 if support.verbose: 459 v="v" 460 else: 461 v="q" 462 463 file = self.do_create() 464 self.assertEqual(os.get_inheritable(file.fd), False) 465 fd = "%d" % file.fd 466 467 try: 468 me = __file__ 469 except NameError: 470 me = sys.argv[0] 471 472 # We have to exec something, so that FD_CLOEXEC will take 473 # effect. The core of this test is therefore in 474 # tf_inherit_check.py, which see. 475 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 476 "tf_inherit_check.py") 477 478 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 479 # but an arg with embedded spaces should be decorated with double 480 # quotes on each end 481 if sys.platform == 'win32': 482 decorated = '"%s"' % sys.executable 483 tester = '"%s"' % tester 484 else: 485 decorated = sys.executable 486 487 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 488 self.assertFalse(retval < 0, 489 "child process caught fatal signal %d" % -retval) 490 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 491 492 @unittest.skipUnless(has_textmode, "text mode not available") 493 def test_textmode(self): 494 # _mkstemp_inner can create files in text mode 495 496 # A text file is truncated at the first Ctrl+Z byte 497 f = self.do_create(bin=0) 498 f.write(b"blat\x1a") 499 f.write(b"extra\n") 500 os.lseek(f.fd, 0, os.SEEK_SET) 501 self.assertEqual(os.read(f.fd, 20), b"blat") 502 503 def make_temp(self): 504 return tempfile._mkstemp_inner(tempfile.gettempdir(), 505 tempfile.gettempprefix(), 506 '', 507 tempfile._bin_openflags, 508 str) 509 510 def test_collision_with_existing_file(self): 511 # _mkstemp_inner tries another name when a file with 512 # the chosen name already exists 513 with _inside_empty_temp_dir(), \ 514 _mock_candidate_names('aaa', 'aaa', 'bbb'): 515 (fd1, name1) = self.make_temp() 516 os.close(fd1) 517 self.assertTrue(name1.endswith('aaa')) 518 519 (fd2, name2) = self.make_temp() 520 os.close(fd2) 521 self.assertTrue(name2.endswith('bbb')) 522 523 def test_collision_with_existing_directory(self): 524 # _mkstemp_inner tries another name when a directory with 525 # the chosen name already exists 526 with _inside_empty_temp_dir(), \ 527 _mock_candidate_names('aaa', 'aaa', 'bbb'): 528 dir = tempfile.mkdtemp() 529 self.assertTrue(dir.endswith('aaa')) 530 531 (fd, name) = self.make_temp() 532 os.close(fd) 533 self.assertTrue(name.endswith('bbb')) 534 535 536class TestGetTempPrefix(BaseTestCase): 537 """Test gettempprefix().""" 538 539 def test_sane_template(self): 540 # gettempprefix returns a nonempty prefix string 541 p = tempfile.gettempprefix() 542 543 self.assertIsInstance(p, str) 544 self.assertGreater(len(p), 0) 545 546 pb = tempfile.gettempprefixb() 547 548 self.assertIsInstance(pb, bytes) 549 self.assertGreater(len(pb), 0) 550 551 def test_usable_template(self): 552 # gettempprefix returns a usable prefix string 553 554 # Create a temp directory, avoiding use of the prefix. 555 # Then attempt to create a file whose name is 556 # prefix + 'xxxxxx.xxx' in that directory. 557 p = tempfile.gettempprefix() + "xxxxxx.xxx" 558 d = tempfile.mkdtemp(prefix="") 559 try: 560 p = os.path.join(d, p) 561 fd = os.open(p, os.O_RDWR | os.O_CREAT) 562 os.close(fd) 563 os.unlink(p) 564 finally: 565 os.rmdir(d) 566 567 568class TestGetTempDir(BaseTestCase): 569 """Test gettempdir().""" 570 571 def test_directory_exists(self): 572 # gettempdir returns a directory which exists 573 574 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 575 self.assertTrue(os.path.isabs(d) or d == os.curdir, 576 "%r is not an absolute path" % d) 577 self.assertTrue(os.path.isdir(d), 578 "%r is not a directory" % d) 579 580 def test_directory_writable(self): 581 # gettempdir returns a directory writable by the user 582 583 # sneaky: just instantiate a NamedTemporaryFile, which 584 # defaults to writing into the directory returned by 585 # gettempdir. 586 with tempfile.NamedTemporaryFile() as file: 587 file.write(b"blat") 588 589 def test_same_thing(self): 590 # gettempdir always returns the same object 591 a = tempfile.gettempdir() 592 b = tempfile.gettempdir() 593 c = tempfile.gettempdirb() 594 595 self.assertTrue(a is b) 596 self.assertNotEqual(type(a), type(c)) 597 self.assertEqual(a, os.fsdecode(c)) 598 599 def test_case_sensitive(self): 600 # gettempdir should not flatten its case 601 # even on a case-insensitive file system 602 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 603 _tempdir, tempfile.tempdir = tempfile.tempdir, None 604 try: 605 with support.EnvironmentVarGuard() as env: 606 # Fake the first env var which is checked as a candidate 607 env["TMPDIR"] = case_sensitive_tempdir 608 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 609 finally: 610 tempfile.tempdir = _tempdir 611 support.rmdir(case_sensitive_tempdir) 612 613 614class TestMkstemp(BaseTestCase): 615 """Test mkstemp().""" 616 617 def do_create(self, dir=None, pre=None, suf=None): 618 output_type = tempfile._infer_return_type(dir, pre, suf) 619 if dir is None: 620 if output_type is str: 621 dir = tempfile.gettempdir() 622 else: 623 dir = tempfile.gettempdirb() 624 if pre is None: 625 pre = output_type() 626 if suf is None: 627 suf = output_type() 628 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 629 (ndir, nbase) = os.path.split(name) 630 adir = os.path.abspath(dir) 631 self.assertEqual(adir, ndir, 632 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 633 634 try: 635 self.nameCheck(name, dir, pre, suf) 636 finally: 637 os.close(fd) 638 os.unlink(name) 639 640 def test_basic(self): 641 # mkstemp can create files 642 self.do_create() 643 self.do_create(pre="a") 644 self.do_create(suf="b") 645 self.do_create(pre="a", suf="b") 646 self.do_create(pre="aa", suf=".txt") 647 self.do_create(dir=".") 648 649 def test_basic_with_bytes_names(self): 650 # mkstemp can create files when given name parts all 651 # specified as bytes. 652 d = tempfile.gettempdirb() 653 self.do_create(dir=d, suf=b"") 654 self.do_create(dir=d, pre=b"a") 655 self.do_create(dir=d, suf=b"b") 656 self.do_create(dir=d, pre=b"a", suf=b"b") 657 self.do_create(dir=d, pre=b"aa", suf=b".txt") 658 self.do_create(dir=b".") 659 with self.assertRaises(TypeError): 660 self.do_create(dir=".", pre=b"aa", suf=b".txt") 661 with self.assertRaises(TypeError): 662 self.do_create(dir=b".", pre="aa", suf=b".txt") 663 with self.assertRaises(TypeError): 664 self.do_create(dir=b".", pre=b"aa", suf=".txt") 665 666 667 def test_choose_directory(self): 668 # mkstemp can create directories in a user-selected directory 669 dir = tempfile.mkdtemp() 670 try: 671 self.do_create(dir=dir) 672 self.do_create(dir=pathlib.Path(dir)) 673 finally: 674 os.rmdir(dir) 675 676 677class TestMkdtemp(TestBadTempdir, BaseTestCase): 678 """Test mkdtemp().""" 679 680 def make_temp(self): 681 return tempfile.mkdtemp() 682 683 def do_create(self, dir=None, pre=None, suf=None): 684 output_type = tempfile._infer_return_type(dir, pre, suf) 685 if dir is None: 686 if output_type is str: 687 dir = tempfile.gettempdir() 688 else: 689 dir = tempfile.gettempdirb() 690 if pre is None: 691 pre = output_type() 692 if suf is None: 693 suf = output_type() 694 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 695 696 try: 697 self.nameCheck(name, dir, pre, suf) 698 return name 699 except: 700 os.rmdir(name) 701 raise 702 703 def test_basic(self): 704 # mkdtemp can create directories 705 os.rmdir(self.do_create()) 706 os.rmdir(self.do_create(pre="a")) 707 os.rmdir(self.do_create(suf="b")) 708 os.rmdir(self.do_create(pre="a", suf="b")) 709 os.rmdir(self.do_create(pre="aa", suf=".txt")) 710 711 def test_basic_with_bytes_names(self): 712 # mkdtemp can create directories when given all binary parts 713 d = tempfile.gettempdirb() 714 os.rmdir(self.do_create(dir=d)) 715 os.rmdir(self.do_create(dir=d, pre=b"a")) 716 os.rmdir(self.do_create(dir=d, suf=b"b")) 717 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 718 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 719 with self.assertRaises(TypeError): 720 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 721 with self.assertRaises(TypeError): 722 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 723 with self.assertRaises(TypeError): 724 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 725 726 def test_basic_many(self): 727 # mkdtemp can create many directories (stochastic) 728 extant = list(range(TEST_FILES)) 729 try: 730 for i in extant: 731 extant[i] = self.do_create(pre="aa") 732 finally: 733 for i in extant: 734 if(isinstance(i, str)): 735 os.rmdir(i) 736 737 def test_choose_directory(self): 738 # mkdtemp can create directories in a user-selected directory 739 dir = tempfile.mkdtemp() 740 try: 741 os.rmdir(self.do_create(dir=dir)) 742 os.rmdir(self.do_create(dir=pathlib.Path(dir))) 743 finally: 744 os.rmdir(dir) 745 746 def test_mode(self): 747 # mkdtemp creates directories with the proper mode 748 749 dir = self.do_create() 750 try: 751 mode = stat.S_IMODE(os.stat(dir).st_mode) 752 mode &= 0o777 # Mask off sticky bits inherited from /tmp 753 expected = 0o700 754 if sys.platform == 'win32': 755 # There's no distinction among 'user', 'group' and 'world'; 756 # replicate the 'user' bits. 757 user = expected >> 6 758 expected = user * (1 + 8 + 64) 759 self.assertEqual(mode, expected) 760 finally: 761 os.rmdir(dir) 762 763 def test_collision_with_existing_file(self): 764 # mkdtemp tries another name when a file with 765 # the chosen name already exists 766 with _inside_empty_temp_dir(), \ 767 _mock_candidate_names('aaa', 'aaa', 'bbb'): 768 file = tempfile.NamedTemporaryFile(delete=False) 769 file.close() 770 self.assertTrue(file.name.endswith('aaa')) 771 dir = tempfile.mkdtemp() 772 self.assertTrue(dir.endswith('bbb')) 773 774 def test_collision_with_existing_directory(self): 775 # mkdtemp tries another name when a directory with 776 # the chosen name already exists 777 with _inside_empty_temp_dir(), \ 778 _mock_candidate_names('aaa', 'aaa', 'bbb'): 779 dir1 = tempfile.mkdtemp() 780 self.assertTrue(dir1.endswith('aaa')) 781 dir2 = tempfile.mkdtemp() 782 self.assertTrue(dir2.endswith('bbb')) 783 784 785class TestMktemp(BaseTestCase): 786 """Test mktemp().""" 787 788 # For safety, all use of mktemp must occur in a private directory. 789 # We must also suppress the RuntimeWarning it generates. 790 def setUp(self): 791 self.dir = tempfile.mkdtemp() 792 super().setUp() 793 794 def tearDown(self): 795 if self.dir: 796 os.rmdir(self.dir) 797 self.dir = None 798 super().tearDown() 799 800 class mktemped: 801 _unlink = os.unlink 802 _bflags = tempfile._bin_openflags 803 804 def __init__(self, dir, pre, suf): 805 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 806 # Create the file. This will raise an exception if it's 807 # mysteriously appeared in the meanwhile. 808 os.close(os.open(self.name, self._bflags, 0o600)) 809 810 def __del__(self): 811 self._unlink(self.name) 812 813 def do_create(self, pre="", suf=""): 814 file = self.mktemped(self.dir, pre, suf) 815 816 self.nameCheck(file.name, self.dir, pre, suf) 817 return file 818 819 def test_basic(self): 820 # mktemp can choose usable file names 821 self.do_create() 822 self.do_create(pre="a") 823 self.do_create(suf="b") 824 self.do_create(pre="a", suf="b") 825 self.do_create(pre="aa", suf=".txt") 826 827 def test_many(self): 828 # mktemp can choose many usable file names (stochastic) 829 extant = list(range(TEST_FILES)) 830 for i in extant: 831 extant[i] = self.do_create(pre="aa") 832 833## def test_warning(self): 834## # mktemp issues a warning when used 835## warnings.filterwarnings("error", 836## category=RuntimeWarning, 837## message="mktemp") 838## self.assertRaises(RuntimeWarning, 839## tempfile.mktemp, dir=self.dir) 840 841 842# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 843 844 845class TestNamedTemporaryFile(BaseTestCase): 846 """Test NamedTemporaryFile().""" 847 848 def do_create(self, dir=None, pre="", suf="", delete=True): 849 if dir is None: 850 dir = tempfile.gettempdir() 851 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 852 delete=delete) 853 854 self.nameCheck(file.name, dir, pre, suf) 855 return file 856 857 858 def test_basic(self): 859 # NamedTemporaryFile can create files 860 self.do_create() 861 self.do_create(pre="a") 862 self.do_create(suf="b") 863 self.do_create(pre="a", suf="b") 864 self.do_create(pre="aa", suf=".txt") 865 866 def test_method_lookup(self): 867 # Issue #18879: Looking up a temporary file method should keep it 868 # alive long enough. 869 f = self.do_create() 870 wr = weakref.ref(f) 871 write = f.write 872 write2 = f.write 873 del f 874 write(b'foo') 875 del write 876 write2(b'bar') 877 del write2 878 if support.check_impl_detail(cpython=True): 879 # No reference cycle was created. 880 self.assertIsNone(wr()) 881 882 def test_iter(self): 883 # Issue #23700: getting iterator from a temporary file should keep 884 # it alive as long as it's being iterated over 885 lines = [b'spam\n', b'eggs\n', b'beans\n'] 886 def make_file(): 887 f = tempfile.NamedTemporaryFile(mode='w+b') 888 f.write(b''.join(lines)) 889 f.seek(0) 890 return f 891 for i, l in enumerate(make_file()): 892 self.assertEqual(l, lines[i]) 893 self.assertEqual(i, len(lines) - 1) 894 895 def test_creates_named(self): 896 # NamedTemporaryFile creates files with names 897 f = tempfile.NamedTemporaryFile() 898 self.assertTrue(os.path.exists(f.name), 899 "NamedTemporaryFile %s does not exist" % f.name) 900 901 def test_del_on_close(self): 902 # A NamedTemporaryFile is deleted when closed 903 dir = tempfile.mkdtemp() 904 try: 905 with tempfile.NamedTemporaryFile(dir=dir) as f: 906 f.write(b'blat') 907 self.assertFalse(os.path.exists(f.name), 908 "NamedTemporaryFile %s exists after close" % f.name) 909 finally: 910 os.rmdir(dir) 911 912 def test_dis_del_on_close(self): 913 # Tests that delete-on-close can be disabled 914 dir = tempfile.mkdtemp() 915 tmp = None 916 try: 917 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 918 tmp = f.name 919 f.write(b'blat') 920 f.close() 921 self.assertTrue(os.path.exists(f.name), 922 "NamedTemporaryFile %s missing after close" % f.name) 923 finally: 924 if tmp is not None: 925 os.unlink(tmp) 926 os.rmdir(dir) 927 928 def test_multiple_close(self): 929 # A NamedTemporaryFile can be closed many times without error 930 f = tempfile.NamedTemporaryFile() 931 f.write(b'abc\n') 932 f.close() 933 f.close() 934 f.close() 935 936 def test_context_manager(self): 937 # A NamedTemporaryFile can be used as a context manager 938 with tempfile.NamedTemporaryFile() as f: 939 self.assertTrue(os.path.exists(f.name)) 940 self.assertFalse(os.path.exists(f.name)) 941 def use_closed(): 942 with f: 943 pass 944 self.assertRaises(ValueError, use_closed) 945 946 def test_no_leak_fd(self): 947 # Issue #21058: don't leak file descriptor when io.open() fails 948 closed = [] 949 os_close = os.close 950 def close(fd): 951 closed.append(fd) 952 os_close(fd) 953 954 with mock.patch('os.close', side_effect=close): 955 with mock.patch('io.open', side_effect=ValueError): 956 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 957 self.assertEqual(len(closed), 1) 958 959 def test_bad_mode(self): 960 dir = tempfile.mkdtemp() 961 self.addCleanup(support.rmtree, dir) 962 with self.assertRaises(ValueError): 963 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 964 with self.assertRaises(TypeError): 965 tempfile.NamedTemporaryFile(mode=2, dir=dir) 966 self.assertEqual(os.listdir(dir), []) 967 968 # How to test the mode and bufsize parameters? 969 970class TestSpooledTemporaryFile(BaseTestCase): 971 """Test SpooledTemporaryFile().""" 972 973 def do_create(self, max_size=0, dir=None, pre="", suf=""): 974 if dir is None: 975 dir = tempfile.gettempdir() 976 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 977 978 return file 979 980 981 def test_basic(self): 982 # SpooledTemporaryFile can create files 983 f = self.do_create() 984 self.assertFalse(f._rolled) 985 f = self.do_create(max_size=100, pre="a", suf=".txt") 986 self.assertFalse(f._rolled) 987 988 def test_del_on_close(self): 989 # A SpooledTemporaryFile is deleted when closed 990 dir = tempfile.mkdtemp() 991 try: 992 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 993 self.assertFalse(f._rolled) 994 f.write(b'blat ' * 5) 995 self.assertTrue(f._rolled) 996 filename = f.name 997 f.close() 998 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 999 "SpooledTemporaryFile %s exists after close" % filename) 1000 finally: 1001 os.rmdir(dir) 1002 1003 def test_rewrite_small(self): 1004 # A SpooledTemporaryFile can be written to multiple within the max_size 1005 f = self.do_create(max_size=30) 1006 self.assertFalse(f._rolled) 1007 for i in range(5): 1008 f.seek(0, 0) 1009 f.write(b'x' * 20) 1010 self.assertFalse(f._rolled) 1011 1012 def test_write_sequential(self): 1013 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1014 # over afterward 1015 f = self.do_create(max_size=30) 1016 self.assertFalse(f._rolled) 1017 f.write(b'x' * 20) 1018 self.assertFalse(f._rolled) 1019 f.write(b'x' * 10) 1020 self.assertFalse(f._rolled) 1021 f.write(b'x') 1022 self.assertTrue(f._rolled) 1023 1024 def test_writelines(self): 1025 # Verify writelines with a SpooledTemporaryFile 1026 f = self.do_create() 1027 f.writelines((b'x', b'y', b'z')) 1028 pos = f.seek(0) 1029 self.assertEqual(pos, 0) 1030 buf = f.read() 1031 self.assertEqual(buf, b'xyz') 1032 1033 def test_writelines_sequential(self): 1034 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1035 # over afterward 1036 f = self.do_create(max_size=35) 1037 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1038 self.assertFalse(f._rolled) 1039 f.write(b'x') 1040 self.assertTrue(f._rolled) 1041 1042 def test_sparse(self): 1043 # A SpooledTemporaryFile that is written late in the file will extend 1044 # when that occurs 1045 f = self.do_create(max_size=30) 1046 self.assertFalse(f._rolled) 1047 pos = f.seek(100, 0) 1048 self.assertEqual(pos, 100) 1049 self.assertFalse(f._rolled) 1050 f.write(b'x') 1051 self.assertTrue(f._rolled) 1052 1053 def test_fileno(self): 1054 # A SpooledTemporaryFile should roll over to a real file on fileno() 1055 f = self.do_create(max_size=30) 1056 self.assertFalse(f._rolled) 1057 self.assertTrue(f.fileno() > 0) 1058 self.assertTrue(f._rolled) 1059 1060 def test_multiple_close_before_rollover(self): 1061 # A SpooledTemporaryFile can be closed many times without error 1062 f = tempfile.SpooledTemporaryFile() 1063 f.write(b'abc\n') 1064 self.assertFalse(f._rolled) 1065 f.close() 1066 f.close() 1067 f.close() 1068 1069 def test_multiple_close_after_rollover(self): 1070 # A SpooledTemporaryFile can be closed many times without error 1071 f = tempfile.SpooledTemporaryFile(max_size=1) 1072 f.write(b'abc\n') 1073 self.assertTrue(f._rolled) 1074 f.close() 1075 f.close() 1076 f.close() 1077 1078 def test_bound_methods(self): 1079 # It should be OK to steal a bound method from a SpooledTemporaryFile 1080 # and use it independently; when the file rolls over, those bound 1081 # methods should continue to function 1082 f = self.do_create(max_size=30) 1083 read = f.read 1084 write = f.write 1085 seek = f.seek 1086 1087 write(b"a" * 35) 1088 write(b"b" * 35) 1089 seek(0, 0) 1090 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1091 1092 def test_properties(self): 1093 f = tempfile.SpooledTemporaryFile(max_size=10) 1094 f.write(b'x' * 10) 1095 self.assertFalse(f._rolled) 1096 self.assertEqual(f.mode, 'w+b') 1097 self.assertIsNone(f.name) 1098 with self.assertRaises(AttributeError): 1099 f.newlines 1100 with self.assertRaises(AttributeError): 1101 f.encoding 1102 with self.assertRaises(AttributeError): 1103 f.errors 1104 1105 f.write(b'x') 1106 self.assertTrue(f._rolled) 1107 self.assertEqual(f.mode, 'rb+') 1108 self.assertIsNotNone(f.name) 1109 with self.assertRaises(AttributeError): 1110 f.newlines 1111 with self.assertRaises(AttributeError): 1112 f.encoding 1113 with self.assertRaises(AttributeError): 1114 f.errors 1115 1116 def test_text_mode(self): 1117 # Creating a SpooledTemporaryFile with a text mode should produce 1118 # a file object reading and writing (Unicode) text strings. 1119 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1120 encoding="utf-8") 1121 f.write("abc\n") 1122 f.seek(0) 1123 self.assertEqual(f.read(), "abc\n") 1124 f.write("def\n") 1125 f.seek(0) 1126 self.assertEqual(f.read(), "abc\ndef\n") 1127 self.assertFalse(f._rolled) 1128 self.assertEqual(f.mode, 'w+') 1129 self.assertIsNone(f.name) 1130 self.assertEqual(f.newlines, os.linesep) 1131 self.assertEqual(f.encoding, "utf-8") 1132 self.assertEqual(f.errors, "strict") 1133 1134 f.write("xyzzy\n") 1135 f.seek(0) 1136 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1137 # Check that Ctrl+Z doesn't truncate the file 1138 f.write("foo\x1abar\n") 1139 f.seek(0) 1140 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1141 self.assertTrue(f._rolled) 1142 self.assertEqual(f.mode, 'w+') 1143 self.assertIsNotNone(f.name) 1144 self.assertEqual(f.newlines, os.linesep) 1145 self.assertEqual(f.encoding, "utf-8") 1146 self.assertEqual(f.errors, "strict") 1147 1148 def test_text_newline_and_encoding(self): 1149 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1150 newline='', encoding='utf-8', 1151 errors='ignore') 1152 f.write("\u039B\r\n") 1153 f.seek(0) 1154 self.assertEqual(f.read(), "\u039B\r\n") 1155 self.assertFalse(f._rolled) 1156 self.assertEqual(f.mode, 'w+') 1157 self.assertIsNone(f.name) 1158 self.assertIsNotNone(f.newlines) 1159 self.assertEqual(f.encoding, "utf-8") 1160 self.assertEqual(f.errors, "ignore") 1161 1162 f.write("\u039C" * 10 + "\r\n") 1163 f.write("\u039D" * 20) 1164 f.seek(0) 1165 self.assertEqual(f.read(), 1166 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1167 self.assertTrue(f._rolled) 1168 self.assertEqual(f.mode, 'w+') 1169 self.assertIsNotNone(f.name) 1170 self.assertIsNotNone(f.newlines) 1171 self.assertEqual(f.encoding, 'utf-8') 1172 self.assertEqual(f.errors, 'ignore') 1173 1174 def test_context_manager_before_rollover(self): 1175 # A SpooledTemporaryFile can be used as a context manager 1176 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1177 self.assertFalse(f._rolled) 1178 self.assertFalse(f.closed) 1179 self.assertTrue(f.closed) 1180 def use_closed(): 1181 with f: 1182 pass 1183 self.assertRaises(ValueError, use_closed) 1184 1185 def test_context_manager_during_rollover(self): 1186 # A SpooledTemporaryFile can be used as a context manager 1187 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1188 self.assertFalse(f._rolled) 1189 f.write(b'abc\n') 1190 f.flush() 1191 self.assertTrue(f._rolled) 1192 self.assertFalse(f.closed) 1193 self.assertTrue(f.closed) 1194 def use_closed(): 1195 with f: 1196 pass 1197 self.assertRaises(ValueError, use_closed) 1198 1199 def test_context_manager_after_rollover(self): 1200 # A SpooledTemporaryFile can be used as a context manager 1201 f = tempfile.SpooledTemporaryFile(max_size=1) 1202 f.write(b'abc\n') 1203 f.flush() 1204 self.assertTrue(f._rolled) 1205 with f: 1206 self.assertFalse(f.closed) 1207 self.assertTrue(f.closed) 1208 def use_closed(): 1209 with f: 1210 pass 1211 self.assertRaises(ValueError, use_closed) 1212 1213 def test_truncate_with_size_parameter(self): 1214 # A SpooledTemporaryFile can be truncated to zero size 1215 f = tempfile.SpooledTemporaryFile(max_size=10) 1216 f.write(b'abcdefg\n') 1217 f.seek(0) 1218 f.truncate() 1219 self.assertFalse(f._rolled) 1220 self.assertEqual(f._file.getvalue(), b'') 1221 # A SpooledTemporaryFile can be truncated to a specific size 1222 f = tempfile.SpooledTemporaryFile(max_size=10) 1223 f.write(b'abcdefg\n') 1224 f.truncate(4) 1225 self.assertFalse(f._rolled) 1226 self.assertEqual(f._file.getvalue(), b'abcd') 1227 # A SpooledTemporaryFile rolls over if truncated to large size 1228 f = tempfile.SpooledTemporaryFile(max_size=10) 1229 f.write(b'abcdefg\n') 1230 f.truncate(20) 1231 self.assertTrue(f._rolled) 1232 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1233 1234 1235if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1236 1237 class TestTemporaryFile(BaseTestCase): 1238 """Test TemporaryFile().""" 1239 1240 def test_basic(self): 1241 # TemporaryFile can create files 1242 # No point in testing the name params - the file has no name. 1243 tempfile.TemporaryFile() 1244 1245 def test_has_no_name(self): 1246 # TemporaryFile creates files with no names (on this system) 1247 dir = tempfile.mkdtemp() 1248 f = tempfile.TemporaryFile(dir=dir) 1249 f.write(b'blat') 1250 1251 # Sneaky: because this file has no name, it should not prevent 1252 # us from removing the directory it was created in. 1253 try: 1254 os.rmdir(dir) 1255 except: 1256 # cleanup 1257 f.close() 1258 os.rmdir(dir) 1259 raise 1260 1261 def test_multiple_close(self): 1262 # A TemporaryFile can be closed many times without error 1263 f = tempfile.TemporaryFile() 1264 f.write(b'abc\n') 1265 f.close() 1266 f.close() 1267 f.close() 1268 1269 # How to test the mode and bufsize parameters? 1270 def test_mode_and_encoding(self): 1271 1272 def roundtrip(input, *args, **kwargs): 1273 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1274 fileobj.write(input) 1275 fileobj.seek(0) 1276 self.assertEqual(input, fileobj.read()) 1277 1278 roundtrip(b"1234", "w+b") 1279 roundtrip("abdc\n", "w+") 1280 roundtrip("\u039B", "w+", encoding="utf-16") 1281 roundtrip("foo\r\n", "w+", newline="") 1282 1283 def test_no_leak_fd(self): 1284 # Issue #21058: don't leak file descriptor when io.open() fails 1285 closed = [] 1286 os_close = os.close 1287 def close(fd): 1288 closed.append(fd) 1289 os_close(fd) 1290 1291 with mock.patch('os.close', side_effect=close): 1292 with mock.patch('io.open', side_effect=ValueError): 1293 self.assertRaises(ValueError, tempfile.TemporaryFile) 1294 self.assertEqual(len(closed), 1) 1295 1296 1297 1298# Helper for test_del_on_shutdown 1299class NulledModules: 1300 def __init__(self, *modules): 1301 self.refs = [mod.__dict__ for mod in modules] 1302 self.contents = [ref.copy() for ref in self.refs] 1303 1304 def __enter__(self): 1305 for d in self.refs: 1306 for key in d: 1307 d[key] = None 1308 1309 def __exit__(self, *exc_info): 1310 for d, c in zip(self.refs, self.contents): 1311 d.clear() 1312 d.update(c) 1313 1314class TestTemporaryDirectory(BaseTestCase): 1315 """Test TemporaryDirectory().""" 1316 1317 def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1): 1318 if dir is None: 1319 dir = tempfile.gettempdir() 1320 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1321 self.nameCheck(tmp.name, dir, pre, suf) 1322 self.do_create2(tmp.name, recurse, dirs, files) 1323 return tmp 1324 1325 def do_create2(self, path, recurse=1, dirs=1, files=1): 1326 # Create subdirectories and some files 1327 if recurse: 1328 for i in range(dirs): 1329 name = os.path.join(path, "dir%d" % i) 1330 os.mkdir(name) 1331 self.do_create2(name, recurse-1, dirs, files) 1332 for i in range(files): 1333 with open(os.path.join(path, "test%d.txt" % i), "wb") as f: 1334 f.write(b"Hello world!") 1335 1336 def test_mkdtemp_failure(self): 1337 # Check no additional exception if mkdtemp fails 1338 # Previously would raise AttributeError instead 1339 # (noted as part of Issue #10188) 1340 with tempfile.TemporaryDirectory() as nonexistent: 1341 pass 1342 with self.assertRaises(FileNotFoundError) as cm: 1343 tempfile.TemporaryDirectory(dir=nonexistent) 1344 self.assertEqual(cm.exception.errno, errno.ENOENT) 1345 1346 def test_explicit_cleanup(self): 1347 # A TemporaryDirectory is deleted when cleaned up 1348 dir = tempfile.mkdtemp() 1349 try: 1350 d = self.do_create(dir=dir) 1351 self.assertTrue(os.path.exists(d.name), 1352 "TemporaryDirectory %s does not exist" % d.name) 1353 d.cleanup() 1354 self.assertFalse(os.path.exists(d.name), 1355 "TemporaryDirectory %s exists after cleanup" % d.name) 1356 finally: 1357 os.rmdir(dir) 1358 1359 @support.skip_unless_symlink 1360 def test_cleanup_with_symlink_to_a_directory(self): 1361 # cleanup() should not follow symlinks to directories (issue #12464) 1362 d1 = self.do_create() 1363 d2 = self.do_create(recurse=0) 1364 1365 # Symlink d1/foo -> d2 1366 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1367 1368 # This call to cleanup() should not follow the "foo" symlink 1369 d1.cleanup() 1370 1371 self.assertFalse(os.path.exists(d1.name), 1372 "TemporaryDirectory %s exists after cleanup" % d1.name) 1373 self.assertTrue(os.path.exists(d2.name), 1374 "Directory pointed to by a symlink was deleted") 1375 self.assertEqual(os.listdir(d2.name), ['test0.txt'], 1376 "Contents of the directory pointed to by a symlink " 1377 "were deleted") 1378 d2.cleanup() 1379 1380 @support.cpython_only 1381 def test_del_on_collection(self): 1382 # A TemporaryDirectory is deleted when garbage collected 1383 dir = tempfile.mkdtemp() 1384 try: 1385 d = self.do_create(dir=dir) 1386 name = d.name 1387 del d # Rely on refcounting to invoke __del__ 1388 self.assertFalse(os.path.exists(name), 1389 "TemporaryDirectory %s exists after __del__" % name) 1390 finally: 1391 os.rmdir(dir) 1392 1393 def test_del_on_shutdown(self): 1394 # A TemporaryDirectory may be cleaned up during shutdown 1395 with self.do_create() as dir: 1396 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1397 code = """if True: 1398 import builtins 1399 import os 1400 import shutil 1401 import sys 1402 import tempfile 1403 import warnings 1404 1405 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1406 sys.stdout.buffer.write(tmp.name.encode()) 1407 1408 tmp2 = os.path.join(tmp.name, 'test_dir') 1409 os.mkdir(tmp2) 1410 with open(os.path.join(tmp2, "test0.txt"), "w") as f: 1411 f.write("Hello world!") 1412 1413 {mod}.tmp = tmp 1414 1415 warnings.filterwarnings("always", category=ResourceWarning) 1416 """.format(dir=dir, mod=mod) 1417 rc, out, err = script_helper.assert_python_ok("-c", code) 1418 tmp_name = out.decode().strip() 1419 self.assertFalse(os.path.exists(tmp_name), 1420 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1421 err = err.decode('utf-8', 'backslashreplace') 1422 self.assertNotIn("Exception ", err) 1423 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1424 1425 def test_exit_on_shutdown(self): 1426 # Issue #22427 1427 with self.do_create() as dir: 1428 code = """if True: 1429 import sys 1430 import tempfile 1431 import warnings 1432 1433 def generator(): 1434 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1435 yield tmp 1436 g = generator() 1437 sys.stdout.buffer.write(next(g).encode()) 1438 1439 warnings.filterwarnings("always", category=ResourceWarning) 1440 """.format(dir=dir) 1441 rc, out, err = script_helper.assert_python_ok("-c", code) 1442 tmp_name = out.decode().strip() 1443 self.assertFalse(os.path.exists(tmp_name), 1444 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1445 err = err.decode('utf-8', 'backslashreplace') 1446 self.assertNotIn("Exception ", err) 1447 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1448 1449 def test_warnings_on_cleanup(self): 1450 # ResourceWarning will be triggered by __del__ 1451 with self.do_create() as dir: 1452 d = self.do_create(dir=dir, recurse=3) 1453 name = d.name 1454 1455 # Check for the resource warning 1456 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1457 warnings.filterwarnings("always", category=ResourceWarning) 1458 del d 1459 support.gc_collect() 1460 self.assertFalse(os.path.exists(name), 1461 "TemporaryDirectory %s exists after __del__" % name) 1462 1463 def test_multiple_close(self): 1464 # Can be cleaned-up many times without error 1465 d = self.do_create() 1466 d.cleanup() 1467 d.cleanup() 1468 d.cleanup() 1469 1470 def test_context_manager(self): 1471 # Can be used as a context manager 1472 d = self.do_create() 1473 with d as name: 1474 self.assertTrue(os.path.exists(name)) 1475 self.assertEqual(name, d.name) 1476 self.assertFalse(os.path.exists(name)) 1477 1478 def test_modes(self): 1479 for mode in range(8): 1480 mode <<= 6 1481 with self.subTest(mode=format(mode, '03o')): 1482 d = self.do_create(recurse=3, dirs=2, files=2) 1483 with d: 1484 # Change files and directories mode recursively. 1485 for root, dirs, files in os.walk(d.name, topdown=False): 1486 for name in files: 1487 os.chmod(os.path.join(root, name), mode) 1488 os.chmod(root, mode) 1489 d.cleanup() 1490 self.assertFalse(os.path.exists(d.name)) 1491 1492 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags') 1493 def test_flags(self): 1494 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1495 d = self.do_create(recurse=3, dirs=2, files=2) 1496 with d: 1497 # Change files and directories flags recursively. 1498 for root, dirs, files in os.walk(d.name, topdown=False): 1499 for name in files: 1500 os.chflags(os.path.join(root, name), flags) 1501 os.chflags(root, flags) 1502 d.cleanup() 1503 self.assertFalse(os.path.exists(d.name)) 1504 1505 1506if __name__ == "__main__": 1507 unittest.main() 1508