1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import pathlib
7import signal
8import sys
9import re
10import warnings
11import contextlib
12import stat
13import weakref
14from unittest import mock
15
16import unittest
17from test import support
18from test.support import script_helper
19
20
# Capability flags consulted by the skipUnless decorators further down.
has_spawnl = hasattr(os, 'spawnl')
has_textmode = tempfile._bin_openflags != tempfile._text_openflags

# How many files/directories the "many" tests create.  This may need
# tweaking on systems with a low limit on simultaneously open files
# (see ulimit -n); OpenBSD gets a smaller value.
TEST_FILES = 48 if sys.platform.startswith('openbsd') else 100
30
31# This is organized as one test for each chunk of code in tempfile.py,
32# in order of their appearance in the file.  Testing which requires
33# threads is not done here.
34
class TestLowLevelInternals(unittest.TestCase):
    """Checks for tempfile._infer_return_type(), which picks str or bytes."""

    def test_infer_return_type_singles(self):
        # A single argument: its own type is expected back; None gives str.
        infer = tempfile._infer_return_type
        for expected, arg in ((str, ''), (bytes, b''), (str, None)):
            self.assertIs(expected, infer(arg))

    def test_infer_return_type_multiples(self):
        # Arguments of one kind agree; mixing str and bytes must raise.
        infer = tempfile._infer_return_type
        self.assertIs(str, infer('', ''))
        self.assertIs(bytes, infer(b'', b''))
        for mixed in (('', b''), (b'', '')):
            self.assertRaises(TypeError, infer, *mixed)

    def test_infer_return_type_multiples_and_none(self):
        # None may accompany either kind (all-None gives str), but it must
        # not mask a str/bytes mix.
        infer = tempfile._infer_return_type
        agreeing = (
            (str, (None, '')),
            (str, ('', None)),
            (str, (None, None)),
            (bytes, (b'', None)),
            (bytes, (None, b'')),
        )
        for expected, args in agreeing:
            self.assertIs(expected, infer(*args))
        for mixed in (('', None, b''), (b'', None, '')):
            self.assertRaises(TypeError, infer, *mixed)

    def test_infer_return_type_pathlib(self):
        # A pathlib.Path argument is expected to yield str.
        root = pathlib.Path('/')
        self.assertIs(str, tempfile._infer_return_type(root))
62
63
64# Common functionality.
65
class BaseTestCase(unittest.TestCase):
    """Shared fixture: mktemp warning suppression plus a generated-name checker."""

    # The random portion of a generated name: exactly eight characters
    # taken from lowercase letters, digits, '_' and '-'.
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # The warnings context is entered by hand here and left again in
        # tearDown(), so it spans the whole test method.
        manager = support.check_warnings()
        self._warnings_manager = manager
        manager.__enter__()
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def tearDown(self):
        manager = self._warnings_manager
        manager.__exit__(None, None, None)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* is a well-formed temporary name.

        *name* must live in directory *dir*, start with *pre*, end with
        *suf*, have the same string type as those parts, and carry a random
        middle section matching str_check / b_check.
        """
        directory, basename = os.path.split(name)
        head = basename[:len(pre)]
        tail = basename[len(basename)-len(suf):]

        # Type agreement between the result and every supplied part.
        if dir is not None:
            if type(dir) is str or isinstance(dir, os.PathLike):
                wanted = str
            else:
                wanted = bytes
            self.assertIs(type(name), wanted, "unexpected return type")
        for part in (pre, suf):
            if part is not None:
                wanted = str if type(part) is str else bytes
                self.assertIs(type(name), wanted, "unexpected return type")
        if (dir, pre, suf) == (None, None, None):
            self.assertIs(type(name), str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(directory), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(head, pre,
                         "file %r does not begin with %r" % (basename, pre))
        self.assertEqual(tail, suf,
                         "file %r does not end with %r" % (basename, suf))

        # What remains between prefix and suffix is the random part.
        middle = basename[len(pre):len(basename)-len(suf)]
        if isinstance(middle, str):
            check = self.str_check
        else:
            check = self.b_check
        self.assertTrue(check.match(middle),
                        "random characters %r do not match %r"
                        % (middle, check.pattern))
115
116
class TestExports(BaseTestCase):
    """Guard the public namespace of the tempfile module."""

    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        expected = {
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        }

        # Every module attribute that does not start with an underscore
        # counts as public and must be in the expected set.  Sorted so a
        # failure message is stable from run to run.
        unexpected = sorted(
            name for name in vars(tempfile)
            if not name.startswith('_') and name not in expected
        )
        self.assertEqual(unexpected, [],
                         "unexpected keys: %s" % unexpected)
145
146
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_six_char_str(self):
        # _RandomNameSequence returns a string of the form nameCheck()
        # accepts (eight characters; the method name is historical).
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)
        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol
        count = 0
        for s in self.r:
            count += 1
            if count == 20:
                break
        self.assertEqual(count, 20)

    # The check above used to be named supports_iter(), which unittest
    # never collected (no "test" prefix).  The old name stays as an alias.
    supports_iter = test_supports_iter

    @unittest.skipUnless(hasattr(os, 'fork'),
        "os.fork is required for this test")
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        open_fds = [read_fd, write_fd]
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # child process
                status = 1
                try:
                    os.close(read_fd)
                    os.write(write_fd, next(self.r).encode("ascii"))
                    os.close(write_fd)
                    status = 0
                finally:
                    # bypass the normal exit handlers- leave those to
                    # the parent.  Exiting from "finally" also guarantees
                    # that a failing child never returns into the test
                    # runner.
                    os._exit(status)

            # parent process
            # Drop our copy of the write end first, so the read below sees
            # EOF instead of blocking forever if the child dies early.
            os.close(write_fd)
            open_fds.remove(write_fd)
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                # best effort to ensure the process can't bleed out
                # via any bugs above
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    pass

                # Read the process exit status to avoid zombie process
                os.waitpid(pid, 0)

            for fd in open_fds:
                os.close(fd)
        self.assertEqual(len(child_value), len(parent_value),
                         "child process did not report a name")
        self.assertNotEqual(child_value, parent_value)
216
217
218
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # The candidate list has at least one entry and holds only str.
        candidates = tempfile._candidate_tempdir_list()

        self.assertFalse(len(candidates) == 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # The candidate list contains the directories named by the
        # environment, and the current directory.
        env_names = ('TMPDIR', 'TEMP', 'TMP')

        with support.EnvironmentVarGuard() as env:
            # Give every variable that is unset or empty a value.
            for envname in env_names:
                if not os.getenv(envname):
                    env[envname] = os.path.abspath(envname)

            candidates = tempfile._candidate_tempdir_list()

            for envname in env_names:
                value = os.getenv(envname)
                if not value:
                    raise ValueError
                self.assertIn(value, candidates)

            try:
                here = os.getcwd()
            except (AttributeError, OSError):
                here = os.curdir
            self.assertIn(here, candidates)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
257
258
259# We test _get_default_tempdir some more by testing gettempdir.
260
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # _get_default_tempdir() must leave the probed directory empty,
        # whether its probe succeeds or io.open() / write() fail.

        def fail_with_oserror(*args, **kwargs):
            raise OSError()

        def open_with_broken_write(*args, **kwargs):
            # real_open is bound by the swap_attr() "as" clause below.
            handle = real_open(*args, **kwargs)
            handle.write = fail_with_oserror
            return handle

        # use a private empty directory and force _get_default_tempdir()
        # to consider nothing else
        with tempfile.TemporaryDirectory() as scratch, \
             support.swap_attr(tempfile, "_candidate_tempdir_list",
                               lambda: [scratch]):
            # verify our directory is empty after _get_default_tempdir()
            tempfile._get_default_tempdir()
            self.assertEqual(os.listdir(scratch), [])

            # test again with failing io.open()
            with support.swap_attr(io, "open", fail_with_oserror):
                self.assertRaises(FileNotFoundError,
                                  tempfile._get_default_tempdir)
                self.assertEqual(os.listdir(scratch), [])

            # test again with failing write()
            with support.swap_attr(io, "open",
                                   open_with_broken_write) as real_open:
                self.assertRaises(FileNotFoundError,
                                  tempfile._get_default_tempdir)
                self.assertEqual(os.listdir(scratch), [])
296
297
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # The returned object is a _RandomNameSequence instance.
        names = tempfile._get_candidate_names()
        self.assertIsInstance(names, tempfile._RandomNameSequence)

    def test_same_thing(self):
        # Repeated calls hand back the very same object.
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()

        self.assertIs(first, second)
312
313
@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Run the with-body with tempfile.tempdir set to a fresh directory.

    The directory comes from mkdtemp() and is removed with support.rmtree()
    when the context exits, after tempfile.tempdir has been restored.
    """
    scratch = tempfile.mkdtemp()
    try:
        redirect = support.swap_attr(tempfile, 'tempdir', scratch)
        with redirect:
            yield
    finally:
        support.rmtree(scratch)
322
323
def _mock_candidate_names(*names):
    """Return a context manager that replaces tempfile._get_candidate_names.

    While active, each call to the replacement returns a new iterator
    over *names*, in the order given.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
328
329
class TestBadTempdir:
    """Mixin: self.make_temp() must fail when tempfile.tempdir is unusable.

    The test case this is mixed into provides make_temp().
    """

    def test_read_only_directory(self):
        # Creation in a directory without write permission must raise
        # PermissionError and leave nothing behind.
        write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
        with _inside_empty_temp_dir():
            saved_mode = os.stat(tempfile.tempdir).st_mode
            os.chmod(tempfile.tempdir, saved_mode & ~write_bits)
            try:
                if os.access(tempfile.tempdir, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                self.assertRaises(PermissionError, self.make_temp)
                self.assertEqual(os.listdir(tempfile.tempdir), [])
            finally:
                # Restore the mode so the directory can be removed again.
                os.chmod(tempfile.tempdir, saved_mode)

    def test_nonexisting_directory(self):
        # A tempdir that does not exist must raise FileNotFoundError.
        with _inside_empty_temp_dir():
            missing = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', missing):
                self.assertRaises(FileNotFoundError, self.make_temp)

    def test_non_directory(self):
        # A tempdir that is really a regular file must be rejected.
        with _inside_empty_temp_dir():
            plain_file = os.path.join(tempfile.tempdir, 'file')
            with open(plain_file, 'wb'):
                pass
            with support.swap_attr(tempfile, 'tempdir', plain_file):
                self.assertRaises((NotADirectoryError, FileNotFoundError),
                                  self.make_temp)
360
361
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        """One file created through tempfile._mkstemp_inner().

        The descriptor is closed and the file unlinked in __del__, so the
        file lives exactly as long as this object does.
        """
        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        # NOTE(review): presumably held on the class so __del__ can still
        # reach them late in interpreter shutdown -- confirm.
        _close = os.close
        _unlink = os.unlink

        def __init__(self, dir, pre, suf, bin):
            # A true *bin* selects the binary open flags, otherwise text.
            if bin: flags = self._bflags
            else:   flags = self._tflags

            output_type = tempfile._infer_return_type(dir, pre, suf)
            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)

        def write(self, str):
            # Despite the parameter name, the callers in this class pass
            # bytes; the data goes straight to the descriptor.
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        """Create a mkstemped file, check its name and return the object.

        Any of *dir*, *pre*, *suf* left as None is replaced by a default of
        the inferred type (str or bytes) before the file is created.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        file = self.mkstemped(dir, pre, suf, bin)

        self.nameCheck(file.name, dir, pre, suf)
        return file

    def test_basic(self):
        # _mkstemp_inner can create files
        # The mkstemped objects are not kept; mkstemped.__del__ removes
        # each file once its object is collected.
        self.do_create().write(b"blat")
        self.do_create(pre="a").write(b"blat")
        self.do_create(suf="b").write(b"blat")
        self.do_create(pre="a", suf="b").write(b"blat")
        self.do_create(pre="aa", suf=".txt").write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        self.do_create(dir=dir_b, suf=b"").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir="", suf=b"").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre="").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # The list keeps every mkstemped object referenced, so all
        # TEST_FILES files exist at the same time.
        extant = list(range(TEST_FILES))
        for i in extant:
            extant[i] = self.do_create(pre="aa")

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir).write(b"blat")
            self.do_create(dir=pathlib.Path(dir)).write(b"blat")
        finally:
            # rmdir only succeeds on an empty directory, so this relies on
            # the files above having been removed by mkstemped.__del__.
            os.rmdir(dir)

    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            # (0o600 >> 6) * (1 + 8 + 64) is 0o666.
            user = expected >> 6
            expected = user * (1 + 8 + 64)
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        # Third argument handed to the helper script; presumably its
        # verbose/quiet switch (the script is not part of this file).
        if support.verbose:
            v="v"
        else:
            v="q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
                              "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester
        else:
            decorated = sys.executable

        # With P_WAIT the return value is the child's exit status, or
        # negative when it was killed by a signal (see the messages below).
        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                    "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        f.write(b"blat\x1a")
        f.write(b"extra\n")
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        # Creation hook called by the TestBadTempdir tests and by the
        # collision tests below; returns the (fd, name) pair.
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.gettempprefix(),
                                       '',
                                       tempfile._bin_openflags,
                                       str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        # The mocked sequence offers 'aaa' again to the second call, which
        # must skip it and settle on 'bbb'.
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            (fd1, name1) = self.make_temp()
            os.close(fd1)
            self.assertTrue(name1.endswith('aaa'))

            (fd2, name2) = self.make_temp()
            os.close(fd2)
            self.assertTrue(name2.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('aaa'))

            (fd, name) = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
534
535
class TestGetTempPrefix(BaseTestCase):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty str prefix, and gettempprefixb
        # a nonempty bytes prefix.
        for getter, kind in ((tempfile.gettempprefix, str),
                             (tempfile.gettempprefixb, bytes)):
            prefix = getter()

            self.assertIsInstance(prefix, kind)
            self.assertGreater(len(prefix), 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        basename = tempfile.gettempprefix() + "xxxxxx.xxx"
        scratch = tempfile.mkdtemp(prefix="")
        try:
            path = os.path.join(scratch, basename)
            os.close(os.open(path, os.O_RDWR | os.O_CREAT))
            os.unlink(path)
        finally:
            os.rmdir(scratch)
566
567
class TestGetTempDir(BaseTestCase):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir and gettempdirb return a directory which exists

        for path in (tempfile.gettempdir(), tempfile.gettempdirb()):
            absolute_or_curdir = os.path.isabs(path) or path == os.curdir
            self.assertTrue(absolute_or_curdir,
                            "%r is not an absolute path" % path)
            self.assertTrue(os.path.isdir(path),
                            "%r is not a directory" % path)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.
        with tempfile.NamedTemporaryFile() as probe:
            probe.write(b"blat")

    def test_same_thing(self):
        # gettempdir always returns the same object; gettempdirb returns
        # the same path in a different type.
        first = tempfile.gettempdir()
        second = tempfile.gettempdir()
        as_bytes = tempfile.gettempdirb()

        self.assertIs(first, second)
        self.assertNotEqual(type(first), type(as_bytes))
        self.assertEqual(first, os.fsdecode(as_bytes))

    def test_case_sensitive(self):
        # gettempdir should not flatten its case
        # even on a case-insensitive file system
        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
        # Clear the module-level tempdir so gettempdir() has to work the
        # directory out again; the old value is put back afterwards.
        saved_tempdir = tempfile.tempdir
        tempfile.tempdir = None
        try:
            with support.EnvironmentVarGuard() as env:
                # Fake the first env var which is checked as a candidate
                env["TMPDIR"] = case_sensitive_tempdir
                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
        finally:
            tempfile.tempdir = saved_tempdir
            support.rmdir(case_sensitive_tempdir)
612
613
class TestMkstemp(BaseTestCase):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a file with mkstemp(), check its name, then remove it.

        Any of *dir*, *pre*, *suf* left as None is replaced by a default of
        the inferred type (str or bytes).  The descriptor is closed and the
        file unlinked whether or not the checks pass.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)

        # Every check sits inside the try block, so a failed assertion can
        # no longer leak the descriptor or leave the file behind.
        try:
            (ndir, nbase) = os.path.split(name)
            adir = os.path.abspath(dir)
            # %r rather than '%s': the paths may be bytes, and formatting
            # bytes with %s goes through str(bytes).
            self.assertEqual(adir, ndir,
                "Directory %r incorrectly returned as %r" % (adir, ndir))
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_basic_with_bytes_names(self):
        # mkstemp can create files when given name parts all
        # specified as bytes.
        d = tempfile.gettempdirb()
        self.do_create(dir=d, suf=b"")
        self.do_create(dir=d, pre=b"a")
        self.do_create(dir=d, suf=b"b")
        self.do_create(dir=d, pre=b"a", suf=b"b")
        self.do_create(dir=d, pre=b"aa", suf=b".txt")
        self.do_create(dir=b".")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir=".", pre=b"aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre="aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre=b"aa", suf=".txt")

    def test_choose_directory(self):
        # mkstemp can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
            self.do_create(dir=pathlib.Path(dir))
        finally:
            os.rmdir(dir)
675
676
class TestMkdtemp(TestBadTempdir, BaseTestCase):
    """Test mkdtemp()."""

    def make_temp(self):
        # Creation hook called by the TestBadTempdir tests.
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a directory with mkdtemp(), check its name and return it.

        Any of *dir*, *pre*, *suf* left as None is replaced by a default of
        the inferred type (str or bytes).  The caller removes the returned
        directory; if the name check fails it is removed here instead.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)

        try:
            self.nameCheck(name, dir, pre, suf)
        except BaseException:
            os.rmdir(name)
            raise
        return name

    def test_basic(self):
        # mkdtemp can create directories
        variants = (
            {},
            {'pre': "a"},
            {'suf': "b"},
            {'pre': "a", 'suf': "b"},
            {'pre': "aa", 'suf': ".txt"},
        )
        for kwargs in variants:
            os.rmdir(self.do_create(**kwargs))

    def test_basic_with_bytes_names(self):
        # mkdtemp can create directories when given all binary parts
        d = tempfile.gettempdirb()
        all_bytes = (
            {},
            {'pre': b"a"},
            {'suf': b"b"},
            {'pre': b"a", 'suf': b"b"},
            {'pre': b"aa", 'suf': b".txt"},
        )
        for kwargs in all_bytes:
            os.rmdir(self.do_create(dir=d, **kwargs))

        # Mixing str and bytes parts must be refused.
        mixed = (
            {'dir': d, 'pre': "aa", 'suf': b".txt"},
            {'dir': d, 'pre': b"aa", 'suf': ".txt"},
            {'dir': "", 'pre': b"aa", 'suf': b".txt"},
        )
        for kwargs in mixed:
            with self.assertRaises(TypeError):
                os.rmdir(self.do_create(**kwargs))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        created = []
        try:
            for _ in range(TEST_FILES):
                created.append(self.do_create(pre="aa"))
        finally:
            # Remove whatever was created, even after a failure part-way.
            for path in created:
                os.rmdir(path)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        parent = tempfile.mkdtemp()
        try:
            for where in (parent, pathlib.Path(parent)):
                os.rmdir(self.do_create(dir=where))
        finally:
            os.rmdir(parent)

    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        path = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(path).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            expected = 0o700
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                user_bits = expected >> 6
                expected = user_bits * (1 + 8 + 64)
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(path)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            taken = tempfile.NamedTemporaryFile(delete=False)
            taken.close()
            self.assertTrue(taken.name.endswith('aaa'))
            fallback = tempfile.mkdtemp()
            self.assertTrue(fallback.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            taken = tempfile.mkdtemp()
            self.assertTrue(taken.endswith('aaa'))
            fallback = tempfile.mkdtemp()
            self.assertTrue(fallback.endswith('bbb'))
783
784
class TestMktemp(BaseTestCase):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        super().setUp()

    def tearDown(self):
        # rmdir only succeeds on an empty directory, so every mktemped
        # object must already have removed its file by now.
        if self.dir:
            os.rmdir(self.dir)
            self.dir = None
        super().tearDown()

    class mktemped:
        """A file created at a name chosen by mktemp(); unlinked in __del__."""
        _unlink = os.unlink
        _bflags = tempfile._bin_openflags

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            descriptor = os.open(self.name, self._bflags, 0o600)
            os.close(descriptor)

        def __del__(self):
            self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        """Create a mktemped file in the private directory and check its name."""
        created = self.mktemped(self.dir, pre, suf)

        self.nameCheck(created.name, self.dir, pre, suf)
        return created

    def test_basic(self):
        # mktemp can choose usable file names
        variants = (
            {},
            {'pre': "a"},
            {'suf': "b"},
            {'pre': "a", 'suf': "b"},
            {'pre': "aa", 'suf': ".txt"},
        )
        for kwargs in variants:
            self.do_create(**kwargs)

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        # Holding the objects in a list keeps all the files in existence
        # until the test method returns.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
832
833##     def test_warning(self):
834##         # mktemp issues a warning when used
835##         warnings.filterwarnings("error",
836##                                 category=RuntimeWarning,
837##                                 message="mktemp")
838##         self.assertRaises(RuntimeWarning,
839##                           tempfile.mktemp, dir=self.dir)
840
841
842# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
843
844
class TestNamedTemporaryFile(BaseTestCase):
    """Test NamedTemporaryFile()."""

    def do_create(self, dir=None, pre="", suf="", delete=True):
        # Create a NamedTemporaryFile (in the default temporary directory
        # unless *dir* is given), run nameCheck() on its name and return it.
        target_dir = tempfile.gettempdir() if dir is None else dir
        tmp = tempfile.NamedTemporaryFile(dir=target_dir, prefix=pre,
                                          suffix=suf, delete=delete)
        self.nameCheck(tmp.name, target_dir, pre, suf)
        return tmp

    def test_basic(self):
        # NamedTemporaryFile can create files
        # Each pair is (prefix, suffix); "" is do_create()'s default.
        combos = (
            ("", ""),
            ("a", ""),
            ("", "b"),
            ("a", "b"),
            ("aa", ".txt"),
        )
        for pre, suf in combos:
            self.do_create(pre=pre, suf=suf)

    def test_method_lookup(self):
        # Issue #18879: Looking up a temporary file method should keep it
        # alive long enough.
        tmp = self.do_create()
        ref = weakref.ref(tmp)
        first_write = tmp.write
        second_write = tmp.write
        del tmp
        first_write(b'foo')
        del first_write
        second_write(b'bar')
        del second_write
        if support.check_impl_detail(cpython=True):
            # No reference cycle was created.
            self.assertIsNone(ref())

    def test_iter(self):
        # Issue #23700: getting iterator from a temporary file should keep
        # it alive as long as it's being iterated over
        expected = [b'spam\n', b'eggs\n', b'beans\n']

        def make_file():
            tmp = tempfile.NamedTemporaryFile(mode='w+b')
            tmp.write(b''.join(expected))
            tmp.seek(0)
            return tmp

        # The file object is referenced only by the iteration itself.
        for index, line in enumerate(make_file()):
            self.assertEqual(line, expected[index])
        self.assertEqual(index, len(expected) - 1)

    def test_creates_named(self):
        # NamedTemporaryFile creates files with names
        tmp = tempfile.NamedTemporaryFile()
        path = tmp.name
        self.assertTrue(os.path.exists(path),
                        "NamedTemporaryFile %s does not exist" % path)

    def test_del_on_close(self):
        # A NamedTemporaryFile is deleted when closed
        parent = tempfile.mkdtemp()
        try:
            with tempfile.NamedTemporaryFile(dir=parent) as tmp:
                tmp.write(b'blat')
            path = tmp.name
            self.assertFalse(os.path.exists(path),
                        "NamedTemporaryFile %s exists after close" % path)
        finally:
            os.rmdir(parent)

    def test_dis_del_on_close(self):
        # Tests that delete-on-close can be disabled
        parent = tempfile.mkdtemp()
        leftover = None
        try:
            tmp = tempfile.NamedTemporaryFile(dir=parent, delete=False)
            leftover = tmp.name
            tmp.write(b'blat')
            tmp.close()
            self.assertTrue(os.path.exists(tmp.name),
                        "NamedTemporaryFile %s missing after close" % tmp.name)
        finally:
            # The file survives close(), so it has to be removed by hand
            # before the directory can go.
            if leftover is not None:
                os.unlink(leftover)
            os.rmdir(parent)

    def test_multiple_close(self):
        # A NamedTemporaryFile can be closed many times without error
        tmp = tempfile.NamedTemporaryFile()
        tmp.write(b'abc\n')
        for _ in range(3):
            tmp.close()

    def test_context_manager(self):
        # A NamedTemporaryFile can be used as a context manager
        with tempfile.NamedTemporaryFile() as tmp:
            self.assertTrue(os.path.exists(tmp.name))
        self.assertFalse(os.path.exists(tmp.name))
        # Entering the already closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with tmp:
                pass

    def test_no_leak_fd(self):
        # Issue #21058: don't leak file descriptor when io.open() fails
        closed = []
        real_close = os.close

        def recording_close(fd):
            closed.append(fd)
            real_close(fd)

        with mock.patch('os.close', side_effect=recording_close), \
             mock.patch('io.open', side_effect=ValueError):
            self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
            self.assertEqual(len(closed), 1)

    def test_bad_mode(self):
        # An invalid mode raises and leaves nothing behind in the directory.
        parent = tempfile.mkdtemp()
        self.addCleanup(support.rmtree, parent)
        for bad_mode, expected in (('wr', ValueError), (2, TypeError)):
            with self.assertRaises(expected):
                tempfile.NamedTemporaryFile(mode=bad_mode, dir=parent)
        self.assertEqual(os.listdir(parent), [])

    # How to test the mode and bufsize parameters?
969
class TestSpooledTemporaryFile(BaseTestCase):
    """Test SpooledTemporaryFile()."""

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        # Build a SpooledTemporaryFile, falling back to the default
        # temporary directory when no *dir* is supplied.
        target_dir = tempfile.gettempdir() if dir is None else dir
        return tempfile.SpooledTemporaryFile(max_size=max_size,
                                             dir=target_dir,
                                             prefix=pre, suffix=suf)

    def test_basic(self):
        # SpooledTemporaryFile can create files
        for options in ({}, {"max_size": 100, "pre": "a", "suf": ".txt"}):
            spool = self.do_create(**options)
            self.assertFalse(spool._rolled)

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        parent = tempfile.mkdtemp()
        try:
            spool = tempfile.SpooledTemporaryFile(max_size=10, dir=parent)
            self.assertFalse(spool._rolled)
            spool.write(b'blat ' * 5)
            self.assertTrue(spool._rolled)
            rolled_name = spool.name
            spool.close()
            # The name is only checked on disk when it is a string.
            still_there = (isinstance(rolled_name, str)
                           and os.path.exists(rolled_name))
            self.assertFalse(still_there,
                        "SpooledTemporaryFile %s exists after close" % rolled_name)
        finally:
            os.rmdir(parent)

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be written to multiple times within
        # the max_size
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        for _ in range(5):
            spool.seek(0, 0)
            spool.write(b'x' * 20)
        self.assertFalse(spool._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        spool.write(b'x' * 20)
        self.assertFalse(spool._rolled)
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        # One byte past max_size triggers the rollover.
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        spool = self.do_create()
        spool.writelines((b'x', b'y', b'z'))
        self.assertEqual(spool.seek(0), 0)
        self.assertEqual(spool.read(), b'xyz')

    def test_writelines_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spool = self.do_create(max_size=35)
        spool.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.seek(100, 0), 100)
        # Seeking alone does not roll over; the write that follows does.
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        self.assertTrue(spool.fileno() > 0)
        self.assertTrue(spool._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spool = tempfile.SpooledTemporaryFile()
        spool.write(b'abc\n')
        self.assertFalse(spool._rolled)
        for _ in range(3):
            spool.close()

    def test_multiple_close_after_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spool = tempfile.SpooledTemporaryFile(max_size=1)
        spool.write(b'abc\n')
        self.assertTrue(spool._rolled)
        for _ in range(3):
            spool.close()

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        spool = self.do_create(max_size=30)
        do_read, do_write, do_seek = spool.read, spool.write, spool.seek

        do_write(b"a" * 35)
        do_write(b"b" * 35)
        do_seek(0, 0)
        self.assertEqual(do_read(70), b'a'*35 + b'b'*35)

    def test_properties(self):
        # Attributes reported by a binary spool before and after rollover.
        text_only = ('newlines', 'encoding', 'errors')

        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        for attr in text_only:
            with self.assertRaises(AttributeError):
                getattr(spool, attr)

        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'rb+')
        self.assertIsNotNone(spool.name)
        for attr in text_only:
            with self.assertRaises(AttributeError):
                getattr(spool, attr)

    def test_text_mode(self):
        # Creating a SpooledTemporaryFile with a text mode should produce
        # a file object reading and writing (Unicode) text strings.
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              encoding="utf-8")
        spool.write("abc\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\n")
        spool.write("def\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\n")
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNone(spool.name)
        self.assertEqual(spool.newlines, os.linesep)
        self.assertEqual(spool.encoding, "utf-8")
        self.assertEqual(spool.errors, "strict")

        spool.write("xyzzy\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\nxyzzy\n")
        # Check that Ctrl+Z doesn't truncate the file
        spool.write("foo\x1abar\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNotNone(spool.name)
        self.assertEqual(spool.newlines, os.linesep)
        self.assertEqual(spool.encoding, "utf-8")
        self.assertEqual(spool.errors, "strict")

    def test_text_newline_and_encoding(self):
        # newline, encoding and errors are reported the same way before and
        # after the spool rolls over.
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              newline='', encoding='utf-8',
                                              errors='ignore')
        spool.write("\u039B\r\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "\u039B\r\n")
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNone(spool.name)
        self.assertIsNotNone(spool.newlines)
        self.assertEqual(spool.encoding, "utf-8")
        self.assertEqual(spool.errors, "ignore")

        spool.write("\u039C" * 10 + "\r\n")
        spool.write("\u039D" * 20)
        spool.seek(0)
        self.assertEqual(spool.read(),
                "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNotNone(spool.name)
        self.assertIsNotNone(spool.newlines)
        self.assertEqual(spool.encoding, 'utf-8')
        self.assertEqual(spool.errors, 'ignore')

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spool:
            self.assertFalse(spool._rolled)
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Entering the already closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spool:
            self.assertFalse(spool._rolled)
            spool.write(b'abc\n')
            spool.flush()
            self.assertTrue(spool._rolled)
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Entering the already closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        spool = tempfile.SpooledTemporaryFile(max_size=1)
        spool.write(b'abc\n')
        spool.flush()
        self.assertTrue(spool._rolled)
        with spool:
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Entering the already closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.seek(0)
        spool.truncate()
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.truncate(4)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.truncate(20)
        self.assertTrue(spool._rolled)
        on_disk_size = os.fstat(spool.fileno()).st_size
        self.assertEqual(on_disk_size, 20)
1233
1234
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
    # Only defined when TemporaryFile is a separate implementation rather
    # than an alias of NamedTemporaryFile.

    class TestTemporaryFile(BaseTestCase):
        """Test TemporaryFile()."""

        def test_basic(self):
            # TemporaryFile can create files
            # No point in testing the name params - the file has no name.
            tempfile.TemporaryFile()

        def test_has_no_name(self):
            # TemporaryFile creates files with no names (on this system)
            parent = tempfile.mkdtemp()
            tmp = tempfile.TemporaryFile(dir=parent)
            tmp.write(b'blat')

            # Sneaky: because this file has no name, it should not prevent
            # us from removing the directory it was created in.
            try:
                os.rmdir(parent)
            except BaseException:
                # cleanup: close the file, retry the removal, then let the
                # original error propagate
                tmp.close()
                os.rmdir(parent)
                raise

        def test_multiple_close(self):
            # A TemporaryFile can be closed many times without error
            tmp = tempfile.TemporaryFile()
            tmp.write(b'abc\n')
            for _ in range(3):
                tmp.close()

        # How to test the mode and bufsize parameters?
        def test_mode_and_encoding(self):
            # Each case is (payload, positional args, keyword args); the
            # payload must read back unchanged after being written.
            cases = (
                (b"1234", ("w+b",), {}),
                ("abdc\n", ("w+",), {}),
                ("\u039B", ("w+",), {"encoding": "utf-16"}),
                ("foo\r\n", ("w+",), {"newline": ""}),
            )
            for payload, args, kwargs in cases:
                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                    fileobj.write(payload)
                    fileobj.seek(0)
                    self.assertEqual(payload, fileobj.read())

        def test_no_leak_fd(self):
            # Issue #21058: don't leak file descriptor when io.open() fails
            closed = []
            real_close = os.close

            def recording_close(fd):
                closed.append(fd)
                real_close(fd)

            with mock.patch('os.close', side_effect=recording_close), \
                 mock.patch('io.open', side_effect=ValueError):
                self.assertRaises(ValueError, tempfile.TemporaryFile)
                self.assertEqual(len(closed), 1)
1295
1296
1297
1298# Helper for test_del_on_shutdown
class NulledModules:
    """Context manager that blanks out the namespaces of the given modules.

    On entry every existing entry of each module's ``__dict__`` is set to
    None; on exit each ``__dict__`` is restored to the copy taken when the
    instance was constructed.
    """

    def __init__(self, *modules):
        # Live module namespaces, plus a snapshot of each one to restore.
        self.refs = [module.__dict__ for module in modules]
        self.contents = [namespace.copy() for namespace in self.refs]

    def __enter__(self):
        for namespace in self.refs:
            # dict.fromkeys() maps every current key to None; updating with
            # it overwrites the values without adding or removing keys.
            namespace.update(dict.fromkeys(namespace))

    def __exit__(self, *exc_info):
        for namespace, saved in zip(self.refs, self.contents):
            # Discard whatever is there now before putting the snapshot back.
            namespace.clear()
            namespace.update(saved)
1313
class TestTemporaryDirectory(BaseTestCase):
    """Test TemporaryDirectory()."""

    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1):
        # Create a TemporaryDirectory (under the default temporary directory
        # unless *dir* is given), run nameCheck() on its name and populate it
        # through do_create2() with the given recurse/dirs/files counts.
        if dir is None:
            dir = tempfile.gettempdir()
        tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
        self.nameCheck(tmp.name, dir, pre, suf)
        self.do_create2(tmp.name, recurse, dirs, files)
        return tmp

    def do_create2(self, path, recurse=1, dirs=1, files=1):
        # Create subdirectories and some files
        # *recurse* is the number of directory levels still to create below
        # *path*; every level gets *dirs* subdirectories and *files* files.
        if recurse:
            for i in range(dirs):
                name = os.path.join(path, "dir%d" % i)
                os.mkdir(name)
                self.do_create2(name, recurse-1, dirs, files)
        for i in range(files):
            with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
                f.write(b"Hello world!")

    def test_mkdtemp_failure(self):
        # Check no additional exception if mkdtemp fails
        # Previously would raise AttributeError instead
        # (noted as part of Issue #10188)
        # The directory is gone once the ``with`` block ends, so its name
        # serves as a parent directory that does not exist.
        with tempfile.TemporaryDirectory() as nonexistent:
            pass
        with self.assertRaises(FileNotFoundError) as cm:
            tempfile.TemporaryDirectory(dir=nonexistent)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_explicit_cleanup(self):
        # A TemporaryDirectory is deleted when cleaned up
        dir = tempfile.mkdtemp()
        try:
            d = self.do_create(dir=dir)
            self.assertTrue(os.path.exists(d.name),
                            "TemporaryDirectory %s does not exist" % d.name)
            d.cleanup()
            self.assertFalse(os.path.exists(d.name),
                        "TemporaryDirectory %s exists after cleanup" % d.name)
        finally:
            os.rmdir(dir)

    @support.skip_unless_symlink
    def test_cleanup_with_symlink_to_a_directory(self):
        # cleanup() should not follow symlinks to directories (issue #12464)
        d1 = self.do_create()
        d2 = self.do_create(recurse=0)

        # Symlink d1/foo -> d2
        os.symlink(d2.name, os.path.join(d1.name, "foo"))

        # This call to cleanup() should not follow the "foo" symlink
        d1.cleanup()

        self.assertFalse(os.path.exists(d1.name),
                         "TemporaryDirectory %s exists after cleanup" % d1.name)
        self.assertTrue(os.path.exists(d2.name),
                        "Directory pointed to by a symlink was deleted")
        self.assertEqual(os.listdir(d2.name), ['test0.txt'],
                         "Contents of the directory pointed to by a symlink "
                         "were deleted")
        d2.cleanup()

    @support.cpython_only
    def test_del_on_collection(self):
        # A TemporaryDirectory is deleted when garbage collected
        dir = tempfile.mkdtemp()
        try:
            d = self.do_create(dir=dir)
            name = d.name
            del d # Rely on refcounting to invoke __del__
            self.assertFalse(os.path.exists(name),
                        "TemporaryDirectory %s exists after __del__" % name)
        finally:
            os.rmdir(dir)

    def test_del_on_shutdown(self):
        # A TemporaryDirectory may be cleaned up during shutdown
        with self.do_create() as dir:
            # Run the script once per module, each time attaching the
            # TemporaryDirectory to a different module as an attribute.
            for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
                code = """if True:
                    import builtins
                    import os
                    import shutil
                    import sys
                    import tempfile
                    import warnings

                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
                    sys.stdout.buffer.write(tmp.name.encode())

                    tmp2 = os.path.join(tmp.name, 'test_dir')
                    os.mkdir(tmp2)
                    with open(os.path.join(tmp2, "test0.txt"), "w") as f:
                        f.write("Hello world!")

                    {mod}.tmp = tmp

                    warnings.filterwarnings("always", category=ResourceWarning)
                    """.format(dir=dir, mod=mod)
                rc, out, err = script_helper.assert_python_ok("-c", code)
                # The child process writes the directory name to stdout.
                tmp_name = out.decode().strip()
                self.assertFalse(os.path.exists(tmp_name),
                            "TemporaryDirectory %s exists after cleanup" % tmp_name)
                err = err.decode('utf-8', 'backslashreplace')
                self.assertNotIn("Exception ", err)
                self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_exit_on_shutdown(self):
        # Issue #22427
        with self.do_create() as dir:
            # The child process leaves a generator suspended inside the
            # TemporaryDirectory ``with`` block when it exits.
            code = """if True:
                import sys
                import tempfile
                import warnings

                def generator():
                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
                        yield tmp
                g = generator()
                sys.stdout.buffer.write(next(g).encode())

                warnings.filterwarnings("always", category=ResourceWarning)
                """.format(dir=dir)
            rc, out, err = script_helper.assert_python_ok("-c", code)
            tmp_name = out.decode().strip()
            self.assertFalse(os.path.exists(tmp_name),
                        "TemporaryDirectory %s exists after cleanup" % tmp_name)
            err = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception ", err)
            self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_warnings_on_cleanup(self):
        # ResourceWarning will be triggered by __del__
        with self.do_create() as dir:
            d = self.do_create(dir=dir, recurse=3)
            name = d.name

            # Check for the resource warning
            with support.check_warnings(('Implicitly', ResourceWarning), quiet=False):
                warnings.filterwarnings("always", category=ResourceWarning)
                del d
                support.gc_collect()
            self.assertFalse(os.path.exists(name),
                        "TemporaryDirectory %s exists after __del__" % name)

    def test_multiple_close(self):
        # Can be cleaned-up many times without error
        d = self.do_create()
        d.cleanup()
        d.cleanup()
        d.cleanup()

    def test_context_manager(self):
        # Can be used as a context manager
        d = self.do_create()
        with d as name:
            self.assertTrue(os.path.exists(name))
            self.assertEqual(name, d.name)
        self.assertFalse(os.path.exists(name))

    def test_modes(self):
        # Cleanup must succeed whatever owner permission bits are set on
        # the files and directories in the tree.
        for mode in range(8):
            # Shift the three bits into the owner rwx position.
            mode <<= 6
            with self.subTest(mode=format(mode, '03o')):
                d = self.do_create(recurse=3, dirs=2, files=2)
                with d:
                    # Change files and directories mode recursively.
                    for root, dirs, files in os.walk(d.name, topdown=False):
                        for name in files:
                            os.chmod(os.path.join(root, name), mode)
                        os.chmod(root, mode)
                    d.cleanup()
                self.assertFalse(os.path.exists(d.name))

    # The skip reason names os.chflags, the function that the guard checks
    # for and that the test body calls.
    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
    def test_flags(self):
        # Cleanup must succeed even when every file and directory in the
        # tree carries the UF_IMMUTABLE and UF_NOUNLINK flags.
        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
        d = self.do_create(recurse=3, dirs=2, files=2)
        with d:
            # Change files and directories flags recursively.
            for root, dirs, files in os.walk(d.name, topdown=False):
                for name in files:
                    os.chflags(os.path.join(root, name), flags)
                os.chflags(root, flags)
            d.cleanup()
        self.assertFalse(os.path.exists(d.name))
1504
1505
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
1508