1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import pathlib
7import signal
8import sys
9import re
10import warnings
11import contextlib
12import weakref
13from unittest import mock
14
15import unittest
16from test import support
17from test.support import script_helper
18
19
# Some tests inspect file modes through os.stat(); record whether it is
# available and import the stat helpers only when it is.
has_stat = 1 if hasattr(os, 'stat') else 0
if has_stat:
    import stat

# Flags consulted by the skipUnless() decorators further down.
has_textmode = (tempfile._bin_openflags != tempfile._text_openflags)
has_spawnl = hasattr(os, 'spawnl')

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n)
TEST_FILES = 48 if sys.platform.startswith('openbsd') else 100
35
36# This is organized as one test for each chunk of code in tempfile.py,
37# in order of their appearance in the file.  Testing which requires
38# threads is not done here.
39
class TestLowLevelInternals(unittest.TestCase):
    """Checks for the private helper tempfile._infer_return_type()."""

    def test_infer_return_type_singles(self):
        # One argument: str and None give str, bytes gives bytes.
        infer = tempfile._infer_return_type
        for arg, expected in (('', str), (b'', bytes), (None, str)):
            self.assertIs(expected, infer(arg))

    def test_infer_return_type_multiples(self):
        # Homogeneous arguments agree; mixing str and bytes is an error.
        infer = tempfile._infer_return_type
        self.assertIs(str, infer('', ''))
        self.assertIs(bytes, infer(b'', b''))
        for mixed in (('', b''), (b'', '')):
            with self.assertRaises(TypeError):
                infer(*mixed)

    def test_infer_return_type_multiples_and_none(self):
        # None arguments do not influence the result.
        infer = tempfile._infer_return_type
        expectations = (
            ((None, ''), str),
            (('', None), str),
            ((None, None), str),
            ((b'', None), bytes),
            ((None, b''), bytes),
        )
        for args, expected in expectations:
            self.assertIs(expected, infer(*args))
        for mixed in (('', None, b''), (b'', None, '')):
            with self.assertRaises(TypeError):
                infer(*mixed)

    def test_infer_return_type_pathlib(self):
        # A pathlib.Path argument is treated like str.
        root = pathlib.Path('/')
        self.assertIs(str, tempfile._infer_return_type(root))
67
68
69# Common functionality.
70
class BaseTestCase(unittest.TestCase):
    """Shared fixture: warning capture plus a temp-name validator."""

    # The random part of a generated name: exactly eight characters drawn
    # from lowercase letters, digits, '_' and '-'.
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # Enter the warnings context by hand so it spans the whole test;
        # tearDown() leaves it again.
        manager = support.check_warnings()
        self._warnings_manager = manager
        manager.__enter__()
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def tearDown(self):
        self._warnings_manager.__exit__(None, None, None)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* is *dir*/<pre><8 random chars><suf>.

        Also asserts that the type of *name* (str or bytes) is the one
        implied by each of *dir*, *pre* and *suf* that is not None.
        """
        ndir, nbase = os.path.split(name)
        npre = nbase[:len(pre)]
        nsuf = nbase[len(nbase) - len(suf):]

        if dir is not None:
            # str and os.PathLike directories both imply a str result.
            dir_is_text = type(dir) is str or isinstance(dir, os.PathLike)
            self.assertIs(type(name), str if dir_is_text else bytes,
                          "unexpected return type")
        for part in (pre, suf):
            if part is not None:
                self.assertIs(type(name),
                              str if type(part) is str else bytes,
                              "unexpected return type")
        if dir is None and pre is None and suf is None:
            self.assertIs(type(name), str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(npre, pre,
                         "file %r does not begin with %r" % (nbase, pre))
        self.assertEqual(nsuf, suf,
                         "file %r does not end with %r" % (nbase, suf))

        # What remains between prefix and suffix is the random part.
        middle = nbase[len(pre):len(nbase) - len(suf)]
        pattern = self.str_check if isinstance(middle, str) else self.b_check
        self.assertTrue(pattern.match(middle),
                        "random characters %r do not match %r"
                        % (middle, pattern.pattern))
120
121
class TestExports(BaseTestCase):
    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        allowed = {
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        }

        # Names starting with an underscore are private and not checked.
        unexp = [key for key in tempfile.__dict__
                 if key[0] != '_' and key not in allowed]
        self.assertTrue(not unexp,
                        "unexpected keys: %s" % unexp)
150
151
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_six_char_str(self):
        # _RandomNameSequence returns a random name string.  The method
        # name is historical: nameCheck() requires exactly eight
        # characters for the random part.
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)
        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol.
        # This used to be named "supports_iter"; without the "test"
        # prefix unittest never collected it, so it never ran.
        count = 0
        for s in self.r:
            count += 1
            if count == 20:
                break
        self.assertEqual(count, 20)

    @unittest.skipUnless(hasattr(os, 'fork'),
        "os.fork is required for this test")
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # child process
                status = 1
                try:
                    os.close(read_fd)
                    os.write(write_fd, next(self.r).encode("ascii"))
                    os.close(write_fd)
                    status = 0
                finally:
                    # bypass the normal exit handlers- leave those to
                    # the parent.  Exiting from "finally" guarantees the
                    # child never falls back into the test runner, even
                    # if one of the calls above raised.
                    os._exit(status)

            # parent process
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                # best effort to ensure the process can't bleed out
                # via any bugs above
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    pass

                # Read the process exit status to avoid zombie process
                os.waitpid(pid, 0)

            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)
221
222
223
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings
        candidates = tempfile._candidate_tempdir_list()

        self.assertNotEqual(len(candidates), 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories
        env_names = ('TMPDIR', 'TEMP', 'TMP')

        with support.EnvironmentVarGuard() as env:
            # Make sure the interesting environment variables are all set.
            for envname in env_names:
                if not os.getenv(envname):
                    env[envname] = os.path.abspath(envname)

            candidates = tempfile._candidate_tempdir_list()

            for envname in env_names:
                dirname = os.getenv(envname)
                if not dirname:
                    raise ValueError
                self.assertIn(dirname, candidates)

            # The current directory (or os.curdir when it cannot be
            # determined) must be a candidate as well.
            try:
                here = os.getcwd()
            except (AttributeError, OSError):
                here = os.curdir

            self.assertIn(here, candidates)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
262
263
264# We test _get_default_tempdir some more by testing gettempdir.
265
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # The candidate directory must be empty after every call to
        # _get_default_tempdir(): a normal call, one where io.open()
        # fails, and one where write() on the opened file fails.

        def raise_OSError(*args, **kwargs):
            raise OSError()

        def bad_writer(*args, **kwargs):
            # real_open is the value yielded by the last swap_attr()
            # block below; it is used here as the unpatched io.open.
            fp = real_open(*args, **kwargs)
            fp.write = raise_OSError
            return fp

        # use a private empty directory
        with tempfile.TemporaryDirectory() as scratch:
            # force _get_default_tempdir() to consider our empty directory
            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   lambda: [scratch]):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                self.assertEqual(os.listdir(scratch), [])

                # test again with failing io.open()
                with support.swap_attr(io, "open", raise_OSError):
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    self.assertEqual(os.listdir(scratch), [])

                # test again with failing write()
                with support.swap_attr(io, "open", bad_writer) as real_open:
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    self.assertEqual(os.listdir(scratch), [])
301
302
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        self.assertIsInstance(tempfile._get_candidate_names(),
                              tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()

        self.assertIs(first, second)
317
318
@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Run the with-body with tempfile.tempdir set to a new directory.

    The directory is created with mkdtemp() and removed with
    support.rmtree() on exit, whether or not the body raised.
    """
    scratch = tempfile.mkdtemp()
    try:
        with support.swap_attr(tempfile, 'tempdir', scratch):
            yield
    finally:
        support.rmtree(scratch)
327
328
def _mock_candidate_names(*names):
    """Return a swap_attr() context that fixes the candidate names.

    While it is active, every call to tempfile._get_candidate_names()
    returns a fresh iterator over *names*.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
333
334
class TestBadTempdir:
    # Mixin: the concrete test class supplies make_temp() and the
    # unittest.TestCase assertion methods.

    def test_read_only_directory(self):
        with _inside_empty_temp_dir():
            original_mode = os.stat(tempfile.tempdir).st_mode
            write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
            os.chmod(tempfile.tempdir, original_mode & ~write_bits)
            try:
                # The chmod may have had no effect; then there is
                # nothing to test.
                if os.access(tempfile.tempdir, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                with self.assertRaises(PermissionError):
                    self.make_temp()
                self.assertEqual(os.listdir(tempfile.tempdir), [])
            finally:
                # Restore the mode so the directory can be cleaned up.
                os.chmod(tempfile.tempdir, original_mode)

    def test_nonexisting_directory(self):
        with _inside_empty_temp_dir():
            missing = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', missing):
                with self.assertRaises(FileNotFoundError):
                    self.make_temp()

    def test_non_directory(self):
        with _inside_empty_temp_dir():
            # Point tempdir at a regular file rather than a directory.
            plain_file = os.path.join(tempfile.tempdir, 'file')
            open(plain_file, 'wb').close()
            with support.swap_attr(tempfile, 'tempdir', plain_file):
                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
                    self.make_temp()
365
366
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        """A file made by _mkstemp_inner; closed and unlinked in __del__."""

        _unlink = os.unlink
        _close = os.close
        _tflags = tempfile._text_openflags
        _bflags = tempfile._bin_openflags

        def __init__(self, dir, pre, suf, bin):
            flags = self._bflags if bin else self._tflags
            output_type = tempfile._infer_return_type(dir, pre, suf)
            created = tempfile._mkstemp_inner(dir, pre, suf, flags,
                                              output_type)
            (self.fd, self.name) = created

        def write(self, str):
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        """Create a mkstemped file, check its name and return it.

        Arguments left as None are replaced by the default temp dir and
        by empty strings of the inferred type (str or bytes).
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf

        created = self.mkstemped(dir, pre, suf, bin)
        self.nameCheck(created.name, dir, pre, suf)
        return created

    def test_basic(self):
        # _mkstemp_inner can create files
        variants = (
            {},
            {'pre': "a"},
            {'suf': "b"},
            {'pre': "a", 'suf': "b"},
            {'pre': "aa", 'suf': ".txt"},
        )
        for parts in variants:
            self.do_create(**parts).write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        accepted = (
            {'suf': b""},
            {'pre': b"a"},
            {'suf': b"b"},
            {'pre': b"a", 'suf': b"b"},
            {'pre': b"aa", 'suf': b".txt"},
        )
        for parts in accepted:
            self.do_create(dir=dir_b, **parts).write(b"blat")
        # Can't mix str & binary types in the args.
        rejected = (
            {'dir': "", 'suf': b""},
            {'dir': dir_b, 'pre': ""},
            {'dir': dir_b, 'pre': b"", 'suf': ""},
        )
        for parts in rejected:
            with self.assertRaises(TypeError):
                self.do_create(**parts).write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # The list keeps every file alive until the test returns.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        chosen = tempfile.mkdtemp()
        try:
            for target in (chosen, pathlib.Path(chosen)):
                self.do_create(dir=target).write(b"blat")
        finally:
            os.rmdir(chosen)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits (0o111 == 1 + 8 + 64).
            expected = (expected >> 6) * 0o111
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        v = "v" if support.verbose else "q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        here = os.path.dirname(os.path.abspath(me))
        tester = os.path.join(here, "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        decorated = sys.executable
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester

        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                    "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        for chunk in (b"blat\x1a", b"extra\n"):
            f.write(chunk)
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        """Call _mkstemp_inner with the default dir and prefix (str names)."""
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.gettempprefix(),
                                       '',
                                       tempfile._bin_openflags,
                                       str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first_fd, first_name = self.make_temp()
            os.close(first_fd)
            self.assertTrue(first_name.endswith('aaa'))

            second_fd, second_name = self.make_temp()
            os.close(second_fd)
            self.assertTrue(second_name.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            blocker = tempfile.mkdtemp()
            self.assertTrue(blocker.endswith('aaa'))

            fd, name = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
540
541
class TestGetTempPrefix(BaseTestCase):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty prefix string, and
        # gettempprefixb a nonempty bytes prefix
        getters = (
            (tempfile.gettempprefix, str),
            (tempfile.gettempprefixb, bytes),
        )
        for getter, expected_type in getters:
            prefix = getter()
            self.assertIsInstance(prefix, expected_type)
            self.assertGreater(len(prefix), 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        basename = tempfile.gettempprefix() + "xxxxxx.xxx"
        parent = tempfile.mkdtemp(prefix="")
        try:
            path = os.path.join(parent, basename)
            os.close(os.open(path, os.O_RDWR | os.O_CREAT))
            os.unlink(path)
        finally:
            os.rmdir(parent)
572
573
class TestGetTempDir(BaseTestCase):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir returns a directory which exists
        found = (tempfile.gettempdir(), tempfile.gettempdirb())
        for d in found:
            self.assertTrue(os.path.isabs(d) or d == os.curdir,
                            "%r is not an absolute path" % d)
            self.assertTrue(os.path.isdir(d),
                            "%r is not a directory" % d)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.
        probe = tempfile.NamedTemporaryFile()
        probe.write(b"blat")
        probe.close()

    def test_same_thing(self):
        # gettempdir always returns the same object
        first = tempfile.gettempdir()
        second = tempfile.gettempdir()
        as_bytes = tempfile.gettempdirb()

        self.assertIs(first, second)
        self.assertNotEqual(type(first), type(as_bytes))
        self.assertEqual(first, os.fsdecode(as_bytes))

    def test_case_sensitive(self):
        # gettempdir should not flatten its case
        # even on a case-insensitive file system
        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
        # Clear tempfile.tempdir so gettempdir() computes it afresh.
        saved_tempdir = tempfile.tempdir
        tempfile.tempdir = None
        try:
            with support.EnvironmentVarGuard() as env:
                # Fake the first env var which is checked as a candidate
                env["TMPDIR"] = case_sensitive_tempdir
                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
        finally:
            tempfile.tempdir = saved_tempdir
            support.rmdir(case_sensitive_tempdir)
619
620
class TestMkstemp(BaseTestCase):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a file with mkstemp(), validate its name, then remove it.

        dir, pre, suf -- directory, prefix and suffix handed to
            mkstemp().  Each may be str, bytes or None; the tests also
            pass a pathlib.Path as dir.  None is replaced by the default
            temp dir or by an empty string of the inferred type.

        Raises TypeError (from _infer_return_type) when str and bytes
        arguments are mixed.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)

        # Every check runs inside try/finally: a failing assertion must
        # not leak the descriptor or leave the file behind.
        try:
            ndir = os.path.dirname(name)
            adir = os.path.abspath(dir)
            self.assertEqual(adir, ndir,
                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_basic_with_bytes_names(self):
        # mkstemp can create files when given name parts all
        # specified as bytes.
        d = tempfile.gettempdirb()
        self.do_create(dir=d, suf=b"")
        self.do_create(dir=d, pre=b"a")
        self.do_create(dir=d, suf=b"b")
        self.do_create(dir=d, pre=b"a", suf=b"b")
        self.do_create(dir=d, pre=b"aa", suf=b".txt")
        self.do_create(dir=b".")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir=".", pre=b"aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre="aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre=b"aa", suf=".txt")

    def test_choose_directory(self):
        # mkstemp can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
            self.do_create(dir=pathlib.Path(dir))
        finally:
            os.rmdir(dir)
682
683
class TestMkdtemp(TestBadTempdir, BaseTestCase):
    """Test mkdtemp()."""

    def make_temp(self):
        """Hook used by the TestBadTempdir mixin."""
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a directory with mkdtemp(), check its name, return it.

        The caller removes the returned directory.  If the name check
        fails, the directory is removed here before re-raising.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)

        try:
            self.nameCheck(name, dir, pre, suf)
        except BaseException:
            os.rmdir(name)
            raise
        return name

    def test_basic(self):
        # mkdtemp can create directories
        variants = (
            {},
            {'pre': "a"},
            {'suf': "b"},
            {'pre': "a", 'suf': "b"},
            {'pre': "aa", 'suf': ".txt"},
        )
        for parts in variants:
            os.rmdir(self.do_create(**parts))

    def test_basic_with_bytes_names(self):
        # mkdtemp can create directories when given all binary parts
        d = tempfile.gettempdirb()
        accepted = (
            {},
            {'pre': b"a"},
            {'suf': b"b"},
            {'pre': b"a", 'suf': b"b"},
            {'pre': b"aa", 'suf': b".txt"},
        )
        for parts in accepted:
            os.rmdir(self.do_create(dir=d, **parts))
        # Mixing str and bytes parts must be rejected.
        rejected = (
            {'dir': d, 'pre': "aa", 'suf': b".txt"},
            {'dir': d, 'pre': b"aa", 'suf': ".txt"},
            {'dir': "", 'pre': b"aa", 'suf': b".txt"},
        )
        for parts in rejected:
            with self.assertRaises(TypeError):
                os.rmdir(self.do_create(**parts))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        created = []
        try:
            for _ in range(TEST_FILES):
                created.append(self.do_create(pre="aa"))
        finally:
            # Remove whatever was created, even after a failure.
            for path in created:
                os.rmdir(path)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        parent = tempfile.mkdtemp()
        try:
            for target in (parent, pathlib.Path(parent)):
                os.rmdir(self.do_create(dir=target))
        finally:
            os.rmdir(parent)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        made = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(made).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            expected = 0o700
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits (0o111 == 1 + 8 + 64).
                expected = (expected >> 6) * 0o111
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(made)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            blocker = tempfile.NamedTemporaryFile(delete=False)
            blocker.close()
            self.assertTrue(blocker.name.endswith('aaa'))
            made = tempfile.mkdtemp()
            self.assertTrue(made.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first = tempfile.mkdtemp()
            self.assertTrue(first.endswith('aaa'))
            second = tempfile.mkdtemp()
            self.assertTrue(second.endswith('bbb'))
791
792
class TestMktemp(BaseTestCase):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        super().setUp()

    def tearDown(self):
        if self.dir:
            os.rmdir(self.dir)
            self.dir = None
        super().tearDown()

    class mktemped:
        """A file created at a name chosen by mktemp(); unlinked in __del__."""

        _bflags = tempfile._bin_openflags
        _unlink = os.unlink

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            fd = os.open(self.name, self._bflags, 0o600)
            os.close(fd)

        def __del__(self):
            self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        """Create a mktemped file in self.dir, check its name, return it."""
        created = self.mktemped(self.dir, pre, suf)
        self.nameCheck(created.name, self.dir, pre, suf)
        return created

    def test_basic(self):
        # mktemp can choose usable file names
        variants = (
            {},
            {'pre': "a"},
            {'suf': "b"},
            {'pre': "a", 'suf': "b"},
            {'pre': "aa", 'suf': ".txt"},
        )
        for parts in variants:
            self.do_create(**parts)

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        # The list keeps every file alive until the test returns.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]

##     def test_warning(self):
##         # mktemp issues a warning when used
##         warnings.filterwarnings("error",
##                                 category=RuntimeWarning,
##                                 message="mktemp")
##         self.assertRaises(RuntimeWarning,
##                           tempfile.mktemp, dir=self.dir)
848
849
850# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
851
852
class TestNamedTemporaryFile(BaseTestCase):
    """Test NamedTemporaryFile()."""

    def do_create(self, dir=None, pre="", suf="", delete=True):
        """Create a NamedTemporaryFile, validate its name and return it.

        When *dir* is None the file is created in tempfile.gettempdir().
        The resulting name is checked with self.nameCheck() against the
        directory, prefix and suffix that were requested.
        """
        target_dir = tempfile.gettempdir() if dir is None else dir
        named = tempfile.NamedTemporaryFile(
            dir=target_dir, prefix=pre, suffix=suf, delete=delete)
        self.nameCheck(named.name, target_dir, pre, suf)
        return named

    def test_basic(self):
        # NamedTemporaryFile can create files for each prefix/suffix mix
        combos = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for kwargs in combos:
            self.do_create(**kwargs)

    def test_method_lookup(self):
        # Issue #18879: a bound method looked up on a temporary file must
        # keep the file alive for as long as the method itself is alive.
        tmpfile = self.do_create()
        ref = weakref.ref(tmpfile)
        first_write = tmpfile.write
        second_write = tmpfile.write
        del tmpfile
        first_write(b'foo')
        del first_write
        second_write(b'bar')
        del second_write
        if support.check_impl_detail(cpython=True):
            # No reference cycle was created.
            self.assertIsNone(ref())

    def test_iter(self):
        # Issue #23700: an iterator obtained from a temporary file must keep
        # the file alive for as long as the iteration is running.
        expected = [b'spam\n', b'eggs\n', b'beans\n']

        def make_file():
            handle = tempfile.NamedTemporaryFile(mode='w+b')
            handle.write(b''.join(expected))
            handle.seek(0)
            return handle

        # The only reference to the file is the one held by the iteration.
        for index, line in enumerate(make_file()):
            self.assertEqual(line, expected[index])
        self.assertEqual(index, len(expected) - 1)

    def test_creates_named(self):
        # NamedTemporaryFile creates files with names
        tmpfile = tempfile.NamedTemporaryFile()
        self.assertTrue(os.path.exists(tmpfile.name),
                        "NamedTemporaryFile %s does not exist" % tmpfile.name)

    def test_del_on_close(self):
        # A NamedTemporaryFile is deleted when closed
        scratch = tempfile.mkdtemp()
        try:
            tmpfile = tempfile.NamedTemporaryFile(dir=scratch)
            tmpfile.write(b'blat')
            tmpfile.close()
            self.assertFalse(
                os.path.exists(tmpfile.name),
                "NamedTemporaryFile %s exists after close" % tmpfile.name)
        finally:
            os.rmdir(scratch)

    def test_dis_del_on_close(self):
        # Tests that delete-on-close can be disabled
        scratch = tempfile.mkdtemp()
        leftover = None
        try:
            tmpfile = tempfile.NamedTemporaryFile(dir=scratch, delete=False)
            leftover = tmpfile.name
            tmpfile.write(b'blat')
            tmpfile.close()
            self.assertTrue(
                os.path.exists(tmpfile.name),
                "NamedTemporaryFile %s missing after close" % tmpfile.name)
        finally:
            # The file survives close(), so remove it by hand before the
            # directory itself.
            if leftover is not None:
                os.unlink(leftover)
            os.rmdir(scratch)

    def test_multiple_close(self):
        # A NamedTemporaryFile can be closed many times without error
        tmpfile = tempfile.NamedTemporaryFile()
        tmpfile.write(b'abc\n')
        for _ in range(3):
            tmpfile.close()

    def test_context_manager(self):
        # A NamedTemporaryFile can be used as a context manager
        with tempfile.NamedTemporaryFile() as tmpfile:
            self.assertTrue(os.path.exists(tmpfile.name))
        self.assertFalse(os.path.exists(tmpfile.name))
        # Re-entering the already closed file must fail.
        with self.assertRaises(ValueError):
            with tmpfile:
                pass

    def test_no_leak_fd(self):
        # Issue #21058: don't leak file descriptor when io.open() fails
        closed_fds = []
        real_close = os.close

        def recording_close(fd):
            closed_fds.append(fd)
            real_close(fd)

        with mock.patch('os.close', side_effect=recording_close), \
                mock.patch('io.open', side_effect=ValueError):
            with self.assertRaises(ValueError):
                tempfile.NamedTemporaryFile()
            self.assertEqual(len(closed_fds), 1)

    def test_bad_mode(self):
        # An invalid mode raises and leaves nothing behind in the directory
        scratch = tempfile.mkdtemp()
        self.addCleanup(support.rmtree, scratch)
        self.assertRaises(ValueError, tempfile.NamedTemporaryFile,
                          mode='wr', dir=scratch)
        self.assertRaises(TypeError, tempfile.NamedTemporaryFile,
                          mode=2, dir=scratch)
        self.assertEqual(os.listdir(scratch), [])
976
977    # How to test the mode and bufsize parameters?
978
class TestSpooledTemporaryFile(BaseTestCase):
    """Test SpooledTemporaryFile()."""

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        """Return a new SpooledTemporaryFile.

        When *dir* is None, tempfile.gettempdir() is used instead.
        """
        target_dir = tempfile.gettempdir() if dir is None else dir
        return tempfile.SpooledTemporaryFile(
            max_size=max_size, dir=target_dir, prefix=pre, suffix=suf)

    def test_basic(self):
        # SpooledTemporaryFile can create files
        spool = self.do_create()
        self.assertFalse(spool._rolled)
        spool = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(spool._rolled)

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        scratch = tempfile.mkdtemp()
        try:
            spool = tempfile.SpooledTemporaryFile(max_size=10, dir=scratch)
            self.assertFalse(spool._rolled)
            spool.write(b'blat ' * 5)
            self.assertTrue(spool._rolled)
            filename = spool.name
            spool.close()
            # Only a str name can be looked up on the file system.
            still_there = (isinstance(filename, str)
                           and os.path.exists(filename))
            self.assertFalse(
                still_there,
                "SpooledTemporaryFile %s exists after close" % filename)
        finally:
            os.rmdir(scratch)

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be written to multiple times within
        # the max_size without rolling over
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        for _ in range(5):
            spool.seek(0, 0)
            spool.write(b'x' * 20)
        self.assertFalse(spool._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        spool.write(b'x' * 20)
        self.assertFalse(spool._rolled)
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        spool = self.do_create()
        spool.writelines((b'x', b'y', b'z'))
        self.assertEqual(spool.seek(0), 0)
        self.assertEqual(spool.read(), b'xyz')

    def test_writelines_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spool = self.do_create(max_size=35)
        spool.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.seek(100, 0), 100)
        # Seeking alone does not roll over; the write that follows does.
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        self.assertGreater(spool.fileno(), 0)
        self.assertTrue(spool._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spool = tempfile.SpooledTemporaryFile()
        spool.write(b'abc\n')
        self.assertFalse(spool._rolled)
        for _ in range(3):
            spool.close()

    def test_multiple_close_after_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spool = tempfile.SpooledTemporaryFile(max_size=1)
        spool.write(b'abc\n')
        self.assertTrue(spool._rolled)
        for _ in range(3):
            spool.close()

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        spool = self.do_create(max_size=30)
        do_read, do_write, do_seek = spool.read, spool.write, spool.seek

        do_write(b"a" * 35)
        do_write(b"b" * 35)
        do_seek(0, 0)
        self.assertEqual(do_read(70), b'a'*35 + b'b'*35)

    def test_properties(self):
        # Attributes of a binary file, before and after the rollover
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        for text_only in ('newlines', 'encoding'):
            self.assertRaises(AttributeError, getattr, spool, text_only)

        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'rb+')
        self.assertIsNotNone(spool.name)
        for text_only in ('newlines', 'encoding'):
            self.assertRaises(AttributeError, getattr, spool, text_only)

    def test_text_mode(self):
        # Creating a SpooledTemporaryFile with a text mode should produce
        # a file object reading and writing (Unicode) text strings.
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              encoding="utf-8")
        spool.write("abc\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\n")
        spool.write("def\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\n")
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNone(spool.name)
        self.assertEqual(spool.newlines, os.linesep)
        self.assertEqual(spool.encoding, "utf-8")

        spool.write("xyzzy\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\nxyzzy\n")
        # Check that Ctrl+Z doesn't truncate the file
        spool.write("foo\x1abar\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNotNone(spool.name)
        self.assertEqual(spool.newlines, os.linesep)
        self.assertEqual(spool.encoding, "utf-8")

    def test_text_newline_and_encoding(self):
        # Text written with newline='' reads back unchanged, before and
        # after the rollover
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              newline='', encoding='utf-8')
        spool.write("\u039B\r\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "\u039B\r\n")
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNone(spool.name)
        self.assertIsNotNone(spool.newlines)
        self.assertEqual(spool.encoding, "utf-8")

        spool.write("\u039C" * 10 + "\r\n")
        spool.write("\u039D" * 20)
        spool.seek(0)
        expected = "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)
        self.assertEqual(spool.read(), expected)
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNotNone(spool.name)
        self.assertIsNotNone(spool.newlines)
        self.assertEqual(spool.encoding, 'utf-8')

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spool:
            self.assertFalse(spool._rolled)
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Re-entering the already closed file must fail.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spool:
            self.assertFalse(spool._rolled)
            spool.write(b'abc\n')
            spool.flush()
            self.assertTrue(spool._rolled)
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Re-entering the already closed file must fail.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        spool = tempfile.SpooledTemporaryFile(max_size=1)
        spool.write(b'abc\n')
        spool.flush()
        self.assertTrue(spool._rolled)
        with spool:
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Re-entering the already closed file must fail.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.seek(0)
        spool.truncate()
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.truncate(4)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.truncate(20)
        self.assertTrue(spool._rolled)
        if has_stat:
            size_on_disk = os.fstat(spool.fileno()).st_size
            self.assertEqual(size_on_disk, 20)
1234
1235
# TemporaryFile is only tested separately when it is a distinct object from
# NamedTemporaryFile; otherwise the NamedTemporaryFile tests already cover it.
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:

    class TestTemporaryFile(BaseTestCase):
        """Test TemporaryFile()."""

        def test_basic(self):
            # TemporaryFile can create files
            # No point in testing the name params - the file has no name.
            # Close explicitly instead of relying on garbage collection.
            tempfile.TemporaryFile().close()

        def test_has_no_name(self):
            # TemporaryFile creates files with no names (on this system)
            dir = tempfile.mkdtemp()
            # The with block guarantees the file is closed on every path.
            with tempfile.TemporaryFile(dir=dir) as f:
                f.write(b'blat')

                # Sneaky: because this file has no name, it should not
                # prevent us from removing the directory it was created in.
                try:
                    os.rmdir(dir)
                except OSError:
                    # cleanup: close the file, retry the removal, then
                    # report the original failure
                    f.close()
                    os.rmdir(dir)
                    raise

        def test_multiple_close(self):
            # A TemporaryFile can be closed many times without error
            f = tempfile.TemporaryFile()
            f.write(b'abc\n')
            f.close()
            f.close()
            f.close()

        # How to test the mode and bufsize parameters?
        def test_mode_and_encoding(self):
            # Data written in a given mode/encoding reads back unchanged

            def roundtrip(input, *args, **kwargs):
                # Extra arguments are passed straight to TemporaryFile().
                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                    fileobj.write(input)
                    fileobj.seek(0)
                    self.assertEqual(input, fileobj.read())

            roundtrip(b"1234", "w+b")
            roundtrip("abdc\n", "w+")
            roundtrip("\u039B", "w+", encoding="utf-16")
            roundtrip("foo\r\n", "w+", newline="")

        def test_no_leak_fd(self):
            # Issue #21058: don't leak file descriptor when io.open() fails
            closed = []
            os_close = os.close
            def close(fd):
                # Record the descriptor, then really close it.
                closed.append(fd)
                os_close(fd)

            with mock.patch('os.close', side_effect=close):
                with mock.patch('io.open', side_effect=ValueError):
                    self.assertRaises(ValueError, tempfile.TemporaryFile)
                    self.assertEqual(len(closed), 1)
1296
1297
1298
1299# Helper for test_del_on_shutdown
class NulledModules:
    """Context manager that temporarily sets module attributes to None.

    On entry every name in the namespace of each given module is rebound to
    None; on exit each namespace is restored to the snapshot taken when the
    instance was created.
    """

    def __init__(self, *modules):
        # refs holds the live module namespaces, contents the snapshots
        # used to restore them on exit.
        self.refs = []
        self.contents = []
        for module in modules:
            namespace = module.__dict__
            self.refs.append(namespace)
            self.contents.append(dict(namespace))

    def __enter__(self):
        for namespace in self.refs:
            # Rebind every existing key to None; no keys are added or removed.
            namespace.update(dict.fromkeys(namespace))

    def __exit__(self, *exc_info):
        for index, namespace in enumerate(self.refs):
            # Drop anything added meanwhile, then put the snapshot back.
            namespace.clear()
            namespace.update(self.contents[index])
1314
class TestTemporaryDirectory(BaseTestCase):
    """Test TemporaryDirectory()."""

    def do_create(self, dir=None, pre="", suf="", recurse=1):
        """Create a populated TemporaryDirectory and return it.

        When *dir* is None, tempfile.gettempdir() is used.  The new name is
        checked with self.nameCheck().  While *recurse* is non-zero another
        directory is created inside the new one with recurse-1, and a
        "test.txt" file is written at every level.
        """
        parent = tempfile.gettempdir() if dir is None else dir
        tmp = tempfile.TemporaryDirectory(dir=parent, prefix=pre, suffix=suf)
        self.nameCheck(tmp.name, parent, pre, suf)
        # Create a subdirectory and some files
        if recurse:
            nested = self.do_create(tmp.name, pre, suf, recurse-1)
            # NOTE(review): presumably meant to stop the nested object from
            # removing the subdirectory itself -- TODO confirm.
            nested.name = None
        with open(os.path.join(tmp.name, "test.txt"), "wb") as payload:
            payload.write(b"Hello world!")
        return tmp

    def test_mkdtemp_failure(self):
        # Check no additional exception if mkdtemp fails
        # Previously would raise AttributeError instead
        # (noted as part of Issue #10188)
        with tempfile.TemporaryDirectory() as vanished:
            pass
        # The directory has been removed again, so it cannot be a parent.
        with self.assertRaises(FileNotFoundError) as caught:
            tempfile.TemporaryDirectory(dir=vanished)
        self.assertEqual(caught.exception.errno, errno.ENOENT)

    def test_explicit_cleanup(self):
        # A TemporaryDirectory is deleted when cleaned up
        scratch = tempfile.mkdtemp()
        try:
            tmpdir = self.do_create(dir=scratch)
            self.assertTrue(
                os.path.exists(tmpdir.name),
                "TemporaryDirectory %s does not exist" % tmpdir.name)
            tmpdir.cleanup()
            self.assertFalse(
                os.path.exists(tmpdir.name),
                "TemporaryDirectory %s exists after cleanup" % tmpdir.name)
        finally:
            os.rmdir(scratch)

    @support.skip_unless_symlink
    def test_cleanup_with_symlink_to_a_directory(self):
        # cleanup() should not follow symlinks to directories (issue #12464)
        link_owner = self.do_create()
        link_target = self.do_create(recurse=0)

        # Symlink link_owner/foo -> link_target
        os.symlink(link_target.name, os.path.join(link_owner.name, "foo"))

        # This call to cleanup() should not follow the "foo" symlink
        link_owner.cleanup()

        self.assertFalse(
            os.path.exists(link_owner.name),
            "TemporaryDirectory %s exists after cleanup" % link_owner.name)
        self.assertTrue(os.path.exists(link_target.name),
                        "Directory pointed to by a symlink was deleted")
        self.assertEqual(os.listdir(link_target.name), ['test.txt'],
                         "Contents of the directory pointed to by a symlink "
                         "were deleted")
        link_target.cleanup()

    @support.cpython_only
    def test_del_on_collection(self):
        # A TemporaryDirectory is deleted when garbage collected
        scratch = tempfile.mkdtemp()
        try:
            tmpdir = self.do_create(dir=scratch)
            name = tmpdir.name
            del tmpdir  # Rely on refcounting to invoke __del__
            self.assertFalse(
                os.path.exists(name),
                "TemporaryDirectory %s exists after __del__" % name)
        finally:
            os.rmdir(scratch)

    def test_del_on_shutdown(self):
        # A TemporaryDirectory may be cleaned up during shutdown
        module_names = ('builtins', 'os', 'shutil', 'sys', 'tempfile',
                        'warnings')
        with self.do_create() as parent:
            # Each child process stores its TemporaryDirectory as an
            # attribute of one of these modules and then simply exits.
            for module_name in module_names:
                code = """if True:
                    import builtins
                    import os
                    import shutil
                    import sys
                    import tempfile
                    import warnings

                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
                    sys.stdout.buffer.write(tmp.name.encode())

                    tmp2 = os.path.join(tmp.name, 'test_dir')
                    os.mkdir(tmp2)
                    with open(os.path.join(tmp2, "test.txt"), "w") as f:
                        f.write("Hello world!")

                    {mod}.tmp = tmp

                    warnings.filterwarnings("always", category=ResourceWarning)
                    """.format(dir=parent, mod=module_name)
                rc, out, err = script_helper.assert_python_ok("-c", code)
                tmp_name = out.decode().strip()
                self.assertFalse(
                    os.path.exists(tmp_name),
                    "TemporaryDirectory %s exists after cleanup" % tmp_name)
                stderr_text = err.decode('utf-8', 'backslashreplace')
                self.assertNotIn("Exception ", stderr_text)
                self.assertIn("ResourceWarning: Implicitly cleaning up",
                              stderr_text)

    def test_exit_on_shutdown(self):
        # Issue #22427
        with self.do_create() as parent:
            # The child leaves a generator suspended inside the with block
            # of a TemporaryDirectory and then exits.
            code = """if True:
                import sys
                import tempfile
                import warnings

                def generator():
                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
                        yield tmp
                g = generator()
                sys.stdout.buffer.write(next(g).encode())

                warnings.filterwarnings("always", category=ResourceWarning)
                """.format(dir=parent)
            rc, out, err = script_helper.assert_python_ok("-c", code)
            tmp_name = out.decode().strip()
            self.assertFalse(
                os.path.exists(tmp_name),
                "TemporaryDirectory %s exists after cleanup" % tmp_name)
            stderr_text = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception ", stderr_text)
            self.assertIn("ResourceWarning: Implicitly cleaning up",
                          stderr_text)

    def test_warnings_on_cleanup(self):
        # ResourceWarning will be triggered by __del__
        with self.do_create() as parent:
            tmpdir = self.do_create(dir=parent, recurse=3)
            name = tmpdir.name

            # Check for the resource warning
            expected_warning = ('Implicitly', ResourceWarning)
            with support.check_warnings(expected_warning, quiet=False):
                warnings.filterwarnings("always", category=ResourceWarning)
                del tmpdir
                support.gc_collect()
            self.assertFalse(
                os.path.exists(name),
                "TemporaryDirectory %s exists after __del__" % name)

    def test_multiple_close(self):
        # Can be cleaned-up many times without error
        tmpdir = self.do_create()
        for _ in range(3):
            tmpdir.cleanup()

    def test_context_manager(self):
        # Can be used as a context manager
        tmpdir = self.do_create()
        with tmpdir as name:
            self.assertTrue(os.path.exists(name))
            self.assertEqual(name, tmpdir.name)
        self.assertFalse(os.path.exists(name))
1472
1473
# Run the whole test module when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1476