1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import sys 8import re 9import warnings 10import contextlib 11import stat 12import types 13import weakref 14from unittest import mock 15 16import unittest 17from test import support 18from test.support import os_helper 19from test.support import script_helper 20from test.support import warnings_helper 21 22 23has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 24has_spawnl = hasattr(os, 'spawnl') 25 26# TEST_FILES may need to be tweaked for systems depending on the maximum 27# number of files that can be opened at one time (see ulimit -n) 28if sys.platform.startswith('openbsd'): 29 TEST_FILES = 48 30else: 31 TEST_FILES = 100 32 33# This is organized as one test for each chunk of code in tempfile.py, 34# in order of their appearance in the file. Testing which requires 35# threads is not done here. 36 37class TestLowLevelInternals(unittest.TestCase): 38 def test_infer_return_type_singles(self): 39 self.assertIs(str, tempfile._infer_return_type('')) 40 self.assertIs(bytes, tempfile._infer_return_type(b'')) 41 self.assertIs(str, tempfile._infer_return_type(None)) 42 43 def test_infer_return_type_multiples(self): 44 self.assertIs(str, tempfile._infer_return_type('', '')) 45 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 46 with self.assertRaises(TypeError): 47 tempfile._infer_return_type('', b'') 48 with self.assertRaises(TypeError): 49 tempfile._infer_return_type(b'', '') 50 51 def test_infer_return_type_multiples_and_none(self): 52 self.assertIs(str, tempfile._infer_return_type(None, '')) 53 self.assertIs(str, tempfile._infer_return_type('', None)) 54 self.assertIs(str, tempfile._infer_return_type(None, None)) 55 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 56 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 57 with self.assertRaises(TypeError): 58 tempfile._infer_return_type('', None, b'') 59 with self.assertRaises(TypeError): 60 tempfile._infer_return_type(b'', None, '') 61 62 def test_infer_return_type_pathlib(self): 63 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) 64 65 def test_infer_return_type_pathlike(self): 66 class Path: 67 def __init__(self, path): 68 self.path = path 69 70 def __fspath__(self): 71 return self.path 72 73 self.assertIs(str, tempfile._infer_return_type(Path('/'))) 74 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) 75 self.assertIs(str, tempfile._infer_return_type('', Path(''))) 76 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b''))) 77 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b''))) 78 self.assertIs(str, tempfile._infer_return_type(None, Path(''))) 79 80 with self.assertRaises(TypeError): 81 tempfile._infer_return_type('', Path(b'')) 82 with self.assertRaises(TypeError): 83 tempfile._infer_return_type(b'', Path('')) 84 85# Common functionality. 86 87class BaseTestCase(unittest.TestCase): 88 89 str_check = re.compile(r"^[a-z0-9_-]{8}$") 90 b_check = re.compile(br"^[a-z0-9_-]{8}$") 91 92 def setUp(self): 93 self._warnings_manager = warnings_helper.check_warnings() 94 self._warnings_manager.__enter__() 95 warnings.filterwarnings("ignore", category=RuntimeWarning, 96 message="mktemp", module=__name__) 97 98 def tearDown(self): 99 self._warnings_manager.__exit__(None, None, None) 100 101 def nameCheck(self, name, dir, pre, suf): 102 (ndir, nbase) = os.path.split(name) 103 npre = nbase[:len(pre)] 104 nsuf = nbase[len(nbase)-len(suf):] 105 106 if dir is not None: 107 self.assertIs( 108 type(name), 109 str 110 if type(dir) is str or isinstance(dir, os.PathLike) else 111 bytes, 112 "unexpected return type", 113 ) 114 if pre is not None: 115 self.assertIs(type(name), str if type(pre) is str else bytes, 116 "unexpected return type") 117 if suf is not None: 118 self.assertIs(type(name), str if type(suf) is str else bytes, 119 "unexpected return type") 120 if (dir, pre, suf) == (None, None, None): 121 self.assertIs(type(name), str, "default return type must be str") 122 123 # check for equality of the absolute paths! 124 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 125 "file %r not in directory %r" % (name, dir)) 126 self.assertEqual(npre, pre, 127 "file %r does not begin with %r" % (nbase, pre)) 128 self.assertEqual(nsuf, suf, 129 "file %r does not end with %r" % (nbase, suf)) 130 131 nbase = nbase[len(pre):len(nbase)-len(suf)] 132 check = self.str_check if isinstance(nbase, str) else self.b_check 133 self.assertTrue(check.match(nbase), 134 "random characters %r do not match %r" 135 % (nbase, check.pattern)) 136 137 138class TestExports(BaseTestCase): 139 def test_exports(self): 140 # There are no surprising symbols in the tempfile module 141 dict = tempfile.__dict__ 142 143 expected = { 144 "NamedTemporaryFile" : 1, 145 "TemporaryFile" : 1, 146 "mkstemp" : 1, 147 "mkdtemp" : 1, 148 "mktemp" : 1, 149 "TMP_MAX" : 1, 150 "gettempprefix" : 1, 151 "gettempprefixb" : 1, 152 "gettempdir" : 1, 153 "gettempdirb" : 1, 154 "tempdir" : 1, 155 "template" : 1, 156 "SpooledTemporaryFile" : 1, 157 "TemporaryDirectory" : 1, 158 } 159 160 unexp = [] 161 for key in dict: 162 if key[0] != '_' and key not in expected: 163 unexp.append(key) 164 self.assertTrue(len(unexp) == 0, 165 "unexpected keys: %s" % unexp) 166 167 168class TestRandomNameSequence(BaseTestCase): 169 """Test the internal iterator object _RandomNameSequence.""" 170 171 def setUp(self): 172 self.r = tempfile._RandomNameSequence() 173 super().setUp() 174 175 def test_get_eight_char_str(self): 176 # _RandomNameSequence returns a eight-character string 177 s = next(self.r) 178 self.nameCheck(s, '', '', '') 179 180 def test_many(self): 181 # _RandomNameSequence returns no duplicate strings (stochastic) 182 183 dict = {} 184 r = self.r 185 for i in range(TEST_FILES): 186 s = next(r) 187 self.nameCheck(s, '', '', '') 188 self.assertNotIn(s, dict) 189 dict[s] = 1 190 191 def supports_iter(self): 192 # _RandomNameSequence supports the iterator protocol 193 194 i = 0 195 r = self.r 196 for s in r: 197 i += 1 198 if i == 20: 199 break 200 201 @unittest.skipUnless(hasattr(os, 'fork'), 202 "os.fork is required for this test") 203 def test_process_awareness(self): 204 # ensure that the random source differs between 205 # child and parent. 206 read_fd, write_fd = os.pipe() 207 pid = None 208 try: 209 pid = os.fork() 210 if not pid: 211 # child process 212 os.close(read_fd) 213 os.write(write_fd, next(self.r).encode("ascii")) 214 os.close(write_fd) 215 # bypass the normal exit handlers- leave those to 216 # the parent. 217 os._exit(0) 218 219 # parent process 220 parent_value = next(self.r) 221 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 222 finally: 223 if pid: 224 support.wait_process(pid, exitcode=0) 225 226 os.close(read_fd) 227 os.close(write_fd) 228 self.assertNotEqual(child_value, parent_value) 229 230 231 232class TestCandidateTempdirList(BaseTestCase): 233 """Test the internal function _candidate_tempdir_list.""" 234 235 def test_nonempty_list(self): 236 # _candidate_tempdir_list returns a nonempty list of strings 237 238 cand = tempfile._candidate_tempdir_list() 239 240 self.assertFalse(len(cand) == 0) 241 for c in cand: 242 self.assertIsInstance(c, str) 243 244 def test_wanted_dirs(self): 245 # _candidate_tempdir_list contains the expected directories 246 247 # Make sure the interesting environment variables are all set. 248 with os_helper.EnvironmentVarGuard() as env: 249 for envname in 'TMPDIR', 'TEMP', 'TMP': 250 dirname = os.getenv(envname) 251 if not dirname: 252 env[envname] = os.path.abspath(envname) 253 254 cand = tempfile._candidate_tempdir_list() 255 256 for envname in 'TMPDIR', 'TEMP', 'TMP': 257 dirname = os.getenv(envname) 258 if not dirname: raise ValueError 259 self.assertIn(dirname, cand) 260 261 try: 262 dirname = os.getcwd() 263 except (AttributeError, OSError): 264 dirname = os.curdir 265 266 self.assertIn(dirname, cand) 267 268 # Not practical to try to verify the presence of OS-specific 269 # paths in this list. 270 271 272# We test _get_default_tempdir some more by testing gettempdir. 273 274class TestGetDefaultTempdir(BaseTestCase): 275 """Test _get_default_tempdir().""" 276 277 def test_no_files_left_behind(self): 278 # use a private empty directory 279 with tempfile.TemporaryDirectory() as our_temp_directory: 280 # force _get_default_tempdir() to consider our empty directory 281 def our_candidate_list(): 282 return [our_temp_directory] 283 284 with support.swap_attr(tempfile, "_candidate_tempdir_list", 285 our_candidate_list): 286 # verify our directory is empty after _get_default_tempdir() 287 tempfile._get_default_tempdir() 288 self.assertEqual(os.listdir(our_temp_directory), []) 289 290 def raise_OSError(*args, **kwargs): 291 raise OSError() 292 293 with support.swap_attr(io, "open", raise_OSError): 294 # test again with failing io.open() 295 with self.assertRaises(FileNotFoundError): 296 tempfile._get_default_tempdir() 297 self.assertEqual(os.listdir(our_temp_directory), []) 298 299 def bad_writer(*args, **kwargs): 300 fp = orig_open(*args, **kwargs) 301 fp.write = raise_OSError 302 return fp 303 304 with support.swap_attr(io, "open", bad_writer) as orig_open: 305 # test again with failing write() 306 with self.assertRaises(FileNotFoundError): 307 tempfile._get_default_tempdir() 308 self.assertEqual(os.listdir(our_temp_directory), []) 309 310 311class TestGetCandidateNames(BaseTestCase): 312 """Test the internal function _get_candidate_names.""" 313 314 def test_retval(self): 315 # _get_candidate_names returns a _RandomNameSequence object 316 obj = tempfile._get_candidate_names() 317 self.assertIsInstance(obj, tempfile._RandomNameSequence) 318 319 def test_same_thing(self): 320 # _get_candidate_names always returns the same object 321 a = tempfile._get_candidate_names() 322 b = tempfile._get_candidate_names() 323 324 self.assertTrue(a is b) 325 326 327@contextlib.contextmanager 328def _inside_empty_temp_dir(): 329 dir = tempfile.mkdtemp() 330 try: 331 with support.swap_attr(tempfile, 'tempdir', dir): 332 yield 333 finally: 334 os_helper.rmtree(dir) 335 336 337def _mock_candidate_names(*names): 338 return support.swap_attr(tempfile, 339 '_get_candidate_names', 340 lambda: iter(names)) 341 342 343class TestBadTempdir: 344 345 def test_read_only_directory(self): 346 with _inside_empty_temp_dir(): 347 oldmode = mode = os.stat(tempfile.tempdir).st_mode 348 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 349 os.chmod(tempfile.tempdir, mode) 350 try: 351 if os.access(tempfile.tempdir, os.W_OK): 352 self.skipTest("can't set the directory read-only") 353 with self.assertRaises(PermissionError): 354 self.make_temp() 355 self.assertEqual(os.listdir(tempfile.tempdir), []) 356 finally: 357 os.chmod(tempfile.tempdir, oldmode) 358 359 def test_nonexisting_directory(self): 360 with _inside_empty_temp_dir(): 361 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 362 with support.swap_attr(tempfile, 'tempdir', tempdir): 363 with self.assertRaises(FileNotFoundError): 364 self.make_temp() 365 366 def test_non_directory(self): 367 with _inside_empty_temp_dir(): 368 tempdir = os.path.join(tempfile.tempdir, 'file') 369 open(tempdir, 'wb').close() 370 with support.swap_attr(tempfile, 'tempdir', tempdir): 371 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 372 self.make_temp() 373 374 375class TestMkstempInner(TestBadTempdir, BaseTestCase): 376 """Test the internal function _mkstemp_inner.""" 377 378 class mkstemped: 379 _bflags = tempfile._bin_openflags 380 _tflags = tempfile._text_openflags 381 _close = os.close 382 _unlink = os.unlink 383 384 def __init__(self, dir, pre, suf, bin): 385 if bin: flags = self._bflags 386 else: flags = self._tflags 387 388 output_type = tempfile._infer_return_type(dir, pre, suf) 389 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 390 391 def write(self, str): 392 os.write(self.fd, str) 393 394 def __del__(self): 395 self._close(self.fd) 396 self._unlink(self.name) 397 398 def do_create(self, dir=None, pre=None, suf=None, bin=1): 399 output_type = tempfile._infer_return_type(dir, pre, suf) 400 if dir is None: 401 if output_type is str: 402 dir = tempfile.gettempdir() 403 else: 404 dir = tempfile.gettempdirb() 405 if pre is None: 406 pre = output_type() 407 if suf is None: 408 suf = output_type() 409 file = self.mkstemped(dir, pre, suf, bin) 410 411 self.nameCheck(file.name, dir, pre, suf) 412 return file 413 414 def test_basic(self): 415 # _mkstemp_inner can create files 416 self.do_create().write(b"blat") 417 self.do_create(pre="a").write(b"blat") 418 self.do_create(suf="b").write(b"blat") 419 self.do_create(pre="a", suf="b").write(b"blat") 420 self.do_create(pre="aa", suf=".txt").write(b"blat") 421 422 def test_basic_with_bytes_names(self): 423 # _mkstemp_inner can create files when given name parts all 424 # specified as bytes. 425 dir_b = tempfile.gettempdirb() 426 self.do_create(dir=dir_b, suf=b"").write(b"blat") 427 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 428 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 429 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 430 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 431 # Can't mix str & binary types in the args. 432 with self.assertRaises(TypeError): 433 self.do_create(dir="", suf=b"").write(b"blat") 434 with self.assertRaises(TypeError): 435 self.do_create(dir=dir_b, pre="").write(b"blat") 436 with self.assertRaises(TypeError): 437 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 438 439 def test_basic_many(self): 440 # _mkstemp_inner can create many files (stochastic) 441 extant = list(range(TEST_FILES)) 442 for i in extant: 443 extant[i] = self.do_create(pre="aa") 444 445 def test_choose_directory(self): 446 # _mkstemp_inner can create files in a user-selected directory 447 dir = tempfile.mkdtemp() 448 try: 449 self.do_create(dir=dir).write(b"blat") 450 self.do_create(dir=pathlib.Path(dir)).write(b"blat") 451 finally: 452 support.gc_collect() # For PyPy or other GCs. 453 os.rmdir(dir) 454 455 def test_file_mode(self): 456 # _mkstemp_inner creates files with the proper mode 457 458 file = self.do_create() 459 mode = stat.S_IMODE(os.stat(file.name).st_mode) 460 expected = 0o600 461 if sys.platform == 'win32': 462 # There's no distinction among 'user', 'group' and 'world'; 463 # replicate the 'user' bits. 464 user = expected >> 6 465 expected = user * (1 + 8 + 64) 466 self.assertEqual(mode, expected) 467 468 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 469 def test_noinherit(self): 470 # _mkstemp_inner file handles are not inherited by child processes 471 472 if support.verbose: 473 v="v" 474 else: 475 v="q" 476 477 file = self.do_create() 478 self.assertEqual(os.get_inheritable(file.fd), False) 479 fd = "%d" % file.fd 480 481 try: 482 me = __file__ 483 except NameError: 484 me = sys.argv[0] 485 486 # We have to exec something, so that FD_CLOEXEC will take 487 # effect. The core of this test is therefore in 488 # tf_inherit_check.py, which see. 489 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 490 "tf_inherit_check.py") 491 492 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 493 # but an arg with embedded spaces should be decorated with double 494 # quotes on each end 495 if sys.platform == 'win32': 496 decorated = '"%s"' % sys.executable 497 tester = '"%s"' % tester 498 else: 499 decorated = sys.executable 500 501 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 502 self.assertFalse(retval < 0, 503 "child process caught fatal signal %d" % -retval) 504 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 505 506 @unittest.skipUnless(has_textmode, "text mode not available") 507 def test_textmode(self): 508 # _mkstemp_inner can create files in text mode 509 510 # A text file is truncated at the first Ctrl+Z byte 511 f = self.do_create(bin=0) 512 f.write(b"blat\x1a") 513 f.write(b"extra\n") 514 os.lseek(f.fd, 0, os.SEEK_SET) 515 self.assertEqual(os.read(f.fd, 20), b"blat") 516 517 def make_temp(self): 518 return tempfile._mkstemp_inner(tempfile.gettempdir(), 519 tempfile.gettempprefix(), 520 '', 521 tempfile._bin_openflags, 522 str) 523 524 def test_collision_with_existing_file(self): 525 # _mkstemp_inner tries another name when a file with 526 # the chosen name already exists 527 with _inside_empty_temp_dir(), \ 528 _mock_candidate_names('aaa', 'aaa', 'bbb'): 529 (fd1, name1) = self.make_temp() 530 os.close(fd1) 531 self.assertTrue(name1.endswith('aaa')) 532 533 (fd2, name2) = self.make_temp() 534 os.close(fd2) 535 self.assertTrue(name2.endswith('bbb')) 536 537 def test_collision_with_existing_directory(self): 538 # _mkstemp_inner tries another name when a directory with 539 # the chosen name already exists 540 with _inside_empty_temp_dir(), \ 541 _mock_candidate_names('aaa', 'aaa', 'bbb'): 542 dir = tempfile.mkdtemp() 543 self.assertTrue(dir.endswith('aaa')) 544 545 (fd, name) = self.make_temp() 546 os.close(fd) 547 self.assertTrue(name.endswith('bbb')) 548 549 550class TestGetTempPrefix(BaseTestCase): 551 """Test gettempprefix().""" 552 553 def test_sane_template(self): 554 # gettempprefix returns a nonempty prefix string 555 p = tempfile.gettempprefix() 556 557 self.assertIsInstance(p, str) 558 self.assertGreater(len(p), 0) 559 560 pb = tempfile.gettempprefixb() 561 562 self.assertIsInstance(pb, bytes) 563 self.assertGreater(len(pb), 0) 564 565 def test_usable_template(self): 566 # gettempprefix returns a usable prefix string 567 568 # Create a temp directory, avoiding use of the prefix. 569 # Then attempt to create a file whose name is 570 # prefix + 'xxxxxx.xxx' in that directory. 571 p = tempfile.gettempprefix() + "xxxxxx.xxx" 572 d = tempfile.mkdtemp(prefix="") 573 try: 574 p = os.path.join(d, p) 575 fd = os.open(p, os.O_RDWR | os.O_CREAT) 576 os.close(fd) 577 os.unlink(p) 578 finally: 579 os.rmdir(d) 580 581 582class TestGetTempDir(BaseTestCase): 583 """Test gettempdir().""" 584 585 def test_directory_exists(self): 586 # gettempdir returns a directory which exists 587 588 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 589 self.assertTrue(os.path.isabs(d) or d == os.curdir, 590 "%r is not an absolute path" % d) 591 self.assertTrue(os.path.isdir(d), 592 "%r is not a directory" % d) 593 594 def test_directory_writable(self): 595 # gettempdir returns a directory writable by the user 596 597 # sneaky: just instantiate a NamedTemporaryFile, which 598 # defaults to writing into the directory returned by 599 # gettempdir. 600 with tempfile.NamedTemporaryFile() as file: 601 file.write(b"blat") 602 603 def test_same_thing(self): 604 # gettempdir always returns the same object 605 a = tempfile.gettempdir() 606 b = tempfile.gettempdir() 607 c = tempfile.gettempdirb() 608 609 self.assertTrue(a is b) 610 self.assertNotEqual(type(a), type(c)) 611 self.assertEqual(a, os.fsdecode(c)) 612 613 def test_case_sensitive(self): 614 # gettempdir should not flatten its case 615 # even on a case-insensitive file system 616 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 617 _tempdir, tempfile.tempdir = tempfile.tempdir, None 618 try: 619 with os_helper.EnvironmentVarGuard() as env: 620 # Fake the first env var which is checked as a candidate 621 env["TMPDIR"] = case_sensitive_tempdir 622 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 623 finally: 624 tempfile.tempdir = _tempdir 625 os_helper.rmdir(case_sensitive_tempdir) 626 627 628class TestMkstemp(BaseTestCase): 629 """Test mkstemp().""" 630 631 def do_create(self, dir=None, pre=None, suf=None): 632 output_type = tempfile._infer_return_type(dir, pre, suf) 633 if dir is None: 634 if output_type is str: 635 dir = tempfile.gettempdir() 636 else: 637 dir = tempfile.gettempdirb() 638 if pre is None: 639 pre = output_type() 640 if suf is None: 641 suf = output_type() 642 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 643 (ndir, nbase) = os.path.split(name) 644 adir = os.path.abspath(dir) 645 self.assertEqual(adir, ndir, 646 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 647 648 try: 649 self.nameCheck(name, dir, pre, suf) 650 finally: 651 os.close(fd) 652 os.unlink(name) 653 654 def test_basic(self): 655 # mkstemp can create files 656 self.do_create() 657 self.do_create(pre="a") 658 self.do_create(suf="b") 659 self.do_create(pre="a", suf="b") 660 self.do_create(pre="aa", suf=".txt") 661 self.do_create(dir=".") 662 663 def test_basic_with_bytes_names(self): 664 # mkstemp can create files when given name parts all 665 # specified as bytes. 666 d = tempfile.gettempdirb() 667 self.do_create(dir=d, suf=b"") 668 self.do_create(dir=d, pre=b"a") 669 self.do_create(dir=d, suf=b"b") 670 self.do_create(dir=d, pre=b"a", suf=b"b") 671 self.do_create(dir=d, pre=b"aa", suf=b".txt") 672 self.do_create(dir=b".") 673 with self.assertRaises(TypeError): 674 self.do_create(dir=".", pre=b"aa", suf=b".txt") 675 with self.assertRaises(TypeError): 676 self.do_create(dir=b".", pre="aa", suf=b".txt") 677 with self.assertRaises(TypeError): 678 self.do_create(dir=b".", pre=b"aa", suf=".txt") 679 680 681 def test_choose_directory(self): 682 # mkstemp can create directories in a user-selected directory 683 dir = tempfile.mkdtemp() 684 try: 685 self.do_create(dir=dir) 686 self.do_create(dir=pathlib.Path(dir)) 687 finally: 688 os.rmdir(dir) 689 690 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 691 orig_tempdir = tempfile.tempdir 692 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 693 try: 694 fd, path = tempfile.mkstemp() 695 os.close(fd) 696 os.unlink(path) 697 self.assertIsInstance(path, str) 698 tempfile.tempdir = tempfile.gettempdirb() 699 self.assertIsInstance(tempfile.tempdir, bytes) 700 self.assertIsInstance(tempfile.gettempdir(), str) 701 self.assertIsInstance(tempfile.gettempdirb(), bytes) 702 fd, path = tempfile.mkstemp() 703 os.close(fd) 704 os.unlink(path) 705 self.assertIsInstance(path, bytes) 706 fd, path = tempfile.mkstemp(suffix='.txt') 707 os.close(fd) 708 os.unlink(path) 709 self.assertIsInstance(path, str) 710 fd, path = tempfile.mkstemp(prefix='test-temp-') 711 os.close(fd) 712 os.unlink(path) 713 self.assertIsInstance(path, str) 714 fd, path = tempfile.mkstemp(dir=tempfile.gettempdir()) 715 os.close(fd) 716 os.unlink(path) 717 self.assertIsInstance(path, str) 718 finally: 719 tempfile.tempdir = orig_tempdir 720 721 722class TestMkdtemp(TestBadTempdir, BaseTestCase): 723 """Test mkdtemp().""" 724 725 def make_temp(self): 726 return tempfile.mkdtemp() 727 728 def do_create(self, dir=None, pre=None, suf=None): 729 output_type = tempfile._infer_return_type(dir, pre, suf) 730 if dir is None: 731 if output_type is str: 732 dir = tempfile.gettempdir() 733 else: 734 dir = tempfile.gettempdirb() 735 if pre is None: 736 pre = output_type() 737 if suf is None: 738 suf = output_type() 739 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 740 741 try: 742 self.nameCheck(name, dir, pre, suf) 743 return name 744 except: 745 os.rmdir(name) 746 raise 747 748 def test_basic(self): 749 # mkdtemp can create directories 750 os.rmdir(self.do_create()) 751 os.rmdir(self.do_create(pre="a")) 752 os.rmdir(self.do_create(suf="b")) 753 os.rmdir(self.do_create(pre="a", suf="b")) 754 os.rmdir(self.do_create(pre="aa", suf=".txt")) 755 756 def test_basic_with_bytes_names(self): 757 # mkdtemp can create directories when given all binary parts 758 d = tempfile.gettempdirb() 759 os.rmdir(self.do_create(dir=d)) 760 os.rmdir(self.do_create(dir=d, pre=b"a")) 761 os.rmdir(self.do_create(dir=d, suf=b"b")) 762 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 763 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 764 with self.assertRaises(TypeError): 765 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 766 with self.assertRaises(TypeError): 767 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 768 with self.assertRaises(TypeError): 769 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 770 771 def test_basic_many(self): 772 # mkdtemp can create many directories (stochastic) 773 extant = list(range(TEST_FILES)) 774 try: 775 for i in extant: 776 extant[i] = self.do_create(pre="aa") 777 finally: 778 for i in extant: 779 if(isinstance(i, str)): 780 os.rmdir(i) 781 782 def test_choose_directory(self): 783 # mkdtemp can create directories in a user-selected directory 784 dir = tempfile.mkdtemp() 785 try: 786 os.rmdir(self.do_create(dir=dir)) 787 os.rmdir(self.do_create(dir=pathlib.Path(dir))) 788 finally: 789 os.rmdir(dir) 790 791 def test_mode(self): 792 # mkdtemp creates directories with the proper mode 793 794 dir = self.do_create() 795 try: 796 mode = stat.S_IMODE(os.stat(dir).st_mode) 797 mode &= 0o777 # Mask off sticky bits inherited from /tmp 798 expected = 0o700 799 if sys.platform == 'win32': 800 # There's no distinction among 'user', 'group' and 'world'; 801 # replicate the 'user' bits. 802 user = expected >> 6 803 expected = user * (1 + 8 + 64) 804 self.assertEqual(mode, expected) 805 finally: 806 os.rmdir(dir) 807 808 def test_collision_with_existing_file(self): 809 # mkdtemp tries another name when a file with 810 # the chosen name already exists 811 with _inside_empty_temp_dir(), \ 812 _mock_candidate_names('aaa', 'aaa', 'bbb'): 813 file = tempfile.NamedTemporaryFile(delete=False) 814 file.close() 815 self.assertTrue(file.name.endswith('aaa')) 816 dir = tempfile.mkdtemp() 817 self.assertTrue(dir.endswith('bbb')) 818 819 def test_collision_with_existing_directory(self): 820 # mkdtemp tries another name when a directory with 821 # the chosen name already exists 822 with _inside_empty_temp_dir(), \ 823 _mock_candidate_names('aaa', 'aaa', 'bbb'): 824 dir1 = tempfile.mkdtemp() 825 self.assertTrue(dir1.endswith('aaa')) 826 dir2 = tempfile.mkdtemp() 827 self.assertTrue(dir2.endswith('bbb')) 828 829 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 830 orig_tempdir = tempfile.tempdir 831 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 832 try: 833 path = tempfile.mkdtemp() 834 os.rmdir(path) 835 self.assertIsInstance(path, str) 836 tempfile.tempdir = tempfile.gettempdirb() 837 self.assertIsInstance(tempfile.tempdir, bytes) 838 self.assertIsInstance(tempfile.gettempdir(), str) 839 self.assertIsInstance(tempfile.gettempdirb(), bytes) 840 path = tempfile.mkdtemp() 841 os.rmdir(path) 842 self.assertIsInstance(path, bytes) 843 path = tempfile.mkdtemp(suffix='-dir') 844 os.rmdir(path) 845 self.assertIsInstance(path, str) 846 path = tempfile.mkdtemp(prefix='test-mkdtemp-') 847 os.rmdir(path) 848 self.assertIsInstance(path, str) 849 path = tempfile.mkdtemp(dir=tempfile.gettempdir()) 850 os.rmdir(path) 851 self.assertIsInstance(path, str) 852 finally: 853 tempfile.tempdir = orig_tempdir 854 855 856class TestMktemp(BaseTestCase): 857 """Test mktemp().""" 858 859 # For safety, all use of mktemp must occur in a private directory. 860 # We must also suppress the RuntimeWarning it generates. 861 def setUp(self): 862 self.dir = tempfile.mkdtemp() 863 super().setUp() 864 865 def tearDown(self): 866 if self.dir: 867 os.rmdir(self.dir) 868 self.dir = None 869 super().tearDown() 870 871 class mktemped: 872 _unlink = os.unlink 873 _bflags = tempfile._bin_openflags 874 875 def __init__(self, dir, pre, suf): 876 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 877 # Create the file. This will raise an exception if it's 878 # mysteriously appeared in the meanwhile. 879 os.close(os.open(self.name, self._bflags, 0o600)) 880 881 def __del__(self): 882 self._unlink(self.name) 883 884 def do_create(self, pre="", suf=""): 885 file = self.mktemped(self.dir, pre, suf) 886 887 self.nameCheck(file.name, self.dir, pre, suf) 888 return file 889 890 def test_basic(self): 891 # mktemp can choose usable file names 892 self.do_create() 893 self.do_create(pre="a") 894 self.do_create(suf="b") 895 self.do_create(pre="a", suf="b") 896 self.do_create(pre="aa", suf=".txt") 897 898 def test_many(self): 899 # mktemp can choose many usable file names (stochastic) 900 extant = list(range(TEST_FILES)) 901 for i in extant: 902 extant[i] = self.do_create(pre="aa") 903 del extant 904 support.gc_collect() # For PyPy or other GCs. 905 906## def test_warning(self): 907## # mktemp issues a warning when used 908## warnings.filterwarnings("error", 909## category=RuntimeWarning, 910## message="mktemp") 911## self.assertRaises(RuntimeWarning, 912## tempfile.mktemp, dir=self.dir) 913 914 915# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 916 917 918class TestNamedTemporaryFile(BaseTestCase): 919 """Test NamedTemporaryFile().""" 920 921 def do_create(self, dir=None, pre="", suf="", delete=True): 922 if dir is None: 923 dir = tempfile.gettempdir() 924 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 925 delete=delete) 926 927 self.nameCheck(file.name, dir, pre, suf) 928 return file 929 930 931 def test_basic(self): 932 # NamedTemporaryFile can create files 933 self.do_create() 934 self.do_create(pre="a") 935 self.do_create(suf="b") 936 self.do_create(pre="a", suf="b") 937 self.do_create(pre="aa", suf=".txt") 938 939 def test_method_lookup(self): 940 # Issue #18879: Looking up a temporary file method should keep it 941 # alive long enough. 942 f = self.do_create() 943 wr = weakref.ref(f) 944 write = f.write 945 write2 = f.write 946 del f 947 write(b'foo') 948 del write 949 write2(b'bar') 950 del write2 951 if support.check_impl_detail(cpython=True): 952 # No reference cycle was created. 953 self.assertIsNone(wr()) 954 955 def test_iter(self): 956 # Issue #23700: getting iterator from a temporary file should keep 957 # it alive as long as it's being iterated over 958 lines = [b'spam\n', b'eggs\n', b'beans\n'] 959 def make_file(): 960 f = tempfile.NamedTemporaryFile(mode='w+b') 961 f.write(b''.join(lines)) 962 f.seek(0) 963 return f 964 for i, l in enumerate(make_file()): 965 self.assertEqual(l, lines[i]) 966 self.assertEqual(i, len(lines) - 1) 967 968 def test_creates_named(self): 969 # NamedTemporaryFile creates files with names 970 f = tempfile.NamedTemporaryFile() 971 self.assertTrue(os.path.exists(f.name), 972 "NamedTemporaryFile %s does not exist" % f.name) 973 974 def test_del_on_close(self): 975 # A NamedTemporaryFile is deleted when closed 976 dir = tempfile.mkdtemp() 977 try: 978 with tempfile.NamedTemporaryFile(dir=dir) as f: 979 f.write(b'blat') 980 self.assertFalse(os.path.exists(f.name), 981 "NamedTemporaryFile %s exists after close" % f.name) 982 finally: 983 os.rmdir(dir) 984 985 def test_dis_del_on_close(self): 986 # Tests that delete-on-close can be disabled 987 dir = tempfile.mkdtemp() 988 tmp = None 989 try: 990 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 991 tmp = f.name 992 f.write(b'blat') 993 f.close() 994 self.assertTrue(os.path.exists(f.name), 995 "NamedTemporaryFile %s missing after close" % f.name) 996 finally: 997 if tmp is not None: 998 os.unlink(tmp) 999 os.rmdir(dir) 1000 1001 def test_multiple_close(self): 1002 # A NamedTemporaryFile can be closed many times without error 1003 f = tempfile.NamedTemporaryFile() 1004 f.write(b'abc\n') 1005 f.close() 1006 f.close() 1007 f.close() 1008 1009 def test_context_manager(self): 1010 # A NamedTemporaryFile can be used as a context manager 1011 with tempfile.NamedTemporaryFile() as f: 1012 self.assertTrue(os.path.exists(f.name)) 1013 self.assertFalse(os.path.exists(f.name)) 1014 def use_closed(): 1015 with f: 1016 pass 1017 self.assertRaises(ValueError, use_closed) 1018 1019 def test_no_leak_fd(self): 1020 # Issue #21058: don't leak file descriptor when io.open() fails 1021 closed = [] 1022 os_close = os.close 1023 def close(fd): 1024 closed.append(fd) 1025 os_close(fd) 1026 1027 with mock.patch('os.close', side_effect=close): 1028 with mock.patch('io.open', side_effect=ValueError): 1029 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 1030 self.assertEqual(len(closed), 1) 1031 1032 def test_bad_mode(self): 1033 dir = tempfile.mkdtemp() 1034 self.addCleanup(os_helper.rmtree, dir) 1035 with self.assertRaises(ValueError): 1036 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 1037 with self.assertRaises(TypeError): 1038 tempfile.NamedTemporaryFile(mode=2, dir=dir) 1039 self.assertEqual(os.listdir(dir), []) 1040 1041 # How to test the mode and bufsize parameters? 1042 1043class TestSpooledTemporaryFile(BaseTestCase): 1044 """Test SpooledTemporaryFile().""" 1045 1046 def do_create(self, max_size=0, dir=None, pre="", suf=""): 1047 if dir is None: 1048 dir = tempfile.gettempdir() 1049 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 1050 1051 return file 1052 1053 1054 def test_basic(self): 1055 # SpooledTemporaryFile can create files 1056 f = self.do_create() 1057 self.assertFalse(f._rolled) 1058 f = self.do_create(max_size=100, pre="a", suf=".txt") 1059 self.assertFalse(f._rolled) 1060 1061 def test_del_on_close(self): 1062 # A SpooledTemporaryFile is deleted when closed 1063 dir = tempfile.mkdtemp() 1064 try: 1065 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 1066 self.assertFalse(f._rolled) 1067 f.write(b'blat ' * 5) 1068 self.assertTrue(f._rolled) 1069 filename = f.name 1070 f.close() 1071 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 1072 "SpooledTemporaryFile %s exists after close" % filename) 1073 finally: 1074 os.rmdir(dir) 1075 1076 def test_rewrite_small(self): 1077 # A SpooledTemporaryFile can be written to multiple within the max_size 1078 f = self.do_create(max_size=30) 1079 self.assertFalse(f._rolled) 1080 for i in range(5): 1081 f.seek(0, 0) 1082 f.write(b'x' * 20) 1083 self.assertFalse(f._rolled) 1084 1085 def test_write_sequential(self): 1086 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1087 # over afterward 1088 f = self.do_create(max_size=30) 1089 self.assertFalse(f._rolled) 1090 f.write(b'x' * 20) 1091 self.assertFalse(f._rolled) 1092 f.write(b'x' * 10) 1093 self.assertFalse(f._rolled) 1094 f.write(b'x') 1095 self.assertTrue(f._rolled) 1096 1097 def test_writelines(self): 1098 # Verify writelines with a SpooledTemporaryFile 1099 f = self.do_create() 1100 f.writelines((b'x', b'y', b'z')) 1101 pos = f.seek(0) 1102 self.assertEqual(pos, 0) 1103 buf = f.read() 1104 self.assertEqual(buf, b'xyz') 1105 1106 def test_writelines_sequential(self): 1107 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1108 # over afterward 1109 f = self.do_create(max_size=35) 1110 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1111 self.assertFalse(f._rolled) 1112 f.write(b'x') 1113 self.assertTrue(f._rolled) 1114 1115 def test_sparse(self): 1116 # A SpooledTemporaryFile that is written late in the file will extend 1117 # when that occurs 1118 f = self.do_create(max_size=30) 1119 self.assertFalse(f._rolled) 1120 pos = f.seek(100, 0) 1121 self.assertEqual(pos, 100) 1122 self.assertFalse(f._rolled) 1123 f.write(b'x') 1124 self.assertTrue(f._rolled) 1125 1126 def test_fileno(self): 1127 # A SpooledTemporaryFile should roll over to a real file on fileno() 1128 f = self.do_create(max_size=30) 1129 self.assertFalse(f._rolled) 1130 self.assertTrue(f.fileno() > 0) 1131 self.assertTrue(f._rolled) 1132 1133 def test_multiple_close_before_rollover(self): 1134 # A SpooledTemporaryFile can be closed many times without error 1135 f = tempfile.SpooledTemporaryFile() 1136 f.write(b'abc\n') 1137 self.assertFalse(f._rolled) 1138 f.close() 1139 f.close() 1140 f.close() 1141 1142 def test_multiple_close_after_rollover(self): 1143 # A SpooledTemporaryFile can be closed many times without error 1144 f = tempfile.SpooledTemporaryFile(max_size=1) 1145 f.write(b'abc\n') 1146 self.assertTrue(f._rolled) 1147 f.close() 1148 f.close() 1149 f.close() 1150 1151 def test_bound_methods(self): 1152 # It should be OK to steal a bound method from a SpooledTemporaryFile 1153 # and use it independently; when the file rolls over, those bound 1154 # methods should continue to function 1155 f = self.do_create(max_size=30) 1156 read = f.read 1157 write = f.write 1158 seek = f.seek 1159 1160 write(b"a" * 35) 1161 write(b"b" * 35) 1162 seek(0, 0) 1163 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1164 1165 def test_properties(self): 1166 f = tempfile.SpooledTemporaryFile(max_size=10) 1167 f.write(b'x' * 10) 1168 self.assertFalse(f._rolled) 1169 self.assertEqual(f.mode, 'w+b') 1170 self.assertIsNone(f.name) 1171 with self.assertRaises(AttributeError): 1172 f.newlines 1173 with self.assertRaises(AttributeError): 1174 f.encoding 1175 with self.assertRaises(AttributeError): 1176 f.errors 1177 1178 f.write(b'x') 1179 self.assertTrue(f._rolled) 1180 self.assertEqual(f.mode, 'rb+') 1181 self.assertIsNotNone(f.name) 1182 with self.assertRaises(AttributeError): 1183 f.newlines 1184 with self.assertRaises(AttributeError): 1185 f.encoding 1186 with self.assertRaises(AttributeError): 1187 f.errors 1188 1189 def test_text_mode(self): 1190 # Creating a SpooledTemporaryFile with a text mode should produce 1191 # a file object reading and writing (Unicode) text strings. 1192 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1193 encoding="utf-8") 1194 f.write("abc\n") 1195 f.seek(0) 1196 self.assertEqual(f.read(), "abc\n") 1197 f.write("def\n") 1198 f.seek(0) 1199 self.assertEqual(f.read(), "abc\ndef\n") 1200 self.assertFalse(f._rolled) 1201 self.assertEqual(f.mode, 'w+') 1202 self.assertIsNone(f.name) 1203 self.assertEqual(f.newlines, os.linesep) 1204 self.assertEqual(f.encoding, "utf-8") 1205 self.assertEqual(f.errors, "strict") 1206 1207 f.write("xyzzy\n") 1208 f.seek(0) 1209 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1210 # Check that Ctrl+Z doesn't truncate the file 1211 f.write("foo\x1abar\n") 1212 f.seek(0) 1213 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1214 self.assertTrue(f._rolled) 1215 self.assertEqual(f.mode, 'w+') 1216 self.assertIsNotNone(f.name) 1217 self.assertEqual(f.newlines, os.linesep) 1218 self.assertEqual(f.encoding, "utf-8") 1219 self.assertEqual(f.errors, "strict") 1220 1221 def test_text_newline_and_encoding(self): 1222 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1223 newline='', encoding='utf-8', 1224 errors='ignore') 1225 f.write("\u039B\r\n") 1226 f.seek(0) 1227 self.assertEqual(f.read(), "\u039B\r\n") 1228 self.assertFalse(f._rolled) 1229 self.assertEqual(f.mode, 'w+') 1230 self.assertIsNone(f.name) 1231 self.assertIsNotNone(f.newlines) 1232 self.assertEqual(f.encoding, "utf-8") 1233 self.assertEqual(f.errors, "ignore") 1234 1235 f.write("\u039C" * 10 + "\r\n") 1236 f.write("\u039D" * 20) 1237 f.seek(0) 1238 self.assertEqual(f.read(), 1239 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1240 self.assertTrue(f._rolled) 1241 self.assertEqual(f.mode, 'w+') 1242 self.assertIsNotNone(f.name) 1243 self.assertIsNotNone(f.newlines) 1244 self.assertEqual(f.encoding, 'utf-8') 1245 self.assertEqual(f.errors, 'ignore') 1246 1247 def test_context_manager_before_rollover(self): 1248 # A SpooledTemporaryFile can be used as a context manager 1249 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1250 self.assertFalse(f._rolled) 1251 self.assertFalse(f.closed) 1252 self.assertTrue(f.closed) 1253 def use_closed(): 1254 with f: 1255 pass 1256 self.assertRaises(ValueError, use_closed) 1257 1258 def test_context_manager_during_rollover(self): 1259 # A SpooledTemporaryFile can be used as a context manager 1260 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1261 self.assertFalse(f._rolled) 1262 f.write(b'abc\n') 1263 f.flush() 1264 self.assertTrue(f._rolled) 1265 self.assertFalse(f.closed) 1266 self.assertTrue(f.closed) 1267 def use_closed(): 1268 with f: 1269 pass 1270 self.assertRaises(ValueError, use_closed) 1271 1272 def test_context_manager_after_rollover(self): 1273 # A SpooledTemporaryFile can be used as a context manager 1274 f = tempfile.SpooledTemporaryFile(max_size=1) 1275 f.write(b'abc\n') 1276 f.flush() 1277 self.assertTrue(f._rolled) 1278 with f: 1279 self.assertFalse(f.closed) 1280 self.assertTrue(f.closed) 1281 def use_closed(): 1282 with f: 1283 pass 1284 self.assertRaises(ValueError, use_closed) 1285 1286 def test_truncate_with_size_parameter(self): 1287 # A SpooledTemporaryFile can be truncated to zero size 1288 f = tempfile.SpooledTemporaryFile(max_size=10) 1289 f.write(b'abcdefg\n') 1290 f.seek(0) 1291 f.truncate() 1292 self.assertFalse(f._rolled) 1293 self.assertEqual(f._file.getvalue(), b'') 1294 # A SpooledTemporaryFile can be truncated to a specific size 1295 f = tempfile.SpooledTemporaryFile(max_size=10) 1296 f.write(b'abcdefg\n') 1297 f.truncate(4) 1298 self.assertFalse(f._rolled) 1299 self.assertEqual(f._file.getvalue(), b'abcd') 1300 # A SpooledTemporaryFile rolls over if truncated to large size 1301 f = tempfile.SpooledTemporaryFile(max_size=10) 1302 f.write(b'abcdefg\n') 1303 f.truncate(20) 1304 self.assertTrue(f._rolled) 1305 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1306 1307 def test_class_getitem(self): 1308 self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes], 1309 types.GenericAlias) 1310 1311if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1312 1313 class TestTemporaryFile(BaseTestCase): 1314 """Test TemporaryFile().""" 1315 1316 def test_basic(self): 1317 # TemporaryFile can create files 1318 # No point in testing the name params - the file has no name. 1319 tempfile.TemporaryFile() 1320 1321 def test_has_no_name(self): 1322 # TemporaryFile creates files with no names (on this system) 1323 dir = tempfile.mkdtemp() 1324 f = tempfile.TemporaryFile(dir=dir) 1325 f.write(b'blat') 1326 1327 # Sneaky: because this file has no name, it should not prevent 1328 # us from removing the directory it was created in. 1329 try: 1330 os.rmdir(dir) 1331 except: 1332 # cleanup 1333 f.close() 1334 os.rmdir(dir) 1335 raise 1336 1337 def test_multiple_close(self): 1338 # A TemporaryFile can be closed many times without error 1339 f = tempfile.TemporaryFile() 1340 f.write(b'abc\n') 1341 f.close() 1342 f.close() 1343 f.close() 1344 1345 # How to test the mode and bufsize parameters? 1346 def test_mode_and_encoding(self): 1347 1348 def roundtrip(input, *args, **kwargs): 1349 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1350 fileobj.write(input) 1351 fileobj.seek(0) 1352 self.assertEqual(input, fileobj.read()) 1353 1354 roundtrip(b"1234", "w+b") 1355 roundtrip("abdc\n", "w+") 1356 roundtrip("\u039B", "w+", encoding="utf-16") 1357 roundtrip("foo\r\n", "w+", newline="") 1358 1359 def test_no_leak_fd(self): 1360 # Issue #21058: don't leak file descriptor when io.open() fails 1361 closed = [] 1362 os_close = os.close 1363 def close(fd): 1364 closed.append(fd) 1365 os_close(fd) 1366 1367 with mock.patch('os.close', side_effect=close): 1368 with mock.patch('io.open', side_effect=ValueError): 1369 self.assertRaises(ValueError, tempfile.TemporaryFile) 1370 self.assertEqual(len(closed), 1) 1371 1372 1373 1374# Helper for test_del_on_shutdown 1375class NulledModules: 1376 def __init__(self, *modules): 1377 self.refs = [mod.__dict__ for mod in modules] 1378 self.contents = [ref.copy() for ref in self.refs] 1379 1380 def __enter__(self): 1381 for d in self.refs: 1382 for key in d: 1383 d[key] = None 1384 1385 def __exit__(self, *exc_info): 1386 for d, c in zip(self.refs, self.contents): 1387 d.clear() 1388 d.update(c) 1389 1390 1391class TestTemporaryDirectory(BaseTestCase): 1392 """Test TemporaryDirectory().""" 1393 1394 def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1, 1395 ignore_cleanup_errors=False): 1396 if dir is None: 1397 dir = tempfile.gettempdir() 1398 tmp = tempfile.TemporaryDirectory( 1399 dir=dir, prefix=pre, suffix=suf, 1400 ignore_cleanup_errors=ignore_cleanup_errors) 1401 self.nameCheck(tmp.name, dir, pre, suf) 1402 self.do_create2(tmp.name, recurse, dirs, files) 1403 return tmp 1404 1405 def do_create2(self, path, recurse=1, dirs=1, files=1): 1406 # Create subdirectories and some files 1407 if recurse: 1408 for i in range(dirs): 1409 name = os.path.join(path, "dir%d" % i) 1410 os.mkdir(name) 1411 self.do_create2(name, recurse-1, dirs, files) 1412 for i in range(files): 1413 with open(os.path.join(path, "test%d.txt" % i), "wb") as f: 1414 f.write(b"Hello world!") 1415 1416 def test_mkdtemp_failure(self): 1417 # Check no additional exception if mkdtemp fails 1418 # Previously would raise AttributeError instead 1419 # (noted as part of Issue #10188) 1420 with tempfile.TemporaryDirectory() as nonexistent: 1421 pass 1422 with self.assertRaises(FileNotFoundError) as cm: 1423 tempfile.TemporaryDirectory(dir=nonexistent) 1424 self.assertEqual(cm.exception.errno, errno.ENOENT) 1425 1426 def test_explicit_cleanup(self): 1427 # A TemporaryDirectory is deleted when cleaned up 1428 dir = tempfile.mkdtemp() 1429 try: 1430 d = self.do_create(dir=dir) 1431 self.assertTrue(os.path.exists(d.name), 1432 "TemporaryDirectory %s does not exist" % d.name) 1433 d.cleanup() 1434 self.assertFalse(os.path.exists(d.name), 1435 "TemporaryDirectory %s exists after cleanup" % d.name) 1436 finally: 1437 os.rmdir(dir) 1438 1439 def test_explict_cleanup_ignore_errors(self): 1440 """Test that cleanup doesn't return an error when ignoring them.""" 1441 with tempfile.TemporaryDirectory() as working_dir: 1442 temp_dir = self.do_create( 1443 dir=working_dir, ignore_cleanup_errors=True) 1444 temp_path = pathlib.Path(temp_dir.name) 1445 self.assertTrue(temp_path.exists(), 1446 f"TemporaryDirectory {temp_path!s} does not exist") 1447 with open(temp_path / "a_file.txt", "w+t") as open_file: 1448 open_file.write("Hello world!\n") 1449 temp_dir.cleanup() 1450 self.assertEqual(len(list(temp_path.glob("*"))), 1451 int(sys.platform.startswith("win")), 1452 "Unexpected number of files in " 1453 f"TemporaryDirectory {temp_path!s}") 1454 self.assertEqual( 1455 temp_path.exists(), 1456 sys.platform.startswith("win"), 1457 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1458 temp_dir.cleanup() 1459 self.assertFalse( 1460 temp_path.exists(), 1461 f"TemporaryDirectory {temp_path!s} exists after cleanup") 1462 1463 @os_helper.skip_unless_symlink 1464 def test_cleanup_with_symlink_to_a_directory(self): 1465 # cleanup() should not follow symlinks to directories (issue #12464) 1466 d1 = self.do_create() 1467 d2 = self.do_create(recurse=0) 1468 1469 # Symlink d1/foo -> d2 1470 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1471 1472 # This call to cleanup() should not follow the "foo" symlink 1473 d1.cleanup() 1474 1475 self.assertFalse(os.path.exists(d1.name), 1476 "TemporaryDirectory %s exists after cleanup" % d1.name) 1477 self.assertTrue(os.path.exists(d2.name), 1478 "Directory pointed to by a symlink was deleted") 1479 self.assertEqual(os.listdir(d2.name), ['test0.txt'], 1480 "Contents of the directory pointed to by a symlink " 1481 "were deleted") 1482 d2.cleanup() 1483 1484 @support.cpython_only 1485 def test_del_on_collection(self): 1486 # A TemporaryDirectory is deleted when garbage collected 1487 dir = tempfile.mkdtemp() 1488 try: 1489 d = self.do_create(dir=dir) 1490 name = d.name 1491 del d # Rely on refcounting to invoke __del__ 1492 self.assertFalse(os.path.exists(name), 1493 "TemporaryDirectory %s exists after __del__" % name) 1494 finally: 1495 os.rmdir(dir) 1496 1497 @support.cpython_only 1498 def test_del_on_collection_ignore_errors(self): 1499 """Test that ignoring errors works when TemporaryDirectory is gced.""" 1500 with tempfile.TemporaryDirectory() as working_dir: 1501 temp_dir = self.do_create( 1502 dir=working_dir, ignore_cleanup_errors=True) 1503 temp_path = pathlib.Path(temp_dir.name) 1504 self.assertTrue(temp_path.exists(), 1505 f"TemporaryDirectory {temp_path!s} does not exist") 1506 with open(temp_path / "a_file.txt", "w+t") as open_file: 1507 open_file.write("Hello world!\n") 1508 del temp_dir 1509 self.assertEqual(len(list(temp_path.glob("*"))), 1510 int(sys.platform.startswith("win")), 1511 "Unexpected number of files in " 1512 f"TemporaryDirectory {temp_path!s}") 1513 self.assertEqual( 1514 temp_path.exists(), 1515 sys.platform.startswith("win"), 1516 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1517 1518 def test_del_on_shutdown(self): 1519 # A TemporaryDirectory may be cleaned up during shutdown 1520 with self.do_create() as dir: 1521 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1522 code = """if True: 1523 import builtins 1524 import os 1525 import shutil 1526 import sys 1527 import tempfile 1528 import warnings 1529 1530 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1531 sys.stdout.buffer.write(tmp.name.encode()) 1532 1533 tmp2 = os.path.join(tmp.name, 'test_dir') 1534 os.mkdir(tmp2) 1535 with open(os.path.join(tmp2, "test0.txt"), "w") as f: 1536 f.write("Hello world!") 1537 1538 {mod}.tmp = tmp 1539 1540 warnings.filterwarnings("always", category=ResourceWarning) 1541 """.format(dir=dir, mod=mod) 1542 rc, out, err = script_helper.assert_python_ok("-c", code) 1543 tmp_name = out.decode().strip() 1544 self.assertFalse(os.path.exists(tmp_name), 1545 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1546 err = err.decode('utf-8', 'backslashreplace') 1547 self.assertNotIn("Exception ", err) 1548 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1549 1550 def test_del_on_shutdown_ignore_errors(self): 1551 """Test ignoring errors works when a tempdir is gc'ed on shutdown.""" 1552 with tempfile.TemporaryDirectory() as working_dir: 1553 code = """if True: 1554 import pathlib 1555 import sys 1556 import tempfile 1557 import warnings 1558 1559 temp_dir = tempfile.TemporaryDirectory( 1560 dir={working_dir!r}, ignore_cleanup_errors=True) 1561 sys.stdout.buffer.write(temp_dir.name.encode()) 1562 1563 temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir" 1564 temp_dir_2.mkdir() 1565 with open(temp_dir_2 / "test0.txt", "w") as test_file: 1566 test_file.write("Hello world!") 1567 open_file = open(temp_dir_2 / "open_file.txt", "w") 1568 open_file.write("Hello world!") 1569 1570 warnings.filterwarnings("always", category=ResourceWarning) 1571 """.format(working_dir=working_dir) 1572 __, out, err = script_helper.assert_python_ok("-c", code) 1573 temp_path = pathlib.Path(out.decode().strip()) 1574 self.assertEqual(len(list(temp_path.glob("*"))), 1575 int(sys.platform.startswith("win")), 1576 "Unexpected number of files in " 1577 f"TemporaryDirectory {temp_path!s}") 1578 self.assertEqual( 1579 temp_path.exists(), 1580 sys.platform.startswith("win"), 1581 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1582 err = err.decode('utf-8', 'backslashreplace') 1583 self.assertNotIn("Exception", err) 1584 self.assertNotIn("Error", err) 1585 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1586 1587 def test_exit_on_shutdown(self): 1588 # Issue #22427 1589 with self.do_create() as dir: 1590 code = """if True: 1591 import sys 1592 import tempfile 1593 import warnings 1594 1595 def generator(): 1596 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1597 yield tmp 1598 g = generator() 1599 sys.stdout.buffer.write(next(g).encode()) 1600 1601 warnings.filterwarnings("always", category=ResourceWarning) 1602 """.format(dir=dir) 1603 rc, out, err = script_helper.assert_python_ok("-c", code) 1604 tmp_name = out.decode().strip() 1605 self.assertFalse(os.path.exists(tmp_name), 1606 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1607 err = err.decode('utf-8', 'backslashreplace') 1608 self.assertNotIn("Exception ", err) 1609 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1610 1611 def test_warnings_on_cleanup(self): 1612 # ResourceWarning will be triggered by __del__ 1613 with self.do_create() as dir: 1614 d = self.do_create(dir=dir, recurse=3) 1615 name = d.name 1616 1617 # Check for the resource warning 1618 with warnings_helper.check_warnings(('Implicitly', 1619 ResourceWarning), 1620 quiet=False): 1621 warnings.filterwarnings("always", category=ResourceWarning) 1622 del d 1623 support.gc_collect() 1624 self.assertFalse(os.path.exists(name), 1625 "TemporaryDirectory %s exists after __del__" % name) 1626 1627 def test_multiple_close(self): 1628 # Can be cleaned-up many times without error 1629 d = self.do_create() 1630 d.cleanup() 1631 d.cleanup() 1632 d.cleanup() 1633 1634 def test_context_manager(self): 1635 # Can be used as a context manager 1636 d = self.do_create() 1637 with d as name: 1638 self.assertTrue(os.path.exists(name)) 1639 self.assertEqual(name, d.name) 1640 self.assertFalse(os.path.exists(name)) 1641 1642 def test_modes(self): 1643 for mode in range(8): 1644 mode <<= 6 1645 with self.subTest(mode=format(mode, '03o')): 1646 d = self.do_create(recurse=3, dirs=2, files=2) 1647 with d: 1648 # Change files and directories mode recursively. 1649 for root, dirs, files in os.walk(d.name, topdown=False): 1650 for name in files: 1651 os.chmod(os.path.join(root, name), mode) 1652 os.chmod(root, mode) 1653 d.cleanup() 1654 self.assertFalse(os.path.exists(d.name)) 1655 1656 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags') 1657 def test_flags(self): 1658 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1659 d = self.do_create(recurse=3, dirs=2, files=2) 1660 with d: 1661 # Change files and directories flags recursively. 1662 for root, dirs, files in os.walk(d.name, topdown=False): 1663 for name in files: 1664 os.chflags(os.path.join(root, name), flags) 1665 os.chflags(root, flags) 1666 d.cleanup() 1667 self.assertFalse(os.path.exists(d.name)) 1668 1669 1670if __name__ == "__main__": 1671 unittest.main() 1672