1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import sys 8import re 9import warnings 10import contextlib 11import stat 12import types 13import weakref 14from unittest import mock 15 16import unittest 17from test import support 18from test.support import script_helper 19 20 21has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 22has_spawnl = hasattr(os, 'spawnl') 23 24# TEST_FILES may need to be tweaked for systems depending on the maximum 25# number of files that can be opened at one time (see ulimit -n) 26if sys.platform.startswith('openbsd'): 27 TEST_FILES = 48 28else: 29 TEST_FILES = 100 30 31# This is organized as one test for each chunk of code in tempfile.py, 32# in order of their appearance in the file. Testing which requires 33# threads is not done here. 34 35class TestLowLevelInternals(unittest.TestCase): 36 def test_infer_return_type_singles(self): 37 self.assertIs(str, tempfile._infer_return_type('')) 38 self.assertIs(bytes, tempfile._infer_return_type(b'')) 39 self.assertIs(str, tempfile._infer_return_type(None)) 40 41 def test_infer_return_type_multiples(self): 42 self.assertIs(str, tempfile._infer_return_type('', '')) 43 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 44 with self.assertRaises(TypeError): 45 tempfile._infer_return_type('', b'') 46 with self.assertRaises(TypeError): 47 tempfile._infer_return_type(b'', '') 48 49 def test_infer_return_type_multiples_and_none(self): 50 self.assertIs(str, tempfile._infer_return_type(None, '')) 51 self.assertIs(str, tempfile._infer_return_type('', None)) 52 self.assertIs(str, tempfile._infer_return_type(None, None)) 53 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 54 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 55 with self.assertRaises(TypeError): 56 tempfile._infer_return_type('', None, b'') 57 with self.assertRaises(TypeError): 58 tempfile._infer_return_type(b'', None, '') 59 60 def test_infer_return_type_pathlib(self): 61 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) 62 63 def test_infer_return_type_pathlike(self): 64 class Path: 65 def __init__(self, path): 66 self.path = path 67 68 def __fspath__(self): 69 return self.path 70 71 self.assertIs(str, tempfile._infer_return_type(Path('/'))) 72 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) 73 self.assertIs(str, tempfile._infer_return_type('', Path(''))) 74 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b''))) 75 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b''))) 76 self.assertIs(str, tempfile._infer_return_type(None, Path(''))) 77 78 with self.assertRaises(TypeError): 79 tempfile._infer_return_type('', Path(b'')) 80 with self.assertRaises(TypeError): 81 tempfile._infer_return_type(b'', Path('')) 82 83# Common functionality. 84 85class BaseTestCase(unittest.TestCase): 86 87 str_check = re.compile(r"^[a-z0-9_-]{8}$") 88 b_check = re.compile(br"^[a-z0-9_-]{8}$") 89 90 def setUp(self): 91 self._warnings_manager = support.check_warnings() 92 self._warnings_manager.__enter__() 93 warnings.filterwarnings("ignore", category=RuntimeWarning, 94 message="mktemp", module=__name__) 95 96 def tearDown(self): 97 self._warnings_manager.__exit__(None, None, None) 98 99 def nameCheck(self, name, dir, pre, suf): 100 (ndir, nbase) = os.path.split(name) 101 npre = nbase[:len(pre)] 102 nsuf = nbase[len(nbase)-len(suf):] 103 104 if dir is not None: 105 self.assertIs( 106 type(name), 107 str 108 if type(dir) is str or isinstance(dir, os.PathLike) else 109 bytes, 110 "unexpected return type", 111 ) 112 if pre is not None: 113 self.assertIs(type(name), str if type(pre) is str else bytes, 114 "unexpected return type") 115 if suf is not None: 116 self.assertIs(type(name), str if type(suf) is str else bytes, 117 "unexpected return type") 118 if (dir, pre, suf) == (None, None, None): 119 self.assertIs(type(name), str, "default return type must be str") 120 121 # check for equality of the absolute paths! 122 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 123 "file %r not in directory %r" % (name, dir)) 124 self.assertEqual(npre, pre, 125 "file %r does not begin with %r" % (nbase, pre)) 126 self.assertEqual(nsuf, suf, 127 "file %r does not end with %r" % (nbase, suf)) 128 129 nbase = nbase[len(pre):len(nbase)-len(suf)] 130 check = self.str_check if isinstance(nbase, str) else self.b_check 131 self.assertTrue(check.match(nbase), 132 "random characters %r do not match %r" 133 % (nbase, check.pattern)) 134 135 136class TestExports(BaseTestCase): 137 def test_exports(self): 138 # There are no surprising symbols in the tempfile module 139 dict = tempfile.__dict__ 140 141 expected = { 142 "NamedTemporaryFile" : 1, 143 "TemporaryFile" : 1, 144 "mkstemp" : 1, 145 "mkdtemp" : 1, 146 "mktemp" : 1, 147 "TMP_MAX" : 1, 148 "gettempprefix" : 1, 149 "gettempprefixb" : 1, 150 "gettempdir" : 1, 151 "gettempdirb" : 1, 152 "tempdir" : 1, 153 "template" : 1, 154 "SpooledTemporaryFile" : 1, 155 "TemporaryDirectory" : 1, 156 } 157 158 unexp = [] 159 for key in dict: 160 if key[0] != '_' and key not in expected: 161 unexp.append(key) 162 self.assertTrue(len(unexp) == 0, 163 "unexpected keys: %s" % unexp) 164 165 166class TestRandomNameSequence(BaseTestCase): 167 """Test the internal iterator object _RandomNameSequence.""" 168 169 def setUp(self): 170 self.r = tempfile._RandomNameSequence() 171 super().setUp() 172 173 def test_get_six_char_str(self): 174 # _RandomNameSequence returns a six-character string 175 s = next(self.r) 176 self.nameCheck(s, '', '', '') 177 178 def test_many(self): 179 # _RandomNameSequence returns no duplicate strings (stochastic) 180 181 dict = {} 182 r = self.r 183 for i in range(TEST_FILES): 184 s = next(r) 185 self.nameCheck(s, '', '', '') 186 self.assertNotIn(s, dict) 187 dict[s] = 1 188 189 def supports_iter(self): 190 # _RandomNameSequence supports the iterator protocol 191 192 i = 0 193 r = self.r 194 for s in r: 195 i += 1 196 if i == 20: 197 break 198 199 @unittest.skipUnless(hasattr(os, 'fork'), 200 "os.fork is required for this test") 201 def test_process_awareness(self): 202 # ensure that the random source differs between 203 # child and parent. 204 read_fd, write_fd = os.pipe() 205 pid = None 206 try: 207 pid = os.fork() 208 if not pid: 209 # child process 210 os.close(read_fd) 211 os.write(write_fd, next(self.r).encode("ascii")) 212 os.close(write_fd) 213 # bypass the normal exit handlers- leave those to 214 # the parent. 215 os._exit(0) 216 217 # parent process 218 parent_value = next(self.r) 219 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 220 finally: 221 if pid: 222 support.wait_process(pid, exitcode=0) 223 224 os.close(read_fd) 225 os.close(write_fd) 226 self.assertNotEqual(child_value, parent_value) 227 228 229 230class TestCandidateTempdirList(BaseTestCase): 231 """Test the internal function _candidate_tempdir_list.""" 232 233 def test_nonempty_list(self): 234 # _candidate_tempdir_list returns a nonempty list of strings 235 236 cand = tempfile._candidate_tempdir_list() 237 238 self.assertFalse(len(cand) == 0) 239 for c in cand: 240 self.assertIsInstance(c, str) 241 242 def test_wanted_dirs(self): 243 # _candidate_tempdir_list contains the expected directories 244 245 # Make sure the interesting environment variables are all set. 246 with support.EnvironmentVarGuard() as env: 247 for envname in 'TMPDIR', 'TEMP', 'TMP': 248 dirname = os.getenv(envname) 249 if not dirname: 250 env[envname] = os.path.abspath(envname) 251 252 cand = tempfile._candidate_tempdir_list() 253 254 for envname in 'TMPDIR', 'TEMP', 'TMP': 255 dirname = os.getenv(envname) 256 if not dirname: raise ValueError 257 self.assertIn(dirname, cand) 258 259 try: 260 dirname = os.getcwd() 261 except (AttributeError, OSError): 262 dirname = os.curdir 263 264 self.assertIn(dirname, cand) 265 266 # Not practical to try to verify the presence of OS-specific 267 # paths in this list. 268 269 270# We test _get_default_tempdir some more by testing gettempdir. 271 272class TestGetDefaultTempdir(BaseTestCase): 273 """Test _get_default_tempdir().""" 274 275 def test_no_files_left_behind(self): 276 # use a private empty directory 277 with tempfile.TemporaryDirectory() as our_temp_directory: 278 # force _get_default_tempdir() to consider our empty directory 279 def our_candidate_list(): 280 return [our_temp_directory] 281 282 with support.swap_attr(tempfile, "_candidate_tempdir_list", 283 our_candidate_list): 284 # verify our directory is empty after _get_default_tempdir() 285 tempfile._get_default_tempdir() 286 self.assertEqual(os.listdir(our_temp_directory), []) 287 288 def raise_OSError(*args, **kwargs): 289 raise OSError() 290 291 with support.swap_attr(io, "open", raise_OSError): 292 # test again with failing io.open() 293 with self.assertRaises(FileNotFoundError): 294 tempfile._get_default_tempdir() 295 self.assertEqual(os.listdir(our_temp_directory), []) 296 297 def bad_writer(*args, **kwargs): 298 fp = orig_open(*args, **kwargs) 299 fp.write = raise_OSError 300 return fp 301 302 with support.swap_attr(io, "open", bad_writer) as orig_open: 303 # test again with failing write() 304 with self.assertRaises(FileNotFoundError): 305 tempfile._get_default_tempdir() 306 self.assertEqual(os.listdir(our_temp_directory), []) 307 308 309class TestGetCandidateNames(BaseTestCase): 310 """Test the internal function _get_candidate_names.""" 311 312 def test_retval(self): 313 # _get_candidate_names returns a _RandomNameSequence object 314 obj = tempfile._get_candidate_names() 315 self.assertIsInstance(obj, tempfile._RandomNameSequence) 316 317 def test_same_thing(self): 318 # _get_candidate_names always returns the same object 319 a = tempfile._get_candidate_names() 320 b = tempfile._get_candidate_names() 321 322 self.assertTrue(a is b) 323 324 325@contextlib.contextmanager 326def _inside_empty_temp_dir(): 327 dir = tempfile.mkdtemp() 328 try: 329 with support.swap_attr(tempfile, 'tempdir', dir): 330 yield 331 finally: 332 support.rmtree(dir) 333 334 335def _mock_candidate_names(*names): 336 return support.swap_attr(tempfile, 337 '_get_candidate_names', 338 lambda: iter(names)) 339 340 341class TestBadTempdir: 342 343 def test_read_only_directory(self): 344 with _inside_empty_temp_dir(): 345 oldmode = mode = os.stat(tempfile.tempdir).st_mode 346 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 347 os.chmod(tempfile.tempdir, mode) 348 try: 349 if os.access(tempfile.tempdir, os.W_OK): 350 self.skipTest("can't set the directory read-only") 351 with self.assertRaises(PermissionError): 352 self.make_temp() 353 self.assertEqual(os.listdir(tempfile.tempdir), []) 354 finally: 355 os.chmod(tempfile.tempdir, oldmode) 356 357 def test_nonexisting_directory(self): 358 with _inside_empty_temp_dir(): 359 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 360 with support.swap_attr(tempfile, 'tempdir', tempdir): 361 with self.assertRaises(FileNotFoundError): 362 self.make_temp() 363 364 def test_non_directory(self): 365 with _inside_empty_temp_dir(): 366 tempdir = os.path.join(tempfile.tempdir, 'file') 367 open(tempdir, 'wb').close() 368 with support.swap_attr(tempfile, 'tempdir', tempdir): 369 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 370 self.make_temp() 371 372 373class TestMkstempInner(TestBadTempdir, BaseTestCase): 374 """Test the internal function _mkstemp_inner.""" 375 376 class mkstemped: 377 _bflags = tempfile._bin_openflags 378 _tflags = tempfile._text_openflags 379 _close = os.close 380 _unlink = os.unlink 381 382 def __init__(self, dir, pre, suf, bin): 383 if bin: flags = self._bflags 384 else: flags = self._tflags 385 386 output_type = tempfile._infer_return_type(dir, pre, suf) 387 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 388 389 def write(self, str): 390 os.write(self.fd, str) 391 392 def __del__(self): 393 self._close(self.fd) 394 self._unlink(self.name) 395 396 def do_create(self, dir=None, pre=None, suf=None, bin=1): 397 output_type = tempfile._infer_return_type(dir, pre, suf) 398 if dir is None: 399 if output_type is str: 400 dir = tempfile.gettempdir() 401 else: 402 dir = tempfile.gettempdirb() 403 if pre is None: 404 pre = output_type() 405 if suf is None: 406 suf = output_type() 407 file = self.mkstemped(dir, pre, suf, bin) 408 409 self.nameCheck(file.name, dir, pre, suf) 410 return file 411 412 def test_basic(self): 413 # _mkstemp_inner can create files 414 self.do_create().write(b"blat") 415 self.do_create(pre="a").write(b"blat") 416 self.do_create(suf="b").write(b"blat") 417 self.do_create(pre="a", suf="b").write(b"blat") 418 self.do_create(pre="aa", suf=".txt").write(b"blat") 419 420 def test_basic_with_bytes_names(self): 421 # _mkstemp_inner can create files when given name parts all 422 # specified as bytes. 423 dir_b = tempfile.gettempdirb() 424 self.do_create(dir=dir_b, suf=b"").write(b"blat") 425 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 426 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 427 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 428 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 429 # Can't mix str & binary types in the args. 430 with self.assertRaises(TypeError): 431 self.do_create(dir="", suf=b"").write(b"blat") 432 with self.assertRaises(TypeError): 433 self.do_create(dir=dir_b, pre="").write(b"blat") 434 with self.assertRaises(TypeError): 435 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 436 437 def test_basic_many(self): 438 # _mkstemp_inner can create many files (stochastic) 439 extant = list(range(TEST_FILES)) 440 for i in extant: 441 extant[i] = self.do_create(pre="aa") 442 443 def test_choose_directory(self): 444 # _mkstemp_inner can create files in a user-selected directory 445 dir = tempfile.mkdtemp() 446 try: 447 self.do_create(dir=dir).write(b"blat") 448 self.do_create(dir=pathlib.Path(dir)).write(b"blat") 449 finally: 450 support.gc_collect() # For PyPy or other GCs. 451 os.rmdir(dir) 452 453 def test_file_mode(self): 454 # _mkstemp_inner creates files with the proper mode 455 456 file = self.do_create() 457 mode = stat.S_IMODE(os.stat(file.name).st_mode) 458 expected = 0o600 459 if sys.platform == 'win32': 460 # There's no distinction among 'user', 'group' and 'world'; 461 # replicate the 'user' bits. 462 user = expected >> 6 463 expected = user * (1 + 8 + 64) 464 self.assertEqual(mode, expected) 465 466 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 467 def test_noinherit(self): 468 # _mkstemp_inner file handles are not inherited by child processes 469 470 if support.verbose: 471 v="v" 472 else: 473 v="q" 474 475 file = self.do_create() 476 self.assertEqual(os.get_inheritable(file.fd), False) 477 fd = "%d" % file.fd 478 479 try: 480 me = __file__ 481 except NameError: 482 me = sys.argv[0] 483 484 # We have to exec something, so that FD_CLOEXEC will take 485 # effect. The core of this test is therefore in 486 # tf_inherit_check.py, which see. 487 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 488 "tf_inherit_check.py") 489 490 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 491 # but an arg with embedded spaces should be decorated with double 492 # quotes on each end 493 if sys.platform == 'win32': 494 decorated = '"%s"' % sys.executable 495 tester = '"%s"' % tester 496 else: 497 decorated = sys.executable 498 499 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 500 self.assertFalse(retval < 0, 501 "child process caught fatal signal %d" % -retval) 502 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 503 504 @unittest.skipUnless(has_textmode, "text mode not available") 505 def test_textmode(self): 506 # _mkstemp_inner can create files in text mode 507 508 # A text file is truncated at the first Ctrl+Z byte 509 f = self.do_create(bin=0) 510 f.write(b"blat\x1a") 511 f.write(b"extra\n") 512 os.lseek(f.fd, 0, os.SEEK_SET) 513 self.assertEqual(os.read(f.fd, 20), b"blat") 514 515 def make_temp(self): 516 return tempfile._mkstemp_inner(tempfile.gettempdir(), 517 tempfile.gettempprefix(), 518 '', 519 tempfile._bin_openflags, 520 str) 521 522 def test_collision_with_existing_file(self): 523 # _mkstemp_inner tries another name when a file with 524 # the chosen name already exists 525 with _inside_empty_temp_dir(), \ 526 _mock_candidate_names('aaa', 'aaa', 'bbb'): 527 (fd1, name1) = self.make_temp() 528 os.close(fd1) 529 self.assertTrue(name1.endswith('aaa')) 530 531 (fd2, name2) = self.make_temp() 532 os.close(fd2) 533 self.assertTrue(name2.endswith('bbb')) 534 535 def test_collision_with_existing_directory(self): 536 # _mkstemp_inner tries another name when a directory with 537 # the chosen name already exists 538 with _inside_empty_temp_dir(), \ 539 _mock_candidate_names('aaa', 'aaa', 'bbb'): 540 dir = tempfile.mkdtemp() 541 self.assertTrue(dir.endswith('aaa')) 542 543 (fd, name) = self.make_temp() 544 os.close(fd) 545 self.assertTrue(name.endswith('bbb')) 546 547 548class TestGetTempPrefix(BaseTestCase): 549 """Test gettempprefix().""" 550 551 def test_sane_template(self): 552 # gettempprefix returns a nonempty prefix string 553 p = tempfile.gettempprefix() 554 555 self.assertIsInstance(p, str) 556 self.assertGreater(len(p), 0) 557 558 pb = tempfile.gettempprefixb() 559 560 self.assertIsInstance(pb, bytes) 561 self.assertGreater(len(pb), 0) 562 563 def test_usable_template(self): 564 # gettempprefix returns a usable prefix string 565 566 # Create a temp directory, avoiding use of the prefix. 567 # Then attempt to create a file whose name is 568 # prefix + 'xxxxxx.xxx' in that directory. 569 p = tempfile.gettempprefix() + "xxxxxx.xxx" 570 d = tempfile.mkdtemp(prefix="") 571 try: 572 p = os.path.join(d, p) 573 fd = os.open(p, os.O_RDWR | os.O_CREAT) 574 os.close(fd) 575 os.unlink(p) 576 finally: 577 os.rmdir(d) 578 579 580class TestGetTempDir(BaseTestCase): 581 """Test gettempdir().""" 582 583 def test_directory_exists(self): 584 # gettempdir returns a directory which exists 585 586 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 587 self.assertTrue(os.path.isabs(d) or d == os.curdir, 588 "%r is not an absolute path" % d) 589 self.assertTrue(os.path.isdir(d), 590 "%r is not a directory" % d) 591 592 def test_directory_writable(self): 593 # gettempdir returns a directory writable by the user 594 595 # sneaky: just instantiate a NamedTemporaryFile, which 596 # defaults to writing into the directory returned by 597 # gettempdir. 598 with tempfile.NamedTemporaryFile() as file: 599 file.write(b"blat") 600 601 def test_same_thing(self): 602 # gettempdir always returns the same object 603 a = tempfile.gettempdir() 604 b = tempfile.gettempdir() 605 c = tempfile.gettempdirb() 606 607 self.assertTrue(a is b) 608 self.assertNotEqual(type(a), type(c)) 609 self.assertEqual(a, os.fsdecode(c)) 610 611 def test_case_sensitive(self): 612 # gettempdir should not flatten its case 613 # even on a case-insensitive file system 614 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 615 _tempdir, tempfile.tempdir = tempfile.tempdir, None 616 try: 617 with support.EnvironmentVarGuard() as env: 618 # Fake the first env var which is checked as a candidate 619 env["TMPDIR"] = case_sensitive_tempdir 620 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 621 finally: 622 tempfile.tempdir = _tempdir 623 support.rmdir(case_sensitive_tempdir) 624 625 626class TestMkstemp(BaseTestCase): 627 """Test mkstemp().""" 628 629 def do_create(self, dir=None, pre=None, suf=None): 630 output_type = tempfile._infer_return_type(dir, pre, suf) 631 if dir is None: 632 if output_type is str: 633 dir = tempfile.gettempdir() 634 else: 635 dir = tempfile.gettempdirb() 636 if pre is None: 637 pre = output_type() 638 if suf is None: 639 suf = output_type() 640 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 641 (ndir, nbase) = os.path.split(name) 642 adir = os.path.abspath(dir) 643 self.assertEqual(adir, ndir, 644 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 645 646 try: 647 self.nameCheck(name, dir, pre, suf) 648 finally: 649 os.close(fd) 650 os.unlink(name) 651 652 def test_basic(self): 653 # mkstemp can create files 654 self.do_create() 655 self.do_create(pre="a") 656 self.do_create(suf="b") 657 self.do_create(pre="a", suf="b") 658 self.do_create(pre="aa", suf=".txt") 659 self.do_create(dir=".") 660 661 def test_basic_with_bytes_names(self): 662 # mkstemp can create files when given name parts all 663 # specified as bytes. 664 d = tempfile.gettempdirb() 665 self.do_create(dir=d, suf=b"") 666 self.do_create(dir=d, pre=b"a") 667 self.do_create(dir=d, suf=b"b") 668 self.do_create(dir=d, pre=b"a", suf=b"b") 669 self.do_create(dir=d, pre=b"aa", suf=b".txt") 670 self.do_create(dir=b".") 671 with self.assertRaises(TypeError): 672 self.do_create(dir=".", pre=b"aa", suf=b".txt") 673 with self.assertRaises(TypeError): 674 self.do_create(dir=b".", pre="aa", suf=b".txt") 675 with self.assertRaises(TypeError): 676 self.do_create(dir=b".", pre=b"aa", suf=".txt") 677 678 679 def test_choose_directory(self): 680 # mkstemp can create directories in a user-selected directory 681 dir = tempfile.mkdtemp() 682 try: 683 self.do_create(dir=dir) 684 self.do_create(dir=pathlib.Path(dir)) 685 finally: 686 os.rmdir(dir) 687 688 689class TestMkdtemp(TestBadTempdir, BaseTestCase): 690 """Test mkdtemp().""" 691 692 def make_temp(self): 693 return tempfile.mkdtemp() 694 695 def do_create(self, dir=None, pre=None, suf=None): 696 output_type = tempfile._infer_return_type(dir, pre, suf) 697 if dir is None: 698 if output_type is str: 699 dir = tempfile.gettempdir() 700 else: 701 dir = tempfile.gettempdirb() 702 if pre is None: 703 pre = output_type() 704 if suf is None: 705 suf = output_type() 706 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 707 708 try: 709 self.nameCheck(name, dir, pre, suf) 710 return name 711 except: 712 os.rmdir(name) 713 raise 714 715 def test_basic(self): 716 # mkdtemp can create directories 717 os.rmdir(self.do_create()) 718 os.rmdir(self.do_create(pre="a")) 719 os.rmdir(self.do_create(suf="b")) 720 os.rmdir(self.do_create(pre="a", suf="b")) 721 os.rmdir(self.do_create(pre="aa", suf=".txt")) 722 723 def test_basic_with_bytes_names(self): 724 # mkdtemp can create directories when given all binary parts 725 d = tempfile.gettempdirb() 726 os.rmdir(self.do_create(dir=d)) 727 os.rmdir(self.do_create(dir=d, pre=b"a")) 728 os.rmdir(self.do_create(dir=d, suf=b"b")) 729 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 730 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 731 with self.assertRaises(TypeError): 732 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 733 with self.assertRaises(TypeError): 734 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 735 with self.assertRaises(TypeError): 736 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 737 738 def test_basic_many(self): 739 # mkdtemp can create many directories (stochastic) 740 extant = list(range(TEST_FILES)) 741 try: 742 for i in extant: 743 extant[i] = self.do_create(pre="aa") 744 finally: 745 for i in extant: 746 if(isinstance(i, str)): 747 os.rmdir(i) 748 749 def test_choose_directory(self): 750 # mkdtemp can create directories in a user-selected directory 751 dir = tempfile.mkdtemp() 752 try: 753 os.rmdir(self.do_create(dir=dir)) 754 os.rmdir(self.do_create(dir=pathlib.Path(dir))) 755 finally: 756 os.rmdir(dir) 757 758 def test_mode(self): 759 # mkdtemp creates directories with the proper mode 760 761 dir = self.do_create() 762 try: 763 mode = stat.S_IMODE(os.stat(dir).st_mode) 764 mode &= 0o777 # Mask off sticky bits inherited from /tmp 765 expected = 0o700 766 if sys.platform == 'win32': 767 # There's no distinction among 'user', 'group' and 'world'; 768 # replicate the 'user' bits. 769 user = expected >> 6 770 expected = user * (1 + 8 + 64) 771 self.assertEqual(mode, expected) 772 finally: 773 os.rmdir(dir) 774 775 def test_collision_with_existing_file(self): 776 # mkdtemp tries another name when a file with 777 # the chosen name already exists 778 with _inside_empty_temp_dir(), \ 779 _mock_candidate_names('aaa', 'aaa', 'bbb'): 780 file = tempfile.NamedTemporaryFile(delete=False) 781 file.close() 782 self.assertTrue(file.name.endswith('aaa')) 783 dir = tempfile.mkdtemp() 784 self.assertTrue(dir.endswith('bbb')) 785 786 def test_collision_with_existing_directory(self): 787 # mkdtemp tries another name when a directory with 788 # the chosen name already exists 789 with _inside_empty_temp_dir(), \ 790 _mock_candidate_names('aaa', 'aaa', 'bbb'): 791 dir1 = tempfile.mkdtemp() 792 self.assertTrue(dir1.endswith('aaa')) 793 dir2 = tempfile.mkdtemp() 794 self.assertTrue(dir2.endswith('bbb')) 795 796 797class TestMktemp(BaseTestCase): 798 """Test mktemp().""" 799 800 # For safety, all use of mktemp must occur in a private directory. 801 # We must also suppress the RuntimeWarning it generates. 802 def setUp(self): 803 self.dir = tempfile.mkdtemp() 804 super().setUp() 805 806 def tearDown(self): 807 if self.dir: 808 os.rmdir(self.dir) 809 self.dir = None 810 super().tearDown() 811 812 class mktemped: 813 _unlink = os.unlink 814 _bflags = tempfile._bin_openflags 815 816 def __init__(self, dir, pre, suf): 817 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 818 # Create the file. This will raise an exception if it's 819 # mysteriously appeared in the meanwhile. 820 os.close(os.open(self.name, self._bflags, 0o600)) 821 822 def __del__(self): 823 self._unlink(self.name) 824 825 def do_create(self, pre="", suf=""): 826 file = self.mktemped(self.dir, pre, suf) 827 828 self.nameCheck(file.name, self.dir, pre, suf) 829 return file 830 831 def test_basic(self): 832 # mktemp can choose usable file names 833 self.do_create() 834 self.do_create(pre="a") 835 self.do_create(suf="b") 836 self.do_create(pre="a", suf="b") 837 self.do_create(pre="aa", suf=".txt") 838 839 def test_many(self): 840 # mktemp can choose many usable file names (stochastic) 841 extant = list(range(TEST_FILES)) 842 for i in extant: 843 extant[i] = self.do_create(pre="aa") 844 del extant 845 support.gc_collect() # For PyPy or other GCs. 846 847## def test_warning(self): 848## # mktemp issues a warning when used 849## warnings.filterwarnings("error", 850## category=RuntimeWarning, 851## message="mktemp") 852## self.assertRaises(RuntimeWarning, 853## tempfile.mktemp, dir=self.dir) 854 855 856# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 857 858 859class TestNamedTemporaryFile(BaseTestCase): 860 """Test NamedTemporaryFile().""" 861 862 def do_create(self, dir=None, pre="", suf="", delete=True): 863 if dir is None: 864 dir = tempfile.gettempdir() 865 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 866 delete=delete) 867 868 self.nameCheck(file.name, dir, pre, suf) 869 return file 870 871 872 def test_basic(self): 873 # NamedTemporaryFile can create files 874 self.do_create() 875 self.do_create(pre="a") 876 self.do_create(suf="b") 877 self.do_create(pre="a", suf="b") 878 self.do_create(pre="aa", suf=".txt") 879 880 def test_method_lookup(self): 881 # Issue #18879: Looking up a temporary file method should keep it 882 # alive long enough. 883 f = self.do_create() 884 wr = weakref.ref(f) 885 write = f.write 886 write2 = f.write 887 del f 888 write(b'foo') 889 del write 890 write2(b'bar') 891 del write2 892 if support.check_impl_detail(cpython=True): 893 # No reference cycle was created. 894 self.assertIsNone(wr()) 895 896 def test_iter(self): 897 # Issue #23700: getting iterator from a temporary file should keep 898 # it alive as long as it's being iterated over 899 lines = [b'spam\n', b'eggs\n', b'beans\n'] 900 def make_file(): 901 f = tempfile.NamedTemporaryFile(mode='w+b') 902 f.write(b''.join(lines)) 903 f.seek(0) 904 return f 905 for i, l in enumerate(make_file()): 906 self.assertEqual(l, lines[i]) 907 self.assertEqual(i, len(lines) - 1) 908 909 def test_creates_named(self): 910 # NamedTemporaryFile creates files with names 911 f = tempfile.NamedTemporaryFile() 912 self.assertTrue(os.path.exists(f.name), 913 "NamedTemporaryFile %s does not exist" % f.name) 914 915 def test_del_on_close(self): 916 # A NamedTemporaryFile is deleted when closed 917 dir = tempfile.mkdtemp() 918 try: 919 with tempfile.NamedTemporaryFile(dir=dir) as f: 920 f.write(b'blat') 921 self.assertFalse(os.path.exists(f.name), 922 "NamedTemporaryFile %s exists after close" % f.name) 923 finally: 924 os.rmdir(dir) 925 926 def test_dis_del_on_close(self): 927 # Tests that delete-on-close can be disabled 928 dir = tempfile.mkdtemp() 929 tmp = None 930 try: 931 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 932 tmp = f.name 933 f.write(b'blat') 934 f.close() 935 self.assertTrue(os.path.exists(f.name), 936 "NamedTemporaryFile %s missing after close" % f.name) 937 finally: 938 if tmp is not None: 939 os.unlink(tmp) 940 os.rmdir(dir) 941 942 def test_multiple_close(self): 943 # A NamedTemporaryFile can be closed many times without error 944 f = tempfile.NamedTemporaryFile() 945 f.write(b'abc\n') 946 f.close() 947 f.close() 948 f.close() 949 950 def test_context_manager(self): 951 # A NamedTemporaryFile can be used as a context manager 952 with tempfile.NamedTemporaryFile() as f: 953 self.assertTrue(os.path.exists(f.name)) 954 self.assertFalse(os.path.exists(f.name)) 955 def use_closed(): 956 with f: 957 pass 958 self.assertRaises(ValueError, use_closed) 959 960 def test_no_leak_fd(self): 961 # Issue #21058: don't leak file descriptor when io.open() fails 962 closed = [] 963 os_close = os.close 964 def close(fd): 965 closed.append(fd) 966 os_close(fd) 967 968 with mock.patch('os.close', side_effect=close): 969 with mock.patch('io.open', side_effect=ValueError): 970 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 971 self.assertEqual(len(closed), 1) 972 973 def test_bad_mode(self): 974 dir = tempfile.mkdtemp() 975 self.addCleanup(support.rmtree, dir) 976 with self.assertRaises(ValueError): 977 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 978 with self.assertRaises(TypeError): 979 tempfile.NamedTemporaryFile(mode=2, dir=dir) 980 self.assertEqual(os.listdir(dir), []) 981 982 # How to test the mode and bufsize parameters? 983 984class TestSpooledTemporaryFile(BaseTestCase): 985 """Test SpooledTemporaryFile().""" 986 987 def do_create(self, max_size=0, dir=None, pre="", suf=""): 988 if dir is None: 989 dir = tempfile.gettempdir() 990 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 991 992 return file 993 994 995 def test_basic(self): 996 # SpooledTemporaryFile can create files 997 f = self.do_create() 998 self.assertFalse(f._rolled) 999 f = self.do_create(max_size=100, pre="a", suf=".txt") 1000 self.assertFalse(f._rolled) 1001 1002 def test_del_on_close(self): 1003 # A SpooledTemporaryFile is deleted when closed 1004 dir = tempfile.mkdtemp() 1005 try: 1006 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 1007 self.assertFalse(f._rolled) 1008 f.write(b'blat ' * 5) 1009 self.assertTrue(f._rolled) 1010 filename = f.name 1011 f.close() 1012 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 1013 "SpooledTemporaryFile %s exists after close" % filename) 1014 finally: 1015 os.rmdir(dir) 1016 1017 def test_rewrite_small(self): 1018 # A SpooledTemporaryFile can be written to multiple within the max_size 1019 f = self.do_create(max_size=30) 1020 self.assertFalse(f._rolled) 1021 for i in range(5): 1022 f.seek(0, 0) 1023 f.write(b'x' * 20) 1024 self.assertFalse(f._rolled) 1025 1026 def test_write_sequential(self): 1027 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1028 # over afterward 1029 f = self.do_create(max_size=30) 1030 self.assertFalse(f._rolled) 1031 f.write(b'x' * 20) 1032 self.assertFalse(f._rolled) 1033 f.write(b'x' * 10) 1034 self.assertFalse(f._rolled) 1035 f.write(b'x') 1036 self.assertTrue(f._rolled) 1037 1038 def test_writelines(self): 1039 # Verify writelines with a SpooledTemporaryFile 1040 f = self.do_create() 1041 f.writelines((b'x', b'y', b'z')) 1042 pos = f.seek(0) 1043 self.assertEqual(pos, 0) 1044 buf = f.read() 1045 self.assertEqual(buf, b'xyz') 1046 1047 def test_writelines_sequential(self): 1048 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1049 # over afterward 1050 f = self.do_create(max_size=35) 1051 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1052 self.assertFalse(f._rolled) 1053 f.write(b'x') 1054 self.assertTrue(f._rolled) 1055 1056 def test_sparse(self): 1057 # A SpooledTemporaryFile that is written late in the file will extend 1058 # when that occurs 1059 f = self.do_create(max_size=30) 1060 self.assertFalse(f._rolled) 1061 pos = f.seek(100, 0) 1062 self.assertEqual(pos, 100) 1063 self.assertFalse(f._rolled) 1064 f.write(b'x') 1065 self.assertTrue(f._rolled) 1066 1067 def test_fileno(self): 1068 # A SpooledTemporaryFile should roll over to a real file on fileno() 1069 f = self.do_create(max_size=30) 1070 self.assertFalse(f._rolled) 1071 self.assertTrue(f.fileno() > 0) 1072 self.assertTrue(f._rolled) 1073 1074 def test_multiple_close_before_rollover(self): 1075 # A SpooledTemporaryFile can be closed many times without error 1076 f = tempfile.SpooledTemporaryFile() 1077 f.write(b'abc\n') 1078 self.assertFalse(f._rolled) 1079 f.close() 1080 f.close() 1081 f.close() 1082 1083 def test_multiple_close_after_rollover(self): 1084 # A SpooledTemporaryFile can be closed many times without error 1085 f = tempfile.SpooledTemporaryFile(max_size=1) 1086 f.write(b'abc\n') 1087 self.assertTrue(f._rolled) 1088 f.close() 1089 f.close() 1090 f.close() 1091 1092 def test_bound_methods(self): 1093 # It should be OK to steal a bound method from a SpooledTemporaryFile 1094 # and use it independently; when the file rolls over, those bound 1095 # methods should continue to function 1096 f = self.do_create(max_size=30) 1097 read = f.read 1098 write = f.write 1099 seek = f.seek 1100 1101 write(b"a" * 35) 1102 write(b"b" * 35) 1103 seek(0, 0) 1104 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1105 1106 def test_properties(self): 1107 f = tempfile.SpooledTemporaryFile(max_size=10) 1108 f.write(b'x' * 10) 1109 self.assertFalse(f._rolled) 1110 self.assertEqual(f.mode, 'w+b') 1111 self.assertIsNone(f.name) 1112 with self.assertRaises(AttributeError): 1113 f.newlines 1114 with self.assertRaises(AttributeError): 1115 f.encoding 1116 with self.assertRaises(AttributeError): 1117 f.errors 1118 1119 f.write(b'x') 1120 self.assertTrue(f._rolled) 1121 self.assertEqual(f.mode, 'rb+') 1122 self.assertIsNotNone(f.name) 1123 with self.assertRaises(AttributeError): 1124 f.newlines 1125 with self.assertRaises(AttributeError): 1126 f.encoding 1127 with self.assertRaises(AttributeError): 1128 f.errors 1129 1130 def test_text_mode(self): 1131 # Creating a SpooledTemporaryFile with a text mode should produce 1132 # a file object reading and writing (Unicode) text strings. 1133 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1134 encoding="utf-8") 1135 f.write("abc\n") 1136 f.seek(0) 1137 self.assertEqual(f.read(), "abc\n") 1138 f.write("def\n") 1139 f.seek(0) 1140 self.assertEqual(f.read(), "abc\ndef\n") 1141 self.assertFalse(f._rolled) 1142 self.assertEqual(f.mode, 'w+') 1143 self.assertIsNone(f.name) 1144 self.assertEqual(f.newlines, os.linesep) 1145 self.assertEqual(f.encoding, "utf-8") 1146 self.assertEqual(f.errors, "strict") 1147 1148 f.write("xyzzy\n") 1149 f.seek(0) 1150 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1151 # Check that Ctrl+Z doesn't truncate the file 1152 f.write("foo\x1abar\n") 1153 f.seek(0) 1154 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1155 self.assertTrue(f._rolled) 1156 self.assertEqual(f.mode, 'w+') 1157 self.assertIsNotNone(f.name) 1158 self.assertEqual(f.newlines, os.linesep) 1159 self.assertEqual(f.encoding, "utf-8") 1160 self.assertEqual(f.errors, "strict") 1161 1162 def test_text_newline_and_encoding(self): 1163 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1164 newline='', encoding='utf-8', 1165 errors='ignore') 1166 f.write("\u039B\r\n") 1167 f.seek(0) 1168 self.assertEqual(f.read(), "\u039B\r\n") 1169 self.assertFalse(f._rolled) 1170 self.assertEqual(f.mode, 'w+') 1171 self.assertIsNone(f.name) 1172 self.assertIsNotNone(f.newlines) 1173 self.assertEqual(f.encoding, "utf-8") 1174 self.assertEqual(f.errors, "ignore") 1175 1176 f.write("\u039C" * 10 + "\r\n") 1177 f.write("\u039D" * 20) 1178 f.seek(0) 1179 self.assertEqual(f.read(), 1180 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1181 self.assertTrue(f._rolled) 1182 self.assertEqual(f.mode, 'w+') 1183 self.assertIsNotNone(f.name) 1184 self.assertIsNotNone(f.newlines) 1185 self.assertEqual(f.encoding, 'utf-8') 1186 self.assertEqual(f.errors, 'ignore') 1187 1188 def test_context_manager_before_rollover(self): 1189 # A SpooledTemporaryFile can be used as a context manager 1190 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1191 self.assertFalse(f._rolled) 1192 self.assertFalse(f.closed) 1193 self.assertTrue(f.closed) 1194 def use_closed(): 1195 with f: 1196 pass 1197 self.assertRaises(ValueError, use_closed) 1198 1199 def test_context_manager_during_rollover(self): 1200 # A SpooledTemporaryFile can be used as a context manager 1201 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1202 self.assertFalse(f._rolled) 1203 f.write(b'abc\n') 1204 f.flush() 1205 self.assertTrue(f._rolled) 1206 self.assertFalse(f.closed) 1207 self.assertTrue(f.closed) 1208 def use_closed(): 1209 with f: 1210 pass 1211 self.assertRaises(ValueError, use_closed) 1212 1213 def test_context_manager_after_rollover(self): 1214 # A SpooledTemporaryFile can be used as a context manager 1215 f = tempfile.SpooledTemporaryFile(max_size=1) 1216 f.write(b'abc\n') 1217 f.flush() 1218 self.assertTrue(f._rolled) 1219 with f: 1220 self.assertFalse(f.closed) 1221 self.assertTrue(f.closed) 1222 def use_closed(): 1223 with f: 1224 pass 1225 self.assertRaises(ValueError, use_closed) 1226 1227 def test_truncate_with_size_parameter(self): 1228 # A SpooledTemporaryFile can be truncated to zero size 1229 f = tempfile.SpooledTemporaryFile(max_size=10) 1230 f.write(b'abcdefg\n') 1231 f.seek(0) 1232 f.truncate() 1233 self.assertFalse(f._rolled) 1234 self.assertEqual(f._file.getvalue(), b'') 1235 # A SpooledTemporaryFile can be truncated to a specific size 1236 f = tempfile.SpooledTemporaryFile(max_size=10) 1237 f.write(b'abcdefg\n') 1238 f.truncate(4) 1239 self.assertFalse(f._rolled) 1240 self.assertEqual(f._file.getvalue(), b'abcd') 1241 # A SpooledTemporaryFile rolls over if truncated to large size 1242 f = tempfile.SpooledTemporaryFile(max_size=10) 1243 f.write(b'abcdefg\n') 1244 f.truncate(20) 1245 self.assertTrue(f._rolled) 1246 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1247 1248 def test_class_getitem(self): 1249 self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes], 1250 types.GenericAlias) 1251 1252if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1253 1254 class TestTemporaryFile(BaseTestCase): 1255 """Test TemporaryFile().""" 1256 1257 def test_basic(self): 1258 # TemporaryFile can create files 1259 # No point in testing the name params - the file has no name. 1260 tempfile.TemporaryFile() 1261 1262 def test_has_no_name(self): 1263 # TemporaryFile creates files with no names (on this system) 1264 dir = tempfile.mkdtemp() 1265 f = tempfile.TemporaryFile(dir=dir) 1266 f.write(b'blat') 1267 1268 # Sneaky: because this file has no name, it should not prevent 1269 # us from removing the directory it was created in. 1270 try: 1271 os.rmdir(dir) 1272 except: 1273 # cleanup 1274 f.close() 1275 os.rmdir(dir) 1276 raise 1277 1278 def test_multiple_close(self): 1279 # A TemporaryFile can be closed many times without error 1280 f = tempfile.TemporaryFile() 1281 f.write(b'abc\n') 1282 f.close() 1283 f.close() 1284 f.close() 1285 1286 # How to test the mode and bufsize parameters? 1287 def test_mode_and_encoding(self): 1288 1289 def roundtrip(input, *args, **kwargs): 1290 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1291 fileobj.write(input) 1292 fileobj.seek(0) 1293 self.assertEqual(input, fileobj.read()) 1294 1295 roundtrip(b"1234", "w+b") 1296 roundtrip("abdc\n", "w+") 1297 roundtrip("\u039B", "w+", encoding="utf-16") 1298 roundtrip("foo\r\n", "w+", newline="") 1299 1300 def test_no_leak_fd(self): 1301 # Issue #21058: don't leak file descriptor when io.open() fails 1302 closed = [] 1303 os_close = os.close 1304 def close(fd): 1305 closed.append(fd) 1306 os_close(fd) 1307 1308 with mock.patch('os.close', side_effect=close): 1309 with mock.patch('io.open', side_effect=ValueError): 1310 self.assertRaises(ValueError, tempfile.TemporaryFile) 1311 self.assertEqual(len(closed), 1) 1312 1313 1314 1315# Helper for test_del_on_shutdown 1316class NulledModules: 1317 def __init__(self, *modules): 1318 self.refs = [mod.__dict__ for mod in modules] 1319 self.contents = [ref.copy() for ref in self.refs] 1320 1321 def __enter__(self): 1322 for d in self.refs: 1323 for key in d: 1324 d[key] = None 1325 1326 def __exit__(self, *exc_info): 1327 for d, c in zip(self.refs, self.contents): 1328 d.clear() 1329 d.update(c) 1330 1331class TestTemporaryDirectory(BaseTestCase): 1332 """Test TemporaryDirectory().""" 1333 1334 def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1): 1335 if dir is None: 1336 dir = tempfile.gettempdir() 1337 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1338 self.nameCheck(tmp.name, dir, pre, suf) 1339 self.do_create2(tmp.name, recurse, dirs, files) 1340 return tmp 1341 1342 def do_create2(self, path, recurse=1, dirs=1, files=1): 1343 # Create subdirectories and some files 1344 if recurse: 1345 for i in range(dirs): 1346 name = os.path.join(path, "dir%d" % i) 1347 os.mkdir(name) 1348 self.do_create2(name, recurse-1, dirs, files) 1349 for i in range(files): 1350 with open(os.path.join(path, "test%d.txt" % i), "wb") as f: 1351 f.write(b"Hello world!") 1352 1353 def test_mkdtemp_failure(self): 1354 # Check no additional exception if mkdtemp fails 1355 # Previously would raise AttributeError instead 1356 # (noted as part of Issue #10188) 1357 with tempfile.TemporaryDirectory() as nonexistent: 1358 pass 1359 with self.assertRaises(FileNotFoundError) as cm: 1360 tempfile.TemporaryDirectory(dir=nonexistent) 1361 self.assertEqual(cm.exception.errno, errno.ENOENT) 1362 1363 def test_explicit_cleanup(self): 1364 # A TemporaryDirectory is deleted when cleaned up 1365 dir = tempfile.mkdtemp() 1366 try: 1367 d = self.do_create(dir=dir) 1368 self.assertTrue(os.path.exists(d.name), 1369 "TemporaryDirectory %s does not exist" % d.name) 1370 d.cleanup() 1371 self.assertFalse(os.path.exists(d.name), 1372 "TemporaryDirectory %s exists after cleanup" % d.name) 1373 finally: 1374 os.rmdir(dir) 1375 1376 @support.skip_unless_symlink 1377 def test_cleanup_with_symlink_to_a_directory(self): 1378 # cleanup() should not follow symlinks to directories (issue #12464) 1379 d1 = self.do_create() 1380 d2 = self.do_create(recurse=0) 1381 1382 # Symlink d1/foo -> d2 1383 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1384 1385 # This call to cleanup() should not follow the "foo" symlink 1386 d1.cleanup() 1387 1388 self.assertFalse(os.path.exists(d1.name), 1389 "TemporaryDirectory %s exists after cleanup" % d1.name) 1390 self.assertTrue(os.path.exists(d2.name), 1391 "Directory pointed to by a symlink was deleted") 1392 self.assertEqual(os.listdir(d2.name), ['test0.txt'], 1393 "Contents of the directory pointed to by a symlink " 1394 "were deleted") 1395 d2.cleanup() 1396 1397 @support.cpython_only 1398 def test_del_on_collection(self): 1399 # A TemporaryDirectory is deleted when garbage collected 1400 dir = tempfile.mkdtemp() 1401 try: 1402 d = self.do_create(dir=dir) 1403 name = d.name 1404 del d # Rely on refcounting to invoke __del__ 1405 self.assertFalse(os.path.exists(name), 1406 "TemporaryDirectory %s exists after __del__" % name) 1407 finally: 1408 os.rmdir(dir) 1409 1410 def test_del_on_shutdown(self): 1411 # A TemporaryDirectory may be cleaned up during shutdown 1412 with self.do_create() as dir: 1413 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1414 code = """if True: 1415 import builtins 1416 import os 1417 import shutil 1418 import sys 1419 import tempfile 1420 import warnings 1421 1422 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1423 sys.stdout.buffer.write(tmp.name.encode()) 1424 1425 tmp2 = os.path.join(tmp.name, 'test_dir') 1426 os.mkdir(tmp2) 1427 with open(os.path.join(tmp2, "test0.txt"), "w") as f: 1428 f.write("Hello world!") 1429 1430 {mod}.tmp = tmp 1431 1432 warnings.filterwarnings("always", category=ResourceWarning) 1433 """.format(dir=dir, mod=mod) 1434 rc, out, err = script_helper.assert_python_ok("-c", code) 1435 tmp_name = out.decode().strip() 1436 self.assertFalse(os.path.exists(tmp_name), 1437 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1438 err = err.decode('utf-8', 'backslashreplace') 1439 self.assertNotIn("Exception ", err) 1440 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1441 1442 def test_exit_on_shutdown(self): 1443 # Issue #22427 1444 with self.do_create() as dir: 1445 code = """if True: 1446 import sys 1447 import tempfile 1448 import warnings 1449 1450 def generator(): 1451 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1452 yield tmp 1453 g = generator() 1454 sys.stdout.buffer.write(next(g).encode()) 1455 1456 warnings.filterwarnings("always", category=ResourceWarning) 1457 """.format(dir=dir) 1458 rc, out, err = script_helper.assert_python_ok("-c", code) 1459 tmp_name = out.decode().strip() 1460 self.assertFalse(os.path.exists(tmp_name), 1461 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1462 err = err.decode('utf-8', 'backslashreplace') 1463 self.assertNotIn("Exception ", err) 1464 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1465 1466 def test_warnings_on_cleanup(self): 1467 # ResourceWarning will be triggered by __del__ 1468 with self.do_create() as dir: 1469 d = self.do_create(dir=dir, recurse=3) 1470 name = d.name 1471 1472 # Check for the resource warning 1473 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1474 warnings.filterwarnings("always", category=ResourceWarning) 1475 del d 1476 support.gc_collect() 1477 self.assertFalse(os.path.exists(name), 1478 "TemporaryDirectory %s exists after __del__" % name) 1479 1480 def test_multiple_close(self): 1481 # Can be cleaned-up many times without error 1482 d = self.do_create() 1483 d.cleanup() 1484 d.cleanup() 1485 d.cleanup() 1486 1487 def test_context_manager(self): 1488 # Can be used as a context manager 1489 d = self.do_create() 1490 with d as name: 1491 self.assertTrue(os.path.exists(name)) 1492 self.assertEqual(name, d.name) 1493 self.assertFalse(os.path.exists(name)) 1494 1495 def test_modes(self): 1496 for mode in range(8): 1497 mode <<= 6 1498 with self.subTest(mode=format(mode, '03o')): 1499 d = self.do_create(recurse=3, dirs=2, files=2) 1500 with d: 1501 # Change files and directories mode recursively. 1502 for root, dirs, files in os.walk(d.name, topdown=False): 1503 for name in files: 1504 os.chmod(os.path.join(root, name), mode) 1505 os.chmod(root, mode) 1506 d.cleanup() 1507 self.assertFalse(os.path.exists(d.name)) 1508 1509 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags') 1510 def test_flags(self): 1511 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1512 d = self.do_create(recurse=3, dirs=2, files=2) 1513 with d: 1514 # Change files and directories flags recursively. 1515 for root, dirs, files in os.walk(d.name, topdown=False): 1516 for name in files: 1517 os.chflags(os.path.join(root, name), flags) 1518 os.chflags(root, flags) 1519 d.cleanup() 1520 self.assertFalse(os.path.exists(d.name)) 1521 1522 1523if __name__ == "__main__": 1524 unittest.main() 1525