1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import pathlib
7import sys
8import re
9import warnings
10import contextlib
11import stat
12import types
13import weakref
14from unittest import mock
15
16import unittest
17from test import support
18from test.support import script_helper
19
20
21has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
22has_spawnl = hasattr(os, 'spawnl')
23
24# TEST_FILES may need to be tweaked for systems depending on the maximum
25# number of files that can be opened at one time (see ulimit -n)
26if sys.platform.startswith('openbsd'):
27    TEST_FILES = 48
28else:
29    TEST_FILES = 100
30
31# This is organized as one test for each chunk of code in tempfile.py,
32# in order of their appearance in the file.  Testing which requires
33# threads is not done here.
34
35class TestLowLevelInternals(unittest.TestCase):
36    def test_infer_return_type_singles(self):
37        self.assertIs(str, tempfile._infer_return_type(''))
38        self.assertIs(bytes, tempfile._infer_return_type(b''))
39        self.assertIs(str, tempfile._infer_return_type(None))
40
41    def test_infer_return_type_multiples(self):
42        self.assertIs(str, tempfile._infer_return_type('', ''))
43        self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
44        with self.assertRaises(TypeError):
45            tempfile._infer_return_type('', b'')
46        with self.assertRaises(TypeError):
47            tempfile._infer_return_type(b'', '')
48
49    def test_infer_return_type_multiples_and_none(self):
50        self.assertIs(str, tempfile._infer_return_type(None, ''))
51        self.assertIs(str, tempfile._infer_return_type('', None))
52        self.assertIs(str, tempfile._infer_return_type(None, None))
53        self.assertIs(bytes, tempfile._infer_return_type(b'', None))
54        self.assertIs(bytes, tempfile._infer_return_type(None, b''))
55        with self.assertRaises(TypeError):
56            tempfile._infer_return_type('', None, b'')
57        with self.assertRaises(TypeError):
58            tempfile._infer_return_type(b'', None, '')
59
60    def test_infer_return_type_pathlib(self):
61        self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/')))
62
63    def test_infer_return_type_pathlike(self):
64        class Path:
65            def __init__(self, path):
66                self.path = path
67
68            def __fspath__(self):
69                return self.path
70
71        self.assertIs(str, tempfile._infer_return_type(Path('/')))
72        self.assertIs(bytes, tempfile._infer_return_type(Path(b'/')))
73        self.assertIs(str, tempfile._infer_return_type('', Path('')))
74        self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b'')))
75        self.assertIs(bytes, tempfile._infer_return_type(None, Path(b'')))
76        self.assertIs(str, tempfile._infer_return_type(None, Path('')))
77
78        with self.assertRaises(TypeError):
79            tempfile._infer_return_type('', Path(b''))
80        with self.assertRaises(TypeError):
81            tempfile._infer_return_type(b'', Path(''))
82
83# Common functionality.
84
85class BaseTestCase(unittest.TestCase):
86
87    str_check = re.compile(r"^[a-z0-9_-]{8}$")
88    b_check = re.compile(br"^[a-z0-9_-]{8}$")
89
90    def setUp(self):
91        self._warnings_manager = support.check_warnings()
92        self._warnings_manager.__enter__()
93        warnings.filterwarnings("ignore", category=RuntimeWarning,
94                                message="mktemp", module=__name__)
95
96    def tearDown(self):
97        self._warnings_manager.__exit__(None, None, None)
98
99    def nameCheck(self, name, dir, pre, suf):
100        (ndir, nbase) = os.path.split(name)
101        npre  = nbase[:len(pre)]
102        nsuf  = nbase[len(nbase)-len(suf):]
103
104        if dir is not None:
105            self.assertIs(
106                type(name),
107                str
108                if type(dir) is str or isinstance(dir, os.PathLike) else
109                bytes,
110                "unexpected return type",
111            )
112        if pre is not None:
113            self.assertIs(type(name), str if type(pre) is str else bytes,
114                          "unexpected return type")
115        if suf is not None:
116            self.assertIs(type(name), str if type(suf) is str else bytes,
117                          "unexpected return type")
118        if (dir, pre, suf) == (None, None, None):
119            self.assertIs(type(name), str, "default return type must be str")
120
121        # check for equality of the absolute paths!
122        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
123                         "file %r not in directory %r" % (name, dir))
124        self.assertEqual(npre, pre,
125                         "file %r does not begin with %r" % (nbase, pre))
126        self.assertEqual(nsuf, suf,
127                         "file %r does not end with %r" % (nbase, suf))
128
129        nbase = nbase[len(pre):len(nbase)-len(suf)]
130        check = self.str_check if isinstance(nbase, str) else self.b_check
131        self.assertTrue(check.match(nbase),
132                        "random characters %r do not match %r"
133                        % (nbase, check.pattern))
134
135
136class TestExports(BaseTestCase):
137    def test_exports(self):
138        # There are no surprising symbols in the tempfile module
139        dict = tempfile.__dict__
140
141        expected = {
142            "NamedTemporaryFile" : 1,
143            "TemporaryFile" : 1,
144            "mkstemp" : 1,
145            "mkdtemp" : 1,
146            "mktemp" : 1,
147            "TMP_MAX" : 1,
148            "gettempprefix" : 1,
149            "gettempprefixb" : 1,
150            "gettempdir" : 1,
151            "gettempdirb" : 1,
152            "tempdir" : 1,
153            "template" : 1,
154            "SpooledTemporaryFile" : 1,
155            "TemporaryDirectory" : 1,
156        }
157
158        unexp = []
159        for key in dict:
160            if key[0] != '_' and key not in expected:
161                unexp.append(key)
162        self.assertTrue(len(unexp) == 0,
163                        "unexpected keys: %s" % unexp)
164
165
166class TestRandomNameSequence(BaseTestCase):
167    """Test the internal iterator object _RandomNameSequence."""
168
169    def setUp(self):
170        self.r = tempfile._RandomNameSequence()
171        super().setUp()
172
173    def test_get_six_char_str(self):
174        # _RandomNameSequence returns a six-character string
175        s = next(self.r)
176        self.nameCheck(s, '', '', '')
177
178    def test_many(self):
179        # _RandomNameSequence returns no duplicate strings (stochastic)
180
181        dict = {}
182        r = self.r
183        for i in range(TEST_FILES):
184            s = next(r)
185            self.nameCheck(s, '', '', '')
186            self.assertNotIn(s, dict)
187            dict[s] = 1
188
189    def supports_iter(self):
190        # _RandomNameSequence supports the iterator protocol
191
192        i = 0
193        r = self.r
194        for s in r:
195            i += 1
196            if i == 20:
197                break
198
199    @unittest.skipUnless(hasattr(os, 'fork'),
200        "os.fork is required for this test")
201    def test_process_awareness(self):
202        # ensure that the random source differs between
203        # child and parent.
204        read_fd, write_fd = os.pipe()
205        pid = None
206        try:
207            pid = os.fork()
208            if not pid:
209                # child process
210                os.close(read_fd)
211                os.write(write_fd, next(self.r).encode("ascii"))
212                os.close(write_fd)
213                # bypass the normal exit handlers- leave those to
214                # the parent.
215                os._exit(0)
216
217            # parent process
218            parent_value = next(self.r)
219            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
220        finally:
221            if pid:
222                support.wait_process(pid, exitcode=0)
223
224            os.close(read_fd)
225            os.close(write_fd)
226        self.assertNotEqual(child_value, parent_value)
227
228
229
230class TestCandidateTempdirList(BaseTestCase):
231    """Test the internal function _candidate_tempdir_list."""
232
233    def test_nonempty_list(self):
234        # _candidate_tempdir_list returns a nonempty list of strings
235
236        cand = tempfile._candidate_tempdir_list()
237
238        self.assertFalse(len(cand) == 0)
239        for c in cand:
240            self.assertIsInstance(c, str)
241
242    def test_wanted_dirs(self):
243        # _candidate_tempdir_list contains the expected directories
244
245        # Make sure the interesting environment variables are all set.
246        with support.EnvironmentVarGuard() as env:
247            for envname in 'TMPDIR', 'TEMP', 'TMP':
248                dirname = os.getenv(envname)
249                if not dirname:
250                    env[envname] = os.path.abspath(envname)
251
252            cand = tempfile._candidate_tempdir_list()
253
254            for envname in 'TMPDIR', 'TEMP', 'TMP':
255                dirname = os.getenv(envname)
256                if not dirname: raise ValueError
257                self.assertIn(dirname, cand)
258
259            try:
260                dirname = os.getcwd()
261            except (AttributeError, OSError):
262                dirname = os.curdir
263
264            self.assertIn(dirname, cand)
265
266            # Not practical to try to verify the presence of OS-specific
267            # paths in this list.
268
269
270# We test _get_default_tempdir some more by testing gettempdir.
271
272class TestGetDefaultTempdir(BaseTestCase):
273    """Test _get_default_tempdir()."""
274
275    def test_no_files_left_behind(self):
276        # use a private empty directory
277        with tempfile.TemporaryDirectory() as our_temp_directory:
278            # force _get_default_tempdir() to consider our empty directory
279            def our_candidate_list():
280                return [our_temp_directory]
281
282            with support.swap_attr(tempfile, "_candidate_tempdir_list",
283                                   our_candidate_list):
284                # verify our directory is empty after _get_default_tempdir()
285                tempfile._get_default_tempdir()
286                self.assertEqual(os.listdir(our_temp_directory), [])
287
288                def raise_OSError(*args, **kwargs):
289                    raise OSError()
290
291                with support.swap_attr(io, "open", raise_OSError):
292                    # test again with failing io.open()
293                    with self.assertRaises(FileNotFoundError):
294                        tempfile._get_default_tempdir()
295                    self.assertEqual(os.listdir(our_temp_directory), [])
296
297                def bad_writer(*args, **kwargs):
298                    fp = orig_open(*args, **kwargs)
299                    fp.write = raise_OSError
300                    return fp
301
302                with support.swap_attr(io, "open", bad_writer) as orig_open:
303                    # test again with failing write()
304                    with self.assertRaises(FileNotFoundError):
305                        tempfile._get_default_tempdir()
306                    self.assertEqual(os.listdir(our_temp_directory), [])
307
308
309class TestGetCandidateNames(BaseTestCase):
310    """Test the internal function _get_candidate_names."""
311
312    def test_retval(self):
313        # _get_candidate_names returns a _RandomNameSequence object
314        obj = tempfile._get_candidate_names()
315        self.assertIsInstance(obj, tempfile._RandomNameSequence)
316
317    def test_same_thing(self):
318        # _get_candidate_names always returns the same object
319        a = tempfile._get_candidate_names()
320        b = tempfile._get_candidate_names()
321
322        self.assertTrue(a is b)
323
324
325@contextlib.contextmanager
326def _inside_empty_temp_dir():
327    dir = tempfile.mkdtemp()
328    try:
329        with support.swap_attr(tempfile, 'tempdir', dir):
330            yield
331    finally:
332        support.rmtree(dir)
333
334
335def _mock_candidate_names(*names):
336    return support.swap_attr(tempfile,
337                             '_get_candidate_names',
338                             lambda: iter(names))
339
340
341class TestBadTempdir:
342
343    def test_read_only_directory(self):
344        with _inside_empty_temp_dir():
345            oldmode = mode = os.stat(tempfile.tempdir).st_mode
346            mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
347            os.chmod(tempfile.tempdir, mode)
348            try:
349                if os.access(tempfile.tempdir, os.W_OK):
350                    self.skipTest("can't set the directory read-only")
351                with self.assertRaises(PermissionError):
352                    self.make_temp()
353                self.assertEqual(os.listdir(tempfile.tempdir), [])
354            finally:
355                os.chmod(tempfile.tempdir, oldmode)
356
357    def test_nonexisting_directory(self):
358        with _inside_empty_temp_dir():
359            tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
360            with support.swap_attr(tempfile, 'tempdir', tempdir):
361                with self.assertRaises(FileNotFoundError):
362                    self.make_temp()
363
364    def test_non_directory(self):
365        with _inside_empty_temp_dir():
366            tempdir = os.path.join(tempfile.tempdir, 'file')
367            open(tempdir, 'wb').close()
368            with support.swap_attr(tempfile, 'tempdir', tempdir):
369                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
370                    self.make_temp()
371
372
373class TestMkstempInner(TestBadTempdir, BaseTestCase):
374    """Test the internal function _mkstemp_inner."""
375
376    class mkstemped:
377        _bflags = tempfile._bin_openflags
378        _tflags = tempfile._text_openflags
379        _close = os.close
380        _unlink = os.unlink
381
382        def __init__(self, dir, pre, suf, bin):
383            if bin: flags = self._bflags
384            else:   flags = self._tflags
385
386            output_type = tempfile._infer_return_type(dir, pre, suf)
387            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)
388
389        def write(self, str):
390            os.write(self.fd, str)
391
392        def __del__(self):
393            self._close(self.fd)
394            self._unlink(self.name)
395
396    def do_create(self, dir=None, pre=None, suf=None, bin=1):
397        output_type = tempfile._infer_return_type(dir, pre, suf)
398        if dir is None:
399            if output_type is str:
400                dir = tempfile.gettempdir()
401            else:
402                dir = tempfile.gettempdirb()
403        if pre is None:
404            pre = output_type()
405        if suf is None:
406            suf = output_type()
407        file = self.mkstemped(dir, pre, suf, bin)
408
409        self.nameCheck(file.name, dir, pre, suf)
410        return file
411
412    def test_basic(self):
413        # _mkstemp_inner can create files
414        self.do_create().write(b"blat")
415        self.do_create(pre="a").write(b"blat")
416        self.do_create(suf="b").write(b"blat")
417        self.do_create(pre="a", suf="b").write(b"blat")
418        self.do_create(pre="aa", suf=".txt").write(b"blat")
419
420    def test_basic_with_bytes_names(self):
421        # _mkstemp_inner can create files when given name parts all
422        # specified as bytes.
423        dir_b = tempfile.gettempdirb()
424        self.do_create(dir=dir_b, suf=b"").write(b"blat")
425        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
426        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
427        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
428        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
429        # Can't mix str & binary types in the args.
430        with self.assertRaises(TypeError):
431            self.do_create(dir="", suf=b"").write(b"blat")
432        with self.assertRaises(TypeError):
433            self.do_create(dir=dir_b, pre="").write(b"blat")
434        with self.assertRaises(TypeError):
435            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")
436
437    def test_basic_many(self):
438        # _mkstemp_inner can create many files (stochastic)
439        extant = list(range(TEST_FILES))
440        for i in extant:
441            extant[i] = self.do_create(pre="aa")
442
443    def test_choose_directory(self):
444        # _mkstemp_inner can create files in a user-selected directory
445        dir = tempfile.mkdtemp()
446        try:
447            self.do_create(dir=dir).write(b"blat")
448            self.do_create(dir=pathlib.Path(dir)).write(b"blat")
449        finally:
450            support.gc_collect()  # For PyPy or other GCs.
451            os.rmdir(dir)
452
453    def test_file_mode(self):
454        # _mkstemp_inner creates files with the proper mode
455
456        file = self.do_create()
457        mode = stat.S_IMODE(os.stat(file.name).st_mode)
458        expected = 0o600
459        if sys.platform == 'win32':
460            # There's no distinction among 'user', 'group' and 'world';
461            # replicate the 'user' bits.
462            user = expected >> 6
463            expected = user * (1 + 8 + 64)
464        self.assertEqual(mode, expected)
465
466    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
467    def test_noinherit(self):
468        # _mkstemp_inner file handles are not inherited by child processes
469
470        if support.verbose:
471            v="v"
472        else:
473            v="q"
474
475        file = self.do_create()
476        self.assertEqual(os.get_inheritable(file.fd), False)
477        fd = "%d" % file.fd
478
479        try:
480            me = __file__
481        except NameError:
482            me = sys.argv[0]
483
484        # We have to exec something, so that FD_CLOEXEC will take
485        # effect.  The core of this test is therefore in
486        # tf_inherit_check.py, which see.
487        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
488                              "tf_inherit_check.py")
489
490        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
491        # but an arg with embedded spaces should be decorated with double
492        # quotes on each end
493        if sys.platform == 'win32':
494            decorated = '"%s"' % sys.executable
495            tester = '"%s"' % tester
496        else:
497            decorated = sys.executable
498
499        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
500        self.assertFalse(retval < 0,
501                    "child process caught fatal signal %d" % -retval)
502        self.assertFalse(retval > 0, "child process reports failure %d"%retval)
503
504    @unittest.skipUnless(has_textmode, "text mode not available")
505    def test_textmode(self):
506        # _mkstemp_inner can create files in text mode
507
508        # A text file is truncated at the first Ctrl+Z byte
509        f = self.do_create(bin=0)
510        f.write(b"blat\x1a")
511        f.write(b"extra\n")
512        os.lseek(f.fd, 0, os.SEEK_SET)
513        self.assertEqual(os.read(f.fd, 20), b"blat")
514
515    def make_temp(self):
516        return tempfile._mkstemp_inner(tempfile.gettempdir(),
517                                       tempfile.gettempprefix(),
518                                       '',
519                                       tempfile._bin_openflags,
520                                       str)
521
522    def test_collision_with_existing_file(self):
523        # _mkstemp_inner tries another name when a file with
524        # the chosen name already exists
525        with _inside_empty_temp_dir(), \
526             _mock_candidate_names('aaa', 'aaa', 'bbb'):
527            (fd1, name1) = self.make_temp()
528            os.close(fd1)
529            self.assertTrue(name1.endswith('aaa'))
530
531            (fd2, name2) = self.make_temp()
532            os.close(fd2)
533            self.assertTrue(name2.endswith('bbb'))
534
535    def test_collision_with_existing_directory(self):
536        # _mkstemp_inner tries another name when a directory with
537        # the chosen name already exists
538        with _inside_empty_temp_dir(), \
539             _mock_candidate_names('aaa', 'aaa', 'bbb'):
540            dir = tempfile.mkdtemp()
541            self.assertTrue(dir.endswith('aaa'))
542
543            (fd, name) = self.make_temp()
544            os.close(fd)
545            self.assertTrue(name.endswith('bbb'))
546
547
548class TestGetTempPrefix(BaseTestCase):
549    """Test gettempprefix()."""
550
551    def test_sane_template(self):
552        # gettempprefix returns a nonempty prefix string
553        p = tempfile.gettempprefix()
554
555        self.assertIsInstance(p, str)
556        self.assertGreater(len(p), 0)
557
558        pb = tempfile.gettempprefixb()
559
560        self.assertIsInstance(pb, bytes)
561        self.assertGreater(len(pb), 0)
562
563    def test_usable_template(self):
564        # gettempprefix returns a usable prefix string
565
566        # Create a temp directory, avoiding use of the prefix.
567        # Then attempt to create a file whose name is
568        # prefix + 'xxxxxx.xxx' in that directory.
569        p = tempfile.gettempprefix() + "xxxxxx.xxx"
570        d = tempfile.mkdtemp(prefix="")
571        try:
572            p = os.path.join(d, p)
573            fd = os.open(p, os.O_RDWR | os.O_CREAT)
574            os.close(fd)
575            os.unlink(p)
576        finally:
577            os.rmdir(d)
578
579
580class TestGetTempDir(BaseTestCase):
581    """Test gettempdir()."""
582
583    def test_directory_exists(self):
584        # gettempdir returns a directory which exists
585
586        for d in (tempfile.gettempdir(), tempfile.gettempdirb()):
587            self.assertTrue(os.path.isabs(d) or d == os.curdir,
588                            "%r is not an absolute path" % d)
589            self.assertTrue(os.path.isdir(d),
590                            "%r is not a directory" % d)
591
592    def test_directory_writable(self):
593        # gettempdir returns a directory writable by the user
594
595        # sneaky: just instantiate a NamedTemporaryFile, which
596        # defaults to writing into the directory returned by
597        # gettempdir.
598        with tempfile.NamedTemporaryFile() as file:
599            file.write(b"blat")
600
601    def test_same_thing(self):
602        # gettempdir always returns the same object
603        a = tempfile.gettempdir()
604        b = tempfile.gettempdir()
605        c = tempfile.gettempdirb()
606
607        self.assertTrue(a is b)
608        self.assertNotEqual(type(a), type(c))
609        self.assertEqual(a, os.fsdecode(c))
610
611    def test_case_sensitive(self):
612        # gettempdir should not flatten its case
613        # even on a case-insensitive file system
614        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
615        _tempdir, tempfile.tempdir = tempfile.tempdir, None
616        try:
617            with support.EnvironmentVarGuard() as env:
618                # Fake the first env var which is checked as a candidate
619                env["TMPDIR"] = case_sensitive_tempdir
620                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
621        finally:
622            tempfile.tempdir = _tempdir
623            support.rmdir(case_sensitive_tempdir)
624
625
626class TestMkstemp(BaseTestCase):
627    """Test mkstemp()."""
628
629    def do_create(self, dir=None, pre=None, suf=None):
630        output_type = tempfile._infer_return_type(dir, pre, suf)
631        if dir is None:
632            if output_type is str:
633                dir = tempfile.gettempdir()
634            else:
635                dir = tempfile.gettempdirb()
636        if pre is None:
637            pre = output_type()
638        if suf is None:
639            suf = output_type()
640        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
641        (ndir, nbase) = os.path.split(name)
642        adir = os.path.abspath(dir)
643        self.assertEqual(adir, ndir,
644            "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
645
646        try:
647            self.nameCheck(name, dir, pre, suf)
648        finally:
649            os.close(fd)
650            os.unlink(name)
651
652    def test_basic(self):
653        # mkstemp can create files
654        self.do_create()
655        self.do_create(pre="a")
656        self.do_create(suf="b")
657        self.do_create(pre="a", suf="b")
658        self.do_create(pre="aa", suf=".txt")
659        self.do_create(dir=".")
660
661    def test_basic_with_bytes_names(self):
662        # mkstemp can create files when given name parts all
663        # specified as bytes.
664        d = tempfile.gettempdirb()
665        self.do_create(dir=d, suf=b"")
666        self.do_create(dir=d, pre=b"a")
667        self.do_create(dir=d, suf=b"b")
668        self.do_create(dir=d, pre=b"a", suf=b"b")
669        self.do_create(dir=d, pre=b"aa", suf=b".txt")
670        self.do_create(dir=b".")
671        with self.assertRaises(TypeError):
672            self.do_create(dir=".", pre=b"aa", suf=b".txt")
673        with self.assertRaises(TypeError):
674            self.do_create(dir=b".", pre="aa", suf=b".txt")
675        with self.assertRaises(TypeError):
676            self.do_create(dir=b".", pre=b"aa", suf=".txt")
677
678
679    def test_choose_directory(self):
680        # mkstemp can create directories in a user-selected directory
681        dir = tempfile.mkdtemp()
682        try:
683            self.do_create(dir=dir)
684            self.do_create(dir=pathlib.Path(dir))
685        finally:
686            os.rmdir(dir)
687
688
689class TestMkdtemp(TestBadTempdir, BaseTestCase):
690    """Test mkdtemp()."""
691
692    def make_temp(self):
693        return tempfile.mkdtemp()
694
695    def do_create(self, dir=None, pre=None, suf=None):
696        output_type = tempfile._infer_return_type(dir, pre, suf)
697        if dir is None:
698            if output_type is str:
699                dir = tempfile.gettempdir()
700            else:
701                dir = tempfile.gettempdirb()
702        if pre is None:
703            pre = output_type()
704        if suf is None:
705            suf = output_type()
706        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
707
708        try:
709            self.nameCheck(name, dir, pre, suf)
710            return name
711        except:
712            os.rmdir(name)
713            raise
714
715    def test_basic(self):
716        # mkdtemp can create directories
717        os.rmdir(self.do_create())
718        os.rmdir(self.do_create(pre="a"))
719        os.rmdir(self.do_create(suf="b"))
720        os.rmdir(self.do_create(pre="a", suf="b"))
721        os.rmdir(self.do_create(pre="aa", suf=".txt"))
722
723    def test_basic_with_bytes_names(self):
724        # mkdtemp can create directories when given all binary parts
725        d = tempfile.gettempdirb()
726        os.rmdir(self.do_create(dir=d))
727        os.rmdir(self.do_create(dir=d, pre=b"a"))
728        os.rmdir(self.do_create(dir=d, suf=b"b"))
729        os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b"))
730        os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt"))
731        with self.assertRaises(TypeError):
732            os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt"))
733        with self.assertRaises(TypeError):
734            os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt"))
735        with self.assertRaises(TypeError):
736            os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt"))
737
738    def test_basic_many(self):
739        # mkdtemp can create many directories (stochastic)
740        extant = list(range(TEST_FILES))
741        try:
742            for i in extant:
743                extant[i] = self.do_create(pre="aa")
744        finally:
745            for i in extant:
746                if(isinstance(i, str)):
747                    os.rmdir(i)
748
749    def test_choose_directory(self):
750        # mkdtemp can create directories in a user-selected directory
751        dir = tempfile.mkdtemp()
752        try:
753            os.rmdir(self.do_create(dir=dir))
754            os.rmdir(self.do_create(dir=pathlib.Path(dir)))
755        finally:
756            os.rmdir(dir)
757
758    def test_mode(self):
759        # mkdtemp creates directories with the proper mode
760
761        dir = self.do_create()
762        try:
763            mode = stat.S_IMODE(os.stat(dir).st_mode)
764            mode &= 0o777 # Mask off sticky bits inherited from /tmp
765            expected = 0o700
766            if sys.platform == 'win32':
767                # There's no distinction among 'user', 'group' and 'world';
768                # replicate the 'user' bits.
769                user = expected >> 6
770                expected = user * (1 + 8 + 64)
771            self.assertEqual(mode, expected)
772        finally:
773            os.rmdir(dir)
774
775    def test_collision_with_existing_file(self):
776        # mkdtemp tries another name when a file with
777        # the chosen name already exists
778        with _inside_empty_temp_dir(), \
779             _mock_candidate_names('aaa', 'aaa', 'bbb'):
780            file = tempfile.NamedTemporaryFile(delete=False)
781            file.close()
782            self.assertTrue(file.name.endswith('aaa'))
783            dir = tempfile.mkdtemp()
784            self.assertTrue(dir.endswith('bbb'))
785
786    def test_collision_with_existing_directory(self):
787        # mkdtemp tries another name when a directory with
788        # the chosen name already exists
789        with _inside_empty_temp_dir(), \
790             _mock_candidate_names('aaa', 'aaa', 'bbb'):
791            dir1 = tempfile.mkdtemp()
792            self.assertTrue(dir1.endswith('aaa'))
793            dir2 = tempfile.mkdtemp()
794            self.assertTrue(dir2.endswith('bbb'))
795
796
797class TestMktemp(BaseTestCase):
798    """Test mktemp()."""
799
800    # For safety, all use of mktemp must occur in a private directory.
801    # We must also suppress the RuntimeWarning it generates.
802    def setUp(self):
803        self.dir = tempfile.mkdtemp()
804        super().setUp()
805
806    def tearDown(self):
807        if self.dir:
808            os.rmdir(self.dir)
809            self.dir = None
810        super().tearDown()
811
812    class mktemped:
813        _unlink = os.unlink
814        _bflags = tempfile._bin_openflags
815
816        def __init__(self, dir, pre, suf):
817            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
818            # Create the file.  This will raise an exception if it's
819            # mysteriously appeared in the meanwhile.
820            os.close(os.open(self.name, self._bflags, 0o600))
821
822        def __del__(self):
823            self._unlink(self.name)
824
825    def do_create(self, pre="", suf=""):
826        file = self.mktemped(self.dir, pre, suf)
827
828        self.nameCheck(file.name, self.dir, pre, suf)
829        return file
830
831    def test_basic(self):
832        # mktemp can choose usable file names
833        self.do_create()
834        self.do_create(pre="a")
835        self.do_create(suf="b")
836        self.do_create(pre="a", suf="b")
837        self.do_create(pre="aa", suf=".txt")
838
839    def test_many(self):
840        # mktemp can choose many usable file names (stochastic)
841        extant = list(range(TEST_FILES))
842        for i in extant:
843            extant[i] = self.do_create(pre="aa")
844        del extant
845        support.gc_collect()  # For PyPy or other GCs.
846
847##     def test_warning(self):
848##         # mktemp issues a warning when used
849##         warnings.filterwarnings("error",
850##                                 category=RuntimeWarning,
851##                                 message="mktemp")
852##         self.assertRaises(RuntimeWarning,
853##                           tempfile.mktemp, dir=self.dir)
854
855
856# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
857
858
859class TestNamedTemporaryFile(BaseTestCase):
860    """Test NamedTemporaryFile()."""
861
862    def do_create(self, dir=None, pre="", suf="", delete=True):
863        if dir is None:
864            dir = tempfile.gettempdir()
865        file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
866                                           delete=delete)
867
868        self.nameCheck(file.name, dir, pre, suf)
869        return file
870
871
872    def test_basic(self):
873        # NamedTemporaryFile can create files
874        self.do_create()
875        self.do_create(pre="a")
876        self.do_create(suf="b")
877        self.do_create(pre="a", suf="b")
878        self.do_create(pre="aa", suf=".txt")
879
880    def test_method_lookup(self):
881        # Issue #18879: Looking up a temporary file method should keep it
882        # alive long enough.
883        f = self.do_create()
884        wr = weakref.ref(f)
885        write = f.write
886        write2 = f.write
887        del f
888        write(b'foo')
889        del write
890        write2(b'bar')
891        del write2
892        if support.check_impl_detail(cpython=True):
893            # No reference cycle was created.
894            self.assertIsNone(wr())
895
896    def test_iter(self):
897        # Issue #23700: getting iterator from a temporary file should keep
898        # it alive as long as it's being iterated over
899        lines = [b'spam\n', b'eggs\n', b'beans\n']
900        def make_file():
901            f = tempfile.NamedTemporaryFile(mode='w+b')
902            f.write(b''.join(lines))
903            f.seek(0)
904            return f
905        for i, l in enumerate(make_file()):
906            self.assertEqual(l, lines[i])
907        self.assertEqual(i, len(lines) - 1)
908
909    def test_creates_named(self):
910        # NamedTemporaryFile creates files with names
911        f = tempfile.NamedTemporaryFile()
912        self.assertTrue(os.path.exists(f.name),
913                        "NamedTemporaryFile %s does not exist" % f.name)
914
915    def test_del_on_close(self):
916        # A NamedTemporaryFile is deleted when closed
917        dir = tempfile.mkdtemp()
918        try:
919            with tempfile.NamedTemporaryFile(dir=dir) as f:
920                f.write(b'blat')
921            self.assertFalse(os.path.exists(f.name),
922                        "NamedTemporaryFile %s exists after close" % f.name)
923        finally:
924            os.rmdir(dir)
925
926    def test_dis_del_on_close(self):
927        # Tests that delete-on-close can be disabled
928        dir = tempfile.mkdtemp()
929        tmp = None
930        try:
931            f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
932            tmp = f.name
933            f.write(b'blat')
934            f.close()
935            self.assertTrue(os.path.exists(f.name),
936                        "NamedTemporaryFile %s missing after close" % f.name)
937        finally:
938            if tmp is not None:
939                os.unlink(tmp)
940            os.rmdir(dir)
941
942    def test_multiple_close(self):
943        # A NamedTemporaryFile can be closed many times without error
944        f = tempfile.NamedTemporaryFile()
945        f.write(b'abc\n')
946        f.close()
947        f.close()
948        f.close()
949
950    def test_context_manager(self):
951        # A NamedTemporaryFile can be used as a context manager
952        with tempfile.NamedTemporaryFile() as f:
953            self.assertTrue(os.path.exists(f.name))
954        self.assertFalse(os.path.exists(f.name))
955        def use_closed():
956            with f:
957                pass
958        self.assertRaises(ValueError, use_closed)
959
960    def test_no_leak_fd(self):
961        # Issue #21058: don't leak file descriptor when io.open() fails
962        closed = []
963        os_close = os.close
964        def close(fd):
965            closed.append(fd)
966            os_close(fd)
967
968        with mock.patch('os.close', side_effect=close):
969            with mock.patch('io.open', side_effect=ValueError):
970                self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
971                self.assertEqual(len(closed), 1)
972
973    def test_bad_mode(self):
974        dir = tempfile.mkdtemp()
975        self.addCleanup(support.rmtree, dir)
976        with self.assertRaises(ValueError):
977            tempfile.NamedTemporaryFile(mode='wr', dir=dir)
978        with self.assertRaises(TypeError):
979            tempfile.NamedTemporaryFile(mode=2, dir=dir)
980        self.assertEqual(os.listdir(dir), [])
981
982    # How to test the mode and bufsize parameters?
983
984class TestSpooledTemporaryFile(BaseTestCase):
985    """Test SpooledTemporaryFile()."""
986
987    def do_create(self, max_size=0, dir=None, pre="", suf=""):
988        if dir is None:
989            dir = tempfile.gettempdir()
990        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
991
992        return file
993
994
995    def test_basic(self):
996        # SpooledTemporaryFile can create files
997        f = self.do_create()
998        self.assertFalse(f._rolled)
999        f = self.do_create(max_size=100, pre="a", suf=".txt")
1000        self.assertFalse(f._rolled)
1001
1002    def test_del_on_close(self):
1003        # A SpooledTemporaryFile is deleted when closed
1004        dir = tempfile.mkdtemp()
1005        try:
1006            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
1007            self.assertFalse(f._rolled)
1008            f.write(b'blat ' * 5)
1009            self.assertTrue(f._rolled)
1010            filename = f.name
1011            f.close()
1012            self.assertFalse(isinstance(filename, str) and os.path.exists(filename),
1013                        "SpooledTemporaryFile %s exists after close" % filename)
1014        finally:
1015            os.rmdir(dir)
1016
1017    def test_rewrite_small(self):
1018        # A SpooledTemporaryFile can be written to multiple within the max_size
1019        f = self.do_create(max_size=30)
1020        self.assertFalse(f._rolled)
1021        for i in range(5):
1022            f.seek(0, 0)
1023            f.write(b'x' * 20)
1024        self.assertFalse(f._rolled)
1025
1026    def test_write_sequential(self):
1027        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
1028        # over afterward
1029        f = self.do_create(max_size=30)
1030        self.assertFalse(f._rolled)
1031        f.write(b'x' * 20)
1032        self.assertFalse(f._rolled)
1033        f.write(b'x' * 10)
1034        self.assertFalse(f._rolled)
1035        f.write(b'x')
1036        self.assertTrue(f._rolled)
1037
1038    def test_writelines(self):
1039        # Verify writelines with a SpooledTemporaryFile
1040        f = self.do_create()
1041        f.writelines((b'x', b'y', b'z'))
1042        pos = f.seek(0)
1043        self.assertEqual(pos, 0)
1044        buf = f.read()
1045        self.assertEqual(buf, b'xyz')
1046
1047    def test_writelines_sequential(self):
1048        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
1049        # over afterward
1050        f = self.do_create(max_size=35)
1051        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
1052        self.assertFalse(f._rolled)
1053        f.write(b'x')
1054        self.assertTrue(f._rolled)
1055
1056    def test_sparse(self):
1057        # A SpooledTemporaryFile that is written late in the file will extend
1058        # when that occurs
1059        f = self.do_create(max_size=30)
1060        self.assertFalse(f._rolled)
1061        pos = f.seek(100, 0)
1062        self.assertEqual(pos, 100)
1063        self.assertFalse(f._rolled)
1064        f.write(b'x')
1065        self.assertTrue(f._rolled)
1066
1067    def test_fileno(self):
1068        # A SpooledTemporaryFile should roll over to a real file on fileno()
1069        f = self.do_create(max_size=30)
1070        self.assertFalse(f._rolled)
1071        self.assertTrue(f.fileno() > 0)
1072        self.assertTrue(f._rolled)
1073
1074    def test_multiple_close_before_rollover(self):
1075        # A SpooledTemporaryFile can be closed many times without error
1076        f = tempfile.SpooledTemporaryFile()
1077        f.write(b'abc\n')
1078        self.assertFalse(f._rolled)
1079        f.close()
1080        f.close()
1081        f.close()
1082
1083    def test_multiple_close_after_rollover(self):
1084        # A SpooledTemporaryFile can be closed many times without error
1085        f = tempfile.SpooledTemporaryFile(max_size=1)
1086        f.write(b'abc\n')
1087        self.assertTrue(f._rolled)
1088        f.close()
1089        f.close()
1090        f.close()
1091
1092    def test_bound_methods(self):
1093        # It should be OK to steal a bound method from a SpooledTemporaryFile
1094        # and use it independently; when the file rolls over, those bound
1095        # methods should continue to function
1096        f = self.do_create(max_size=30)
1097        read = f.read
1098        write = f.write
1099        seek = f.seek
1100
1101        write(b"a" * 35)
1102        write(b"b" * 35)
1103        seek(0, 0)
1104        self.assertEqual(read(70), b'a'*35 + b'b'*35)
1105
1106    def test_properties(self):
1107        f = tempfile.SpooledTemporaryFile(max_size=10)
1108        f.write(b'x' * 10)
1109        self.assertFalse(f._rolled)
1110        self.assertEqual(f.mode, 'w+b')
1111        self.assertIsNone(f.name)
1112        with self.assertRaises(AttributeError):
1113            f.newlines
1114        with self.assertRaises(AttributeError):
1115            f.encoding
1116        with self.assertRaises(AttributeError):
1117            f.errors
1118
1119        f.write(b'x')
1120        self.assertTrue(f._rolled)
1121        self.assertEqual(f.mode, 'rb+')
1122        self.assertIsNotNone(f.name)
1123        with self.assertRaises(AttributeError):
1124            f.newlines
1125        with self.assertRaises(AttributeError):
1126            f.encoding
1127        with self.assertRaises(AttributeError):
1128            f.errors
1129
1130    def test_text_mode(self):
1131        # Creating a SpooledTemporaryFile with a text mode should produce
1132        # a file object reading and writing (Unicode) text strings.
1133        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
1134                                          encoding="utf-8")
1135        f.write("abc\n")
1136        f.seek(0)
1137        self.assertEqual(f.read(), "abc\n")
1138        f.write("def\n")
1139        f.seek(0)
1140        self.assertEqual(f.read(), "abc\ndef\n")
1141        self.assertFalse(f._rolled)
1142        self.assertEqual(f.mode, 'w+')
1143        self.assertIsNone(f.name)
1144        self.assertEqual(f.newlines, os.linesep)
1145        self.assertEqual(f.encoding, "utf-8")
1146        self.assertEqual(f.errors, "strict")
1147
1148        f.write("xyzzy\n")
1149        f.seek(0)
1150        self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
1151        # Check that Ctrl+Z doesn't truncate the file
1152        f.write("foo\x1abar\n")
1153        f.seek(0)
1154        self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
1155        self.assertTrue(f._rolled)
1156        self.assertEqual(f.mode, 'w+')
1157        self.assertIsNotNone(f.name)
1158        self.assertEqual(f.newlines, os.linesep)
1159        self.assertEqual(f.encoding, "utf-8")
1160        self.assertEqual(f.errors, "strict")
1161
1162    def test_text_newline_and_encoding(self):
1163        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
1164                                          newline='', encoding='utf-8',
1165                                          errors='ignore')
1166        f.write("\u039B\r\n")
1167        f.seek(0)
1168        self.assertEqual(f.read(), "\u039B\r\n")
1169        self.assertFalse(f._rolled)
1170        self.assertEqual(f.mode, 'w+')
1171        self.assertIsNone(f.name)
1172        self.assertIsNotNone(f.newlines)
1173        self.assertEqual(f.encoding, "utf-8")
1174        self.assertEqual(f.errors, "ignore")
1175
1176        f.write("\u039C" * 10 + "\r\n")
1177        f.write("\u039D" * 20)
1178        f.seek(0)
1179        self.assertEqual(f.read(),
1180                "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
1181        self.assertTrue(f._rolled)
1182        self.assertEqual(f.mode, 'w+')
1183        self.assertIsNotNone(f.name)
1184        self.assertIsNotNone(f.newlines)
1185        self.assertEqual(f.encoding, 'utf-8')
1186        self.assertEqual(f.errors, 'ignore')
1187
1188    def test_context_manager_before_rollover(self):
1189        # A SpooledTemporaryFile can be used as a context manager
1190        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1191            self.assertFalse(f._rolled)
1192            self.assertFalse(f.closed)
1193        self.assertTrue(f.closed)
1194        def use_closed():
1195            with f:
1196                pass
1197        self.assertRaises(ValueError, use_closed)
1198
1199    def test_context_manager_during_rollover(self):
1200        # A SpooledTemporaryFile can be used as a context manager
1201        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1202            self.assertFalse(f._rolled)
1203            f.write(b'abc\n')
1204            f.flush()
1205            self.assertTrue(f._rolled)
1206            self.assertFalse(f.closed)
1207        self.assertTrue(f.closed)
1208        def use_closed():
1209            with f:
1210                pass
1211        self.assertRaises(ValueError, use_closed)
1212
1213    def test_context_manager_after_rollover(self):
1214        # A SpooledTemporaryFile can be used as a context manager
1215        f = tempfile.SpooledTemporaryFile(max_size=1)
1216        f.write(b'abc\n')
1217        f.flush()
1218        self.assertTrue(f._rolled)
1219        with f:
1220            self.assertFalse(f.closed)
1221        self.assertTrue(f.closed)
1222        def use_closed():
1223            with f:
1224                pass
1225        self.assertRaises(ValueError, use_closed)
1226
1227    def test_truncate_with_size_parameter(self):
1228        # A SpooledTemporaryFile can be truncated to zero size
1229        f = tempfile.SpooledTemporaryFile(max_size=10)
1230        f.write(b'abcdefg\n')
1231        f.seek(0)
1232        f.truncate()
1233        self.assertFalse(f._rolled)
1234        self.assertEqual(f._file.getvalue(), b'')
1235        # A SpooledTemporaryFile can be truncated to a specific size
1236        f = tempfile.SpooledTemporaryFile(max_size=10)
1237        f.write(b'abcdefg\n')
1238        f.truncate(4)
1239        self.assertFalse(f._rolled)
1240        self.assertEqual(f._file.getvalue(), b'abcd')
1241        # A SpooledTemporaryFile rolls over if truncated to large size
1242        f = tempfile.SpooledTemporaryFile(max_size=10)
1243        f.write(b'abcdefg\n')
1244        f.truncate(20)
1245        self.assertTrue(f._rolled)
1246        self.assertEqual(os.fstat(f.fileno()).st_size, 20)
1247
1248    def test_class_getitem(self):
1249        self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes],
1250                      types.GenericAlias)
1251
1252if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
1253
1254    class TestTemporaryFile(BaseTestCase):
1255        """Test TemporaryFile()."""
1256
1257        def test_basic(self):
1258            # TemporaryFile can create files
1259            # No point in testing the name params - the file has no name.
1260            tempfile.TemporaryFile()
1261
1262        def test_has_no_name(self):
1263            # TemporaryFile creates files with no names (on this system)
1264            dir = tempfile.mkdtemp()
1265            f = tempfile.TemporaryFile(dir=dir)
1266            f.write(b'blat')
1267
1268            # Sneaky: because this file has no name, it should not prevent
1269            # us from removing the directory it was created in.
1270            try:
1271                os.rmdir(dir)
1272            except:
1273                # cleanup
1274                f.close()
1275                os.rmdir(dir)
1276                raise
1277
1278        def test_multiple_close(self):
1279            # A TemporaryFile can be closed many times without error
1280            f = tempfile.TemporaryFile()
1281            f.write(b'abc\n')
1282            f.close()
1283            f.close()
1284            f.close()
1285
1286        # How to test the mode and bufsize parameters?
1287        def test_mode_and_encoding(self):
1288
1289            def roundtrip(input, *args, **kwargs):
1290                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
1291                    fileobj.write(input)
1292                    fileobj.seek(0)
1293                    self.assertEqual(input, fileobj.read())
1294
1295            roundtrip(b"1234", "w+b")
1296            roundtrip("abdc\n", "w+")
1297            roundtrip("\u039B", "w+", encoding="utf-16")
1298            roundtrip("foo\r\n", "w+", newline="")
1299
1300        def test_no_leak_fd(self):
1301            # Issue #21058: don't leak file descriptor when io.open() fails
1302            closed = []
1303            os_close = os.close
1304            def close(fd):
1305                closed.append(fd)
1306                os_close(fd)
1307
1308            with mock.patch('os.close', side_effect=close):
1309                with mock.patch('io.open', side_effect=ValueError):
1310                    self.assertRaises(ValueError, tempfile.TemporaryFile)
1311                    self.assertEqual(len(closed), 1)
1312
1313
1314
1315# Helper for test_del_on_shutdown
1316class NulledModules:
1317    def __init__(self, *modules):
1318        self.refs = [mod.__dict__ for mod in modules]
1319        self.contents = [ref.copy() for ref in self.refs]
1320
1321    def __enter__(self):
1322        for d in self.refs:
1323            for key in d:
1324                d[key] = None
1325
1326    def __exit__(self, *exc_info):
1327        for d, c in zip(self.refs, self.contents):
1328            d.clear()
1329            d.update(c)
1330
1331class TestTemporaryDirectory(BaseTestCase):
1332    """Test TemporaryDirectory()."""
1333
1334    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1):
1335        if dir is None:
1336            dir = tempfile.gettempdir()
1337        tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
1338        self.nameCheck(tmp.name, dir, pre, suf)
1339        self.do_create2(tmp.name, recurse, dirs, files)
1340        return tmp
1341
1342    def do_create2(self, path, recurse=1, dirs=1, files=1):
1343        # Create subdirectories and some files
1344        if recurse:
1345            for i in range(dirs):
1346                name = os.path.join(path, "dir%d" % i)
1347                os.mkdir(name)
1348                self.do_create2(name, recurse-1, dirs, files)
1349        for i in range(files):
1350            with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
1351                f.write(b"Hello world!")
1352
1353    def test_mkdtemp_failure(self):
1354        # Check no additional exception if mkdtemp fails
1355        # Previously would raise AttributeError instead
1356        # (noted as part of Issue #10188)
1357        with tempfile.TemporaryDirectory() as nonexistent:
1358            pass
1359        with self.assertRaises(FileNotFoundError) as cm:
1360            tempfile.TemporaryDirectory(dir=nonexistent)
1361        self.assertEqual(cm.exception.errno, errno.ENOENT)
1362
1363    def test_explicit_cleanup(self):
1364        # A TemporaryDirectory is deleted when cleaned up
1365        dir = tempfile.mkdtemp()
1366        try:
1367            d = self.do_create(dir=dir)
1368            self.assertTrue(os.path.exists(d.name),
1369                            "TemporaryDirectory %s does not exist" % d.name)
1370            d.cleanup()
1371            self.assertFalse(os.path.exists(d.name),
1372                        "TemporaryDirectory %s exists after cleanup" % d.name)
1373        finally:
1374            os.rmdir(dir)
1375
1376    @support.skip_unless_symlink
1377    def test_cleanup_with_symlink_to_a_directory(self):
1378        # cleanup() should not follow symlinks to directories (issue #12464)
1379        d1 = self.do_create()
1380        d2 = self.do_create(recurse=0)
1381
1382        # Symlink d1/foo -> d2
1383        os.symlink(d2.name, os.path.join(d1.name, "foo"))
1384
1385        # This call to cleanup() should not follow the "foo" symlink
1386        d1.cleanup()
1387
1388        self.assertFalse(os.path.exists(d1.name),
1389                         "TemporaryDirectory %s exists after cleanup" % d1.name)
1390        self.assertTrue(os.path.exists(d2.name),
1391                        "Directory pointed to by a symlink was deleted")
1392        self.assertEqual(os.listdir(d2.name), ['test0.txt'],
1393                         "Contents of the directory pointed to by a symlink "
1394                         "were deleted")
1395        d2.cleanup()
1396
1397    @support.cpython_only
1398    def test_del_on_collection(self):
1399        # A TemporaryDirectory is deleted when garbage collected
1400        dir = tempfile.mkdtemp()
1401        try:
1402            d = self.do_create(dir=dir)
1403            name = d.name
1404            del d # Rely on refcounting to invoke __del__
1405            self.assertFalse(os.path.exists(name),
1406                        "TemporaryDirectory %s exists after __del__" % name)
1407        finally:
1408            os.rmdir(dir)
1409
1410    def test_del_on_shutdown(self):
1411        # A TemporaryDirectory may be cleaned up during shutdown
1412        with self.do_create() as dir:
1413            for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
1414                code = """if True:
1415                    import builtins
1416                    import os
1417                    import shutil
1418                    import sys
1419                    import tempfile
1420                    import warnings
1421
1422                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
1423                    sys.stdout.buffer.write(tmp.name.encode())
1424
1425                    tmp2 = os.path.join(tmp.name, 'test_dir')
1426                    os.mkdir(tmp2)
1427                    with open(os.path.join(tmp2, "test0.txt"), "w") as f:
1428                        f.write("Hello world!")
1429
1430                    {mod}.tmp = tmp
1431
1432                    warnings.filterwarnings("always", category=ResourceWarning)
1433                    """.format(dir=dir, mod=mod)
1434                rc, out, err = script_helper.assert_python_ok("-c", code)
1435                tmp_name = out.decode().strip()
1436                self.assertFalse(os.path.exists(tmp_name),
1437                            "TemporaryDirectory %s exists after cleanup" % tmp_name)
1438                err = err.decode('utf-8', 'backslashreplace')
1439                self.assertNotIn("Exception ", err)
1440                self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1441
1442    def test_exit_on_shutdown(self):
1443        # Issue #22427
1444        with self.do_create() as dir:
1445            code = """if True:
1446                import sys
1447                import tempfile
1448                import warnings
1449
1450                def generator():
1451                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
1452                        yield tmp
1453                g = generator()
1454                sys.stdout.buffer.write(next(g).encode())
1455
1456                warnings.filterwarnings("always", category=ResourceWarning)
1457                """.format(dir=dir)
1458            rc, out, err = script_helper.assert_python_ok("-c", code)
1459            tmp_name = out.decode().strip()
1460            self.assertFalse(os.path.exists(tmp_name),
1461                        "TemporaryDirectory %s exists after cleanup" % tmp_name)
1462            err = err.decode('utf-8', 'backslashreplace')
1463            self.assertNotIn("Exception ", err)
1464            self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1465
1466    def test_warnings_on_cleanup(self):
1467        # ResourceWarning will be triggered by __del__
1468        with self.do_create() as dir:
1469            d = self.do_create(dir=dir, recurse=3)
1470            name = d.name
1471
1472            # Check for the resource warning
1473            with support.check_warnings(('Implicitly', ResourceWarning), quiet=False):
1474                warnings.filterwarnings("always", category=ResourceWarning)
1475                del d
1476                support.gc_collect()
1477            self.assertFalse(os.path.exists(name),
1478                        "TemporaryDirectory %s exists after __del__" % name)
1479
1480    def test_multiple_close(self):
1481        # Can be cleaned-up many times without error
1482        d = self.do_create()
1483        d.cleanup()
1484        d.cleanup()
1485        d.cleanup()
1486
1487    def test_context_manager(self):
1488        # Can be used as a context manager
1489        d = self.do_create()
1490        with d as name:
1491            self.assertTrue(os.path.exists(name))
1492            self.assertEqual(name, d.name)
1493        self.assertFalse(os.path.exists(name))
1494
1495    def test_modes(self):
1496        for mode in range(8):
1497            mode <<= 6
1498            with self.subTest(mode=format(mode, '03o')):
1499                d = self.do_create(recurse=3, dirs=2, files=2)
1500                with d:
1501                    # Change files and directories mode recursively.
1502                    for root, dirs, files in os.walk(d.name, topdown=False):
1503                        for name in files:
1504                            os.chmod(os.path.join(root, name), mode)
1505                        os.chmod(root, mode)
1506                    d.cleanup()
1507                self.assertFalse(os.path.exists(d.name))
1508
1509    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags')
1510    def test_flags(self):
1511        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
1512        d = self.do_create(recurse=3, dirs=2, files=2)
1513        with d:
1514            # Change files and directories flags recursively.
1515            for root, dirs, files in os.walk(d.name, topdown=False):
1516                for name in files:
1517                    os.chflags(os.path.join(root, name), flags)
1518                os.chflags(root, flags)
1519            d.cleanup()
1520        self.assertFalse(os.path.exists(d.name))
1521
1522
1523if __name__ == "__main__":
1524    unittest.main()
1525