1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import pathlib
7import sys
8import re
9import warnings
10import contextlib
11import stat
12import types
13import weakref
14from unittest import mock
15
16import unittest
17from test import support
18from test.support import os_helper
19from test.support import script_helper
20from test.support import warnings_helper
21
22
23has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
24has_spawnl = hasattr(os, 'spawnl')
25
26# TEST_FILES may need to be tweaked for systems depending on the maximum
27# number of files that can be opened at one time (see ulimit -n)
28if sys.platform.startswith('openbsd'):
29    TEST_FILES = 48
30else:
31    TEST_FILES = 100
32
33# This is organized as one test for each chunk of code in tempfile.py,
34# in order of their appearance in the file.  Testing which requires
35# threads is not done here.
36
37class TestLowLevelInternals(unittest.TestCase):
38    def test_infer_return_type_singles(self):
39        self.assertIs(str, tempfile._infer_return_type(''))
40        self.assertIs(bytes, tempfile._infer_return_type(b''))
41        self.assertIs(str, tempfile._infer_return_type(None))
42
43    def test_infer_return_type_multiples(self):
44        self.assertIs(str, tempfile._infer_return_type('', ''))
45        self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
46        with self.assertRaises(TypeError):
47            tempfile._infer_return_type('', b'')
48        with self.assertRaises(TypeError):
49            tempfile._infer_return_type(b'', '')
50
51    def test_infer_return_type_multiples_and_none(self):
52        self.assertIs(str, tempfile._infer_return_type(None, ''))
53        self.assertIs(str, tempfile._infer_return_type('', None))
54        self.assertIs(str, tempfile._infer_return_type(None, None))
55        self.assertIs(bytes, tempfile._infer_return_type(b'', None))
56        self.assertIs(bytes, tempfile._infer_return_type(None, b''))
57        with self.assertRaises(TypeError):
58            tempfile._infer_return_type('', None, b'')
59        with self.assertRaises(TypeError):
60            tempfile._infer_return_type(b'', None, '')
61
62    def test_infer_return_type_pathlib(self):
63        self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/')))
64
65    def test_infer_return_type_pathlike(self):
66        class Path:
67            def __init__(self, path):
68                self.path = path
69
70            def __fspath__(self):
71                return self.path
72
73        self.assertIs(str, tempfile._infer_return_type(Path('/')))
74        self.assertIs(bytes, tempfile._infer_return_type(Path(b'/')))
75        self.assertIs(str, tempfile._infer_return_type('', Path('')))
76        self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b'')))
77        self.assertIs(bytes, tempfile._infer_return_type(None, Path(b'')))
78        self.assertIs(str, tempfile._infer_return_type(None, Path('')))
79
80        with self.assertRaises(TypeError):
81            tempfile._infer_return_type('', Path(b''))
82        with self.assertRaises(TypeError):
83            tempfile._infer_return_type(b'', Path(''))
84
85# Common functionality.
86
87class BaseTestCase(unittest.TestCase):
88
89    str_check = re.compile(r"^[a-z0-9_-]{8}$")
90    b_check = re.compile(br"^[a-z0-9_-]{8}$")
91
92    def setUp(self):
93        self._warnings_manager = warnings_helper.check_warnings()
94        self._warnings_manager.__enter__()
95        warnings.filterwarnings("ignore", category=RuntimeWarning,
96                                message="mktemp", module=__name__)
97
98    def tearDown(self):
99        self._warnings_manager.__exit__(None, None, None)
100
101    def nameCheck(self, name, dir, pre, suf):
102        (ndir, nbase) = os.path.split(name)
103        npre  = nbase[:len(pre)]
104        nsuf  = nbase[len(nbase)-len(suf):]
105
106        if dir is not None:
107            self.assertIs(
108                type(name),
109                str
110                if type(dir) is str or isinstance(dir, os.PathLike) else
111                bytes,
112                "unexpected return type",
113            )
114        if pre is not None:
115            self.assertIs(type(name), str if type(pre) is str else bytes,
116                          "unexpected return type")
117        if suf is not None:
118            self.assertIs(type(name), str if type(suf) is str else bytes,
119                          "unexpected return type")
120        if (dir, pre, suf) == (None, None, None):
121            self.assertIs(type(name), str, "default return type must be str")
122
123        # check for equality of the absolute paths!
124        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
125                         "file %r not in directory %r" % (name, dir))
126        self.assertEqual(npre, pre,
127                         "file %r does not begin with %r" % (nbase, pre))
128        self.assertEqual(nsuf, suf,
129                         "file %r does not end with %r" % (nbase, suf))
130
131        nbase = nbase[len(pre):len(nbase)-len(suf)]
132        check = self.str_check if isinstance(nbase, str) else self.b_check
133        self.assertTrue(check.match(nbase),
134                        "random characters %r do not match %r"
135                        % (nbase, check.pattern))
136
137
138class TestExports(BaseTestCase):
139    def test_exports(self):
140        # There are no surprising symbols in the tempfile module
141        dict = tempfile.__dict__
142
143        expected = {
144            "NamedTemporaryFile" : 1,
145            "TemporaryFile" : 1,
146            "mkstemp" : 1,
147            "mkdtemp" : 1,
148            "mktemp" : 1,
149            "TMP_MAX" : 1,
150            "gettempprefix" : 1,
151            "gettempprefixb" : 1,
152            "gettempdir" : 1,
153            "gettempdirb" : 1,
154            "tempdir" : 1,
155            "template" : 1,
156            "SpooledTemporaryFile" : 1,
157            "TemporaryDirectory" : 1,
158        }
159
160        unexp = []
161        for key in dict:
162            if key[0] != '_' and key not in expected:
163                unexp.append(key)
164        self.assertTrue(len(unexp) == 0,
165                        "unexpected keys: %s" % unexp)
166
167
168class TestRandomNameSequence(BaseTestCase):
169    """Test the internal iterator object _RandomNameSequence."""
170
171    def setUp(self):
172        self.r = tempfile._RandomNameSequence()
173        super().setUp()
174
175    def test_get_eight_char_str(self):
176        # _RandomNameSequence returns a eight-character string
177        s = next(self.r)
178        self.nameCheck(s, '', '', '')
179
180    def test_many(self):
181        # _RandomNameSequence returns no duplicate strings (stochastic)
182
183        dict = {}
184        r = self.r
185        for i in range(TEST_FILES):
186            s = next(r)
187            self.nameCheck(s, '', '', '')
188            self.assertNotIn(s, dict)
189            dict[s] = 1
190
191    def supports_iter(self):
192        # _RandomNameSequence supports the iterator protocol
193
194        i = 0
195        r = self.r
196        for s in r:
197            i += 1
198            if i == 20:
199                break
200
201    @unittest.skipUnless(hasattr(os, 'fork'),
202        "os.fork is required for this test")
203    def test_process_awareness(self):
204        # ensure that the random source differs between
205        # child and parent.
206        read_fd, write_fd = os.pipe()
207        pid = None
208        try:
209            pid = os.fork()
210            if not pid:
211                # child process
212                os.close(read_fd)
213                os.write(write_fd, next(self.r).encode("ascii"))
214                os.close(write_fd)
215                # bypass the normal exit handlers- leave those to
216                # the parent.
217                os._exit(0)
218
219            # parent process
220            parent_value = next(self.r)
221            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
222        finally:
223            if pid:
224                support.wait_process(pid, exitcode=0)
225
226            os.close(read_fd)
227            os.close(write_fd)
228        self.assertNotEqual(child_value, parent_value)
229
230
231
232class TestCandidateTempdirList(BaseTestCase):
233    """Test the internal function _candidate_tempdir_list."""
234
235    def test_nonempty_list(self):
236        # _candidate_tempdir_list returns a nonempty list of strings
237
238        cand = tempfile._candidate_tempdir_list()
239
240        self.assertFalse(len(cand) == 0)
241        for c in cand:
242            self.assertIsInstance(c, str)
243
244    def test_wanted_dirs(self):
245        # _candidate_tempdir_list contains the expected directories
246
247        # Make sure the interesting environment variables are all set.
248        with os_helper.EnvironmentVarGuard() as env:
249            for envname in 'TMPDIR', 'TEMP', 'TMP':
250                dirname = os.getenv(envname)
251                if not dirname:
252                    env[envname] = os.path.abspath(envname)
253
254            cand = tempfile._candidate_tempdir_list()
255
256            for envname in 'TMPDIR', 'TEMP', 'TMP':
257                dirname = os.getenv(envname)
258                if not dirname: raise ValueError
259                self.assertIn(dirname, cand)
260
261            try:
262                dirname = os.getcwd()
263            except (AttributeError, OSError):
264                dirname = os.curdir
265
266            self.assertIn(dirname, cand)
267
268            # Not practical to try to verify the presence of OS-specific
269            # paths in this list.
270
271
272# We test _get_default_tempdir some more by testing gettempdir.
273
274class TestGetDefaultTempdir(BaseTestCase):
275    """Test _get_default_tempdir()."""
276
277    def test_no_files_left_behind(self):
278        # use a private empty directory
279        with tempfile.TemporaryDirectory() as our_temp_directory:
280            # force _get_default_tempdir() to consider our empty directory
281            def our_candidate_list():
282                return [our_temp_directory]
283
284            with support.swap_attr(tempfile, "_candidate_tempdir_list",
285                                   our_candidate_list):
286                # verify our directory is empty after _get_default_tempdir()
287                tempfile._get_default_tempdir()
288                self.assertEqual(os.listdir(our_temp_directory), [])
289
290                def raise_OSError(*args, **kwargs):
291                    raise OSError()
292
293                with support.swap_attr(io, "open", raise_OSError):
294                    # test again with failing io.open()
295                    with self.assertRaises(FileNotFoundError):
296                        tempfile._get_default_tempdir()
297                    self.assertEqual(os.listdir(our_temp_directory), [])
298
299                def bad_writer(*args, **kwargs):
300                    fp = orig_open(*args, **kwargs)
301                    fp.write = raise_OSError
302                    return fp
303
304                with support.swap_attr(io, "open", bad_writer) as orig_open:
305                    # test again with failing write()
306                    with self.assertRaises(FileNotFoundError):
307                        tempfile._get_default_tempdir()
308                    self.assertEqual(os.listdir(our_temp_directory), [])
309
310
311class TestGetCandidateNames(BaseTestCase):
312    """Test the internal function _get_candidate_names."""
313
314    def test_retval(self):
315        # _get_candidate_names returns a _RandomNameSequence object
316        obj = tempfile._get_candidate_names()
317        self.assertIsInstance(obj, tempfile._RandomNameSequence)
318
319    def test_same_thing(self):
320        # _get_candidate_names always returns the same object
321        a = tempfile._get_candidate_names()
322        b = tempfile._get_candidate_names()
323
324        self.assertTrue(a is b)
325
326
327@contextlib.contextmanager
328def _inside_empty_temp_dir():
329    dir = tempfile.mkdtemp()
330    try:
331        with support.swap_attr(tempfile, 'tempdir', dir):
332            yield
333    finally:
334        os_helper.rmtree(dir)
335
336
337def _mock_candidate_names(*names):
338    return support.swap_attr(tempfile,
339                             '_get_candidate_names',
340                             lambda: iter(names))
341
342
343class TestBadTempdir:
344
345    def test_read_only_directory(self):
346        with _inside_empty_temp_dir():
347            oldmode = mode = os.stat(tempfile.tempdir).st_mode
348            mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
349            os.chmod(tempfile.tempdir, mode)
350            try:
351                if os.access(tempfile.tempdir, os.W_OK):
352                    self.skipTest("can't set the directory read-only")
353                with self.assertRaises(PermissionError):
354                    self.make_temp()
355                self.assertEqual(os.listdir(tempfile.tempdir), [])
356            finally:
357                os.chmod(tempfile.tempdir, oldmode)
358
359    def test_nonexisting_directory(self):
360        with _inside_empty_temp_dir():
361            tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
362            with support.swap_attr(tempfile, 'tempdir', tempdir):
363                with self.assertRaises(FileNotFoundError):
364                    self.make_temp()
365
366    def test_non_directory(self):
367        with _inside_empty_temp_dir():
368            tempdir = os.path.join(tempfile.tempdir, 'file')
369            open(tempdir, 'wb').close()
370            with support.swap_attr(tempfile, 'tempdir', tempdir):
371                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
372                    self.make_temp()
373
374
375class TestMkstempInner(TestBadTempdir, BaseTestCase):
376    """Test the internal function _mkstemp_inner."""
377
378    class mkstemped:
379        _bflags = tempfile._bin_openflags
380        _tflags = tempfile._text_openflags
381        _close = os.close
382        _unlink = os.unlink
383
384        def __init__(self, dir, pre, suf, bin):
385            if bin: flags = self._bflags
386            else:   flags = self._tflags
387
388            output_type = tempfile._infer_return_type(dir, pre, suf)
389            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)
390
391        def write(self, str):
392            os.write(self.fd, str)
393
394        def __del__(self):
395            self._close(self.fd)
396            self._unlink(self.name)
397
398    def do_create(self, dir=None, pre=None, suf=None, bin=1):
399        output_type = tempfile._infer_return_type(dir, pre, suf)
400        if dir is None:
401            if output_type is str:
402                dir = tempfile.gettempdir()
403            else:
404                dir = tempfile.gettempdirb()
405        if pre is None:
406            pre = output_type()
407        if suf is None:
408            suf = output_type()
409        file = self.mkstemped(dir, pre, suf, bin)
410
411        self.nameCheck(file.name, dir, pre, suf)
412        return file
413
414    def test_basic(self):
415        # _mkstemp_inner can create files
416        self.do_create().write(b"blat")
417        self.do_create(pre="a").write(b"blat")
418        self.do_create(suf="b").write(b"blat")
419        self.do_create(pre="a", suf="b").write(b"blat")
420        self.do_create(pre="aa", suf=".txt").write(b"blat")
421
422    def test_basic_with_bytes_names(self):
423        # _mkstemp_inner can create files when given name parts all
424        # specified as bytes.
425        dir_b = tempfile.gettempdirb()
426        self.do_create(dir=dir_b, suf=b"").write(b"blat")
427        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
428        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
429        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
430        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
431        # Can't mix str & binary types in the args.
432        with self.assertRaises(TypeError):
433            self.do_create(dir="", suf=b"").write(b"blat")
434        with self.assertRaises(TypeError):
435            self.do_create(dir=dir_b, pre="").write(b"blat")
436        with self.assertRaises(TypeError):
437            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")
438
439    def test_basic_many(self):
440        # _mkstemp_inner can create many files (stochastic)
441        extant = list(range(TEST_FILES))
442        for i in extant:
443            extant[i] = self.do_create(pre="aa")
444
445    def test_choose_directory(self):
446        # _mkstemp_inner can create files in a user-selected directory
447        dir = tempfile.mkdtemp()
448        try:
449            self.do_create(dir=dir).write(b"blat")
450            self.do_create(dir=pathlib.Path(dir)).write(b"blat")
451        finally:
452            support.gc_collect()  # For PyPy or other GCs.
453            os.rmdir(dir)
454
455    def test_file_mode(self):
456        # _mkstemp_inner creates files with the proper mode
457
458        file = self.do_create()
459        mode = stat.S_IMODE(os.stat(file.name).st_mode)
460        expected = 0o600
461        if sys.platform == 'win32':
462            # There's no distinction among 'user', 'group' and 'world';
463            # replicate the 'user' bits.
464            user = expected >> 6
465            expected = user * (1 + 8 + 64)
466        self.assertEqual(mode, expected)
467
468    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
469    def test_noinherit(self):
470        # _mkstemp_inner file handles are not inherited by child processes
471
472        if support.verbose:
473            v="v"
474        else:
475            v="q"
476
477        file = self.do_create()
478        self.assertEqual(os.get_inheritable(file.fd), False)
479        fd = "%d" % file.fd
480
481        try:
482            me = __file__
483        except NameError:
484            me = sys.argv[0]
485
486        # We have to exec something, so that FD_CLOEXEC will take
487        # effect.  The core of this test is therefore in
488        # tf_inherit_check.py, which see.
489        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
490                              "tf_inherit_check.py")
491
492        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
493        # but an arg with embedded spaces should be decorated with double
494        # quotes on each end
495        if sys.platform == 'win32':
496            decorated = '"%s"' % sys.executable
497            tester = '"%s"' % tester
498        else:
499            decorated = sys.executable
500
501        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
502        self.assertFalse(retval < 0,
503                    "child process caught fatal signal %d" % -retval)
504        self.assertFalse(retval > 0, "child process reports failure %d"%retval)
505
506    @unittest.skipUnless(has_textmode, "text mode not available")
507    def test_textmode(self):
508        # _mkstemp_inner can create files in text mode
509
510        # A text file is truncated at the first Ctrl+Z byte
511        f = self.do_create(bin=0)
512        f.write(b"blat\x1a")
513        f.write(b"extra\n")
514        os.lseek(f.fd, 0, os.SEEK_SET)
515        self.assertEqual(os.read(f.fd, 20), b"blat")
516
517    def make_temp(self):
518        return tempfile._mkstemp_inner(tempfile.gettempdir(),
519                                       tempfile.gettempprefix(),
520                                       '',
521                                       tempfile._bin_openflags,
522                                       str)
523
524    def test_collision_with_existing_file(self):
525        # _mkstemp_inner tries another name when a file with
526        # the chosen name already exists
527        with _inside_empty_temp_dir(), \
528             _mock_candidate_names('aaa', 'aaa', 'bbb'):
529            (fd1, name1) = self.make_temp()
530            os.close(fd1)
531            self.assertTrue(name1.endswith('aaa'))
532
533            (fd2, name2) = self.make_temp()
534            os.close(fd2)
535            self.assertTrue(name2.endswith('bbb'))
536
537    def test_collision_with_existing_directory(self):
538        # _mkstemp_inner tries another name when a directory with
539        # the chosen name already exists
540        with _inside_empty_temp_dir(), \
541             _mock_candidate_names('aaa', 'aaa', 'bbb'):
542            dir = tempfile.mkdtemp()
543            self.assertTrue(dir.endswith('aaa'))
544
545            (fd, name) = self.make_temp()
546            os.close(fd)
547            self.assertTrue(name.endswith('bbb'))
548
549
550class TestGetTempPrefix(BaseTestCase):
551    """Test gettempprefix()."""
552
553    def test_sane_template(self):
554        # gettempprefix returns a nonempty prefix string
555        p = tempfile.gettempprefix()
556
557        self.assertIsInstance(p, str)
558        self.assertGreater(len(p), 0)
559
560        pb = tempfile.gettempprefixb()
561
562        self.assertIsInstance(pb, bytes)
563        self.assertGreater(len(pb), 0)
564
565    def test_usable_template(self):
566        # gettempprefix returns a usable prefix string
567
568        # Create a temp directory, avoiding use of the prefix.
569        # Then attempt to create a file whose name is
570        # prefix + 'xxxxxx.xxx' in that directory.
571        p = tempfile.gettempprefix() + "xxxxxx.xxx"
572        d = tempfile.mkdtemp(prefix="")
573        try:
574            p = os.path.join(d, p)
575            fd = os.open(p, os.O_RDWR | os.O_CREAT)
576            os.close(fd)
577            os.unlink(p)
578        finally:
579            os.rmdir(d)
580
581
582class TestGetTempDir(BaseTestCase):
583    """Test gettempdir()."""
584
585    def test_directory_exists(self):
586        # gettempdir returns a directory which exists
587
588        for d in (tempfile.gettempdir(), tempfile.gettempdirb()):
589            self.assertTrue(os.path.isabs(d) or d == os.curdir,
590                            "%r is not an absolute path" % d)
591            self.assertTrue(os.path.isdir(d),
592                            "%r is not a directory" % d)
593
594    def test_directory_writable(self):
595        # gettempdir returns a directory writable by the user
596
597        # sneaky: just instantiate a NamedTemporaryFile, which
598        # defaults to writing into the directory returned by
599        # gettempdir.
600        with tempfile.NamedTemporaryFile() as file:
601            file.write(b"blat")
602
603    def test_same_thing(self):
604        # gettempdir always returns the same object
605        a = tempfile.gettempdir()
606        b = tempfile.gettempdir()
607        c = tempfile.gettempdirb()
608
609        self.assertTrue(a is b)
610        self.assertNotEqual(type(a), type(c))
611        self.assertEqual(a, os.fsdecode(c))
612
613    def test_case_sensitive(self):
614        # gettempdir should not flatten its case
615        # even on a case-insensitive file system
616        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
617        _tempdir, tempfile.tempdir = tempfile.tempdir, None
618        try:
619            with os_helper.EnvironmentVarGuard() as env:
620                # Fake the first env var which is checked as a candidate
621                env["TMPDIR"] = case_sensitive_tempdir
622                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
623        finally:
624            tempfile.tempdir = _tempdir
625            os_helper.rmdir(case_sensitive_tempdir)
626
627
628class TestMkstemp(BaseTestCase):
629    """Test mkstemp()."""
630
631    def do_create(self, dir=None, pre=None, suf=None):
632        output_type = tempfile._infer_return_type(dir, pre, suf)
633        if dir is None:
634            if output_type is str:
635                dir = tempfile.gettempdir()
636            else:
637                dir = tempfile.gettempdirb()
638        if pre is None:
639            pre = output_type()
640        if suf is None:
641            suf = output_type()
642        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
643        (ndir, nbase) = os.path.split(name)
644        adir = os.path.abspath(dir)
645        self.assertEqual(adir, ndir,
646            "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
647
648        try:
649            self.nameCheck(name, dir, pre, suf)
650        finally:
651            os.close(fd)
652            os.unlink(name)
653
654    def test_basic(self):
655        # mkstemp can create files
656        self.do_create()
657        self.do_create(pre="a")
658        self.do_create(suf="b")
659        self.do_create(pre="a", suf="b")
660        self.do_create(pre="aa", suf=".txt")
661        self.do_create(dir=".")
662
663    def test_basic_with_bytes_names(self):
664        # mkstemp can create files when given name parts all
665        # specified as bytes.
666        d = tempfile.gettempdirb()
667        self.do_create(dir=d, suf=b"")
668        self.do_create(dir=d, pre=b"a")
669        self.do_create(dir=d, suf=b"b")
670        self.do_create(dir=d, pre=b"a", suf=b"b")
671        self.do_create(dir=d, pre=b"aa", suf=b".txt")
672        self.do_create(dir=b".")
673        with self.assertRaises(TypeError):
674            self.do_create(dir=".", pre=b"aa", suf=b".txt")
675        with self.assertRaises(TypeError):
676            self.do_create(dir=b".", pre="aa", suf=b".txt")
677        with self.assertRaises(TypeError):
678            self.do_create(dir=b".", pre=b"aa", suf=".txt")
679
680
681    def test_choose_directory(self):
682        # mkstemp can create directories in a user-selected directory
683        dir = tempfile.mkdtemp()
684        try:
685            self.do_create(dir=dir)
686            self.do_create(dir=pathlib.Path(dir))
687        finally:
688            os.rmdir(dir)
689
690    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
691        orig_tempdir = tempfile.tempdir
692        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
693        try:
694            fd, path = tempfile.mkstemp()
695            os.close(fd)
696            os.unlink(path)
697            self.assertIsInstance(path, str)
698            tempfile.tempdir = tempfile.gettempdirb()
699            self.assertIsInstance(tempfile.tempdir, bytes)
700            self.assertIsInstance(tempfile.gettempdir(), str)
701            self.assertIsInstance(tempfile.gettempdirb(), bytes)
702            fd, path = tempfile.mkstemp()
703            os.close(fd)
704            os.unlink(path)
705            self.assertIsInstance(path, bytes)
706            fd, path = tempfile.mkstemp(suffix='.txt')
707            os.close(fd)
708            os.unlink(path)
709            self.assertIsInstance(path, str)
710            fd, path = tempfile.mkstemp(prefix='test-temp-')
711            os.close(fd)
712            os.unlink(path)
713            self.assertIsInstance(path, str)
714            fd, path = tempfile.mkstemp(dir=tempfile.gettempdir())
715            os.close(fd)
716            os.unlink(path)
717            self.assertIsInstance(path, str)
718        finally:
719            tempfile.tempdir = orig_tempdir
720
721
722class TestMkdtemp(TestBadTempdir, BaseTestCase):
723    """Test mkdtemp()."""
724
725    def make_temp(self):
726        return tempfile.mkdtemp()
727
728    def do_create(self, dir=None, pre=None, suf=None):
729        output_type = tempfile._infer_return_type(dir, pre, suf)
730        if dir is None:
731            if output_type is str:
732                dir = tempfile.gettempdir()
733            else:
734                dir = tempfile.gettempdirb()
735        if pre is None:
736            pre = output_type()
737        if suf is None:
738            suf = output_type()
739        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
740
741        try:
742            self.nameCheck(name, dir, pre, suf)
743            return name
744        except:
745            os.rmdir(name)
746            raise
747
748    def test_basic(self):
749        # mkdtemp can create directories
750        os.rmdir(self.do_create())
751        os.rmdir(self.do_create(pre="a"))
752        os.rmdir(self.do_create(suf="b"))
753        os.rmdir(self.do_create(pre="a", suf="b"))
754        os.rmdir(self.do_create(pre="aa", suf=".txt"))
755
756    def test_basic_with_bytes_names(self):
757        # mkdtemp can create directories when given all binary parts
758        d = tempfile.gettempdirb()
759        os.rmdir(self.do_create(dir=d))
760        os.rmdir(self.do_create(dir=d, pre=b"a"))
761        os.rmdir(self.do_create(dir=d, suf=b"b"))
762        os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b"))
763        os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt"))
764        with self.assertRaises(TypeError):
765            os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt"))
766        with self.assertRaises(TypeError):
767            os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt"))
768        with self.assertRaises(TypeError):
769            os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt"))
770
771    def test_basic_many(self):
772        # mkdtemp can create many directories (stochastic)
773        extant = list(range(TEST_FILES))
774        try:
775            for i in extant:
776                extant[i] = self.do_create(pre="aa")
777        finally:
778            for i in extant:
779                if(isinstance(i, str)):
780                    os.rmdir(i)
781
782    def test_choose_directory(self):
783        # mkdtemp can create directories in a user-selected directory
784        dir = tempfile.mkdtemp()
785        try:
786            os.rmdir(self.do_create(dir=dir))
787            os.rmdir(self.do_create(dir=pathlib.Path(dir)))
788        finally:
789            os.rmdir(dir)
790
791    def test_mode(self):
792        # mkdtemp creates directories with the proper mode
793
794        dir = self.do_create()
795        try:
796            mode = stat.S_IMODE(os.stat(dir).st_mode)
797            mode &= 0o777 # Mask off sticky bits inherited from /tmp
798            expected = 0o700
799            if sys.platform == 'win32':
800                # There's no distinction among 'user', 'group' and 'world';
801                # replicate the 'user' bits.
802                user = expected >> 6
803                expected = user * (1 + 8 + 64)
804            self.assertEqual(mode, expected)
805        finally:
806            os.rmdir(dir)
807
808    def test_collision_with_existing_file(self):
809        # mkdtemp tries another name when a file with
810        # the chosen name already exists
811        with _inside_empty_temp_dir(), \
812             _mock_candidate_names('aaa', 'aaa', 'bbb'):
813            file = tempfile.NamedTemporaryFile(delete=False)
814            file.close()
815            self.assertTrue(file.name.endswith('aaa'))
816            dir = tempfile.mkdtemp()
817            self.assertTrue(dir.endswith('bbb'))
818
819    def test_collision_with_existing_directory(self):
820        # mkdtemp tries another name when a directory with
821        # the chosen name already exists
822        with _inside_empty_temp_dir(), \
823             _mock_candidate_names('aaa', 'aaa', 'bbb'):
824            dir1 = tempfile.mkdtemp()
825            self.assertTrue(dir1.endswith('aaa'))
826            dir2 = tempfile.mkdtemp()
827            self.assertTrue(dir2.endswith('bbb'))
828
829    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
830        orig_tempdir = tempfile.tempdir
831        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
832        try:
833            path = tempfile.mkdtemp()
834            os.rmdir(path)
835            self.assertIsInstance(path, str)
836            tempfile.tempdir = tempfile.gettempdirb()
837            self.assertIsInstance(tempfile.tempdir, bytes)
838            self.assertIsInstance(tempfile.gettempdir(), str)
839            self.assertIsInstance(tempfile.gettempdirb(), bytes)
840            path = tempfile.mkdtemp()
841            os.rmdir(path)
842            self.assertIsInstance(path, bytes)
843            path = tempfile.mkdtemp(suffix='-dir')
844            os.rmdir(path)
845            self.assertIsInstance(path, str)
846            path = tempfile.mkdtemp(prefix='test-mkdtemp-')
847            os.rmdir(path)
848            self.assertIsInstance(path, str)
849            path = tempfile.mkdtemp(dir=tempfile.gettempdir())
850            os.rmdir(path)
851            self.assertIsInstance(path, str)
852        finally:
853            tempfile.tempdir = orig_tempdir
854
855
856class TestMktemp(BaseTestCase):
857    """Test mktemp()."""
858
859    # For safety, all use of mktemp must occur in a private directory.
860    # We must also suppress the RuntimeWarning it generates.
861    def setUp(self):
862        self.dir = tempfile.mkdtemp()
863        super().setUp()
864
865    def tearDown(self):
866        if self.dir:
867            os.rmdir(self.dir)
868            self.dir = None
869        super().tearDown()
870
871    class mktemped:
872        _unlink = os.unlink
873        _bflags = tempfile._bin_openflags
874
875        def __init__(self, dir, pre, suf):
876            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
877            # Create the file.  This will raise an exception if it's
878            # mysteriously appeared in the meanwhile.
879            os.close(os.open(self.name, self._bflags, 0o600))
880
881        def __del__(self):
882            self._unlink(self.name)
883
884    def do_create(self, pre="", suf=""):
885        file = self.mktemped(self.dir, pre, suf)
886
887        self.nameCheck(file.name, self.dir, pre, suf)
888        return file
889
890    def test_basic(self):
891        # mktemp can choose usable file names
892        self.do_create()
893        self.do_create(pre="a")
894        self.do_create(suf="b")
895        self.do_create(pre="a", suf="b")
896        self.do_create(pre="aa", suf=".txt")
897
898    def test_many(self):
899        # mktemp can choose many usable file names (stochastic)
900        extant = list(range(TEST_FILES))
901        for i in extant:
902            extant[i] = self.do_create(pre="aa")
903        del extant
904        support.gc_collect()  # For PyPy or other GCs.
905
906##     def test_warning(self):
907##         # mktemp issues a warning when used
908##         warnings.filterwarnings("error",
909##                                 category=RuntimeWarning,
910##                                 message="mktemp")
911##         self.assertRaises(RuntimeWarning,
912##                           tempfile.mktemp, dir=self.dir)
913
914
915# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
916
917
918class TestNamedTemporaryFile(BaseTestCase):
919    """Test NamedTemporaryFile()."""
920
921    def do_create(self, dir=None, pre="", suf="", delete=True):
922        if dir is None:
923            dir = tempfile.gettempdir()
924        file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
925                                           delete=delete)
926
927        self.nameCheck(file.name, dir, pre, suf)
928        return file
929
930
931    def test_basic(self):
932        # NamedTemporaryFile can create files
933        self.do_create()
934        self.do_create(pre="a")
935        self.do_create(suf="b")
936        self.do_create(pre="a", suf="b")
937        self.do_create(pre="aa", suf=".txt")
938
939    def test_method_lookup(self):
940        # Issue #18879: Looking up a temporary file method should keep it
941        # alive long enough.
942        f = self.do_create()
943        wr = weakref.ref(f)
944        write = f.write
945        write2 = f.write
946        del f
947        write(b'foo')
948        del write
949        write2(b'bar')
950        del write2
951        if support.check_impl_detail(cpython=True):
952            # No reference cycle was created.
953            self.assertIsNone(wr())
954
955    def test_iter(self):
956        # Issue #23700: getting iterator from a temporary file should keep
957        # it alive as long as it's being iterated over
958        lines = [b'spam\n', b'eggs\n', b'beans\n']
959        def make_file():
960            f = tempfile.NamedTemporaryFile(mode='w+b')
961            f.write(b''.join(lines))
962            f.seek(0)
963            return f
964        for i, l in enumerate(make_file()):
965            self.assertEqual(l, lines[i])
966        self.assertEqual(i, len(lines) - 1)
967
968    def test_creates_named(self):
969        # NamedTemporaryFile creates files with names
970        f = tempfile.NamedTemporaryFile()
971        self.assertTrue(os.path.exists(f.name),
972                        "NamedTemporaryFile %s does not exist" % f.name)
973
974    def test_del_on_close(self):
975        # A NamedTemporaryFile is deleted when closed
976        dir = tempfile.mkdtemp()
977        try:
978            with tempfile.NamedTemporaryFile(dir=dir) as f:
979                f.write(b'blat')
980            self.assertFalse(os.path.exists(f.name),
981                        "NamedTemporaryFile %s exists after close" % f.name)
982        finally:
983            os.rmdir(dir)
984
985    def test_dis_del_on_close(self):
986        # Tests that delete-on-close can be disabled
987        dir = tempfile.mkdtemp()
988        tmp = None
989        try:
990            f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
991            tmp = f.name
992            f.write(b'blat')
993            f.close()
994            self.assertTrue(os.path.exists(f.name),
995                        "NamedTemporaryFile %s missing after close" % f.name)
996        finally:
997            if tmp is not None:
998                os.unlink(tmp)
999            os.rmdir(dir)
1000
1001    def test_multiple_close(self):
1002        # A NamedTemporaryFile can be closed many times without error
1003        f = tempfile.NamedTemporaryFile()
1004        f.write(b'abc\n')
1005        f.close()
1006        f.close()
1007        f.close()
1008
1009    def test_context_manager(self):
1010        # A NamedTemporaryFile can be used as a context manager
1011        with tempfile.NamedTemporaryFile() as f:
1012            self.assertTrue(os.path.exists(f.name))
1013        self.assertFalse(os.path.exists(f.name))
1014        def use_closed():
1015            with f:
1016                pass
1017        self.assertRaises(ValueError, use_closed)
1018
1019    def test_no_leak_fd(self):
1020        # Issue #21058: don't leak file descriptor when io.open() fails
1021        closed = []
1022        os_close = os.close
1023        def close(fd):
1024            closed.append(fd)
1025            os_close(fd)
1026
1027        with mock.patch('os.close', side_effect=close):
1028            with mock.patch('io.open', side_effect=ValueError):
1029                self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
1030                self.assertEqual(len(closed), 1)
1031
1032    def test_bad_mode(self):
1033        dir = tempfile.mkdtemp()
1034        self.addCleanup(os_helper.rmtree, dir)
1035        with self.assertRaises(ValueError):
1036            tempfile.NamedTemporaryFile(mode='wr', dir=dir)
1037        with self.assertRaises(TypeError):
1038            tempfile.NamedTemporaryFile(mode=2, dir=dir)
1039        self.assertEqual(os.listdir(dir), [])
1040
1041    # How to test the mode and bufsize parameters?
1042
1043class TestSpooledTemporaryFile(BaseTestCase):
1044    """Test SpooledTemporaryFile()."""
1045
1046    def do_create(self, max_size=0, dir=None, pre="", suf=""):
1047        if dir is None:
1048            dir = tempfile.gettempdir()
1049        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
1050
1051        return file
1052
1053
1054    def test_basic(self):
1055        # SpooledTemporaryFile can create files
1056        f = self.do_create()
1057        self.assertFalse(f._rolled)
1058        f = self.do_create(max_size=100, pre="a", suf=".txt")
1059        self.assertFalse(f._rolled)
1060
1061    def test_del_on_close(self):
1062        # A SpooledTemporaryFile is deleted when closed
1063        dir = tempfile.mkdtemp()
1064        try:
1065            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
1066            self.assertFalse(f._rolled)
1067            f.write(b'blat ' * 5)
1068            self.assertTrue(f._rolled)
1069            filename = f.name
1070            f.close()
1071            self.assertFalse(isinstance(filename, str) and os.path.exists(filename),
1072                        "SpooledTemporaryFile %s exists after close" % filename)
1073        finally:
1074            os.rmdir(dir)
1075
1076    def test_rewrite_small(self):
1077        # A SpooledTemporaryFile can be written to multiple within the max_size
1078        f = self.do_create(max_size=30)
1079        self.assertFalse(f._rolled)
1080        for i in range(5):
1081            f.seek(0, 0)
1082            f.write(b'x' * 20)
1083        self.assertFalse(f._rolled)
1084
1085    def test_write_sequential(self):
1086        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
1087        # over afterward
1088        f = self.do_create(max_size=30)
1089        self.assertFalse(f._rolled)
1090        f.write(b'x' * 20)
1091        self.assertFalse(f._rolled)
1092        f.write(b'x' * 10)
1093        self.assertFalse(f._rolled)
1094        f.write(b'x')
1095        self.assertTrue(f._rolled)
1096
1097    def test_writelines(self):
1098        # Verify writelines with a SpooledTemporaryFile
1099        f = self.do_create()
1100        f.writelines((b'x', b'y', b'z'))
1101        pos = f.seek(0)
1102        self.assertEqual(pos, 0)
1103        buf = f.read()
1104        self.assertEqual(buf, b'xyz')
1105
1106    def test_writelines_sequential(self):
1107        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
1108        # over afterward
1109        f = self.do_create(max_size=35)
1110        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
1111        self.assertFalse(f._rolled)
1112        f.write(b'x')
1113        self.assertTrue(f._rolled)
1114
1115    def test_sparse(self):
1116        # A SpooledTemporaryFile that is written late in the file will extend
1117        # when that occurs
1118        f = self.do_create(max_size=30)
1119        self.assertFalse(f._rolled)
1120        pos = f.seek(100, 0)
1121        self.assertEqual(pos, 100)
1122        self.assertFalse(f._rolled)
1123        f.write(b'x')
1124        self.assertTrue(f._rolled)
1125
1126    def test_fileno(self):
1127        # A SpooledTemporaryFile should roll over to a real file on fileno()
1128        f = self.do_create(max_size=30)
1129        self.assertFalse(f._rolled)
1130        self.assertTrue(f.fileno() > 0)
1131        self.assertTrue(f._rolled)
1132
1133    def test_multiple_close_before_rollover(self):
1134        # A SpooledTemporaryFile can be closed many times without error
1135        f = tempfile.SpooledTemporaryFile()
1136        f.write(b'abc\n')
1137        self.assertFalse(f._rolled)
1138        f.close()
1139        f.close()
1140        f.close()
1141
1142    def test_multiple_close_after_rollover(self):
1143        # A SpooledTemporaryFile can be closed many times without error
1144        f = tempfile.SpooledTemporaryFile(max_size=1)
1145        f.write(b'abc\n')
1146        self.assertTrue(f._rolled)
1147        f.close()
1148        f.close()
1149        f.close()
1150
1151    def test_bound_methods(self):
1152        # It should be OK to steal a bound method from a SpooledTemporaryFile
1153        # and use it independently; when the file rolls over, those bound
1154        # methods should continue to function
1155        f = self.do_create(max_size=30)
1156        read = f.read
1157        write = f.write
1158        seek = f.seek
1159
1160        write(b"a" * 35)
1161        write(b"b" * 35)
1162        seek(0, 0)
1163        self.assertEqual(read(70), b'a'*35 + b'b'*35)
1164
1165    def test_properties(self):
1166        f = tempfile.SpooledTemporaryFile(max_size=10)
1167        f.write(b'x' * 10)
1168        self.assertFalse(f._rolled)
1169        self.assertEqual(f.mode, 'w+b')
1170        self.assertIsNone(f.name)
1171        with self.assertRaises(AttributeError):
1172            f.newlines
1173        with self.assertRaises(AttributeError):
1174            f.encoding
1175        with self.assertRaises(AttributeError):
1176            f.errors
1177
1178        f.write(b'x')
1179        self.assertTrue(f._rolled)
1180        self.assertEqual(f.mode, 'rb+')
1181        self.assertIsNotNone(f.name)
1182        with self.assertRaises(AttributeError):
1183            f.newlines
1184        with self.assertRaises(AttributeError):
1185            f.encoding
1186        with self.assertRaises(AttributeError):
1187            f.errors
1188
1189    def test_text_mode(self):
1190        # Creating a SpooledTemporaryFile with a text mode should produce
1191        # a file object reading and writing (Unicode) text strings.
1192        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
1193                                          encoding="utf-8")
1194        f.write("abc\n")
1195        f.seek(0)
1196        self.assertEqual(f.read(), "abc\n")
1197        f.write("def\n")
1198        f.seek(0)
1199        self.assertEqual(f.read(), "abc\ndef\n")
1200        self.assertFalse(f._rolled)
1201        self.assertEqual(f.mode, 'w+')
1202        self.assertIsNone(f.name)
1203        self.assertEqual(f.newlines, os.linesep)
1204        self.assertEqual(f.encoding, "utf-8")
1205        self.assertEqual(f.errors, "strict")
1206
1207        f.write("xyzzy\n")
1208        f.seek(0)
1209        self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
1210        # Check that Ctrl+Z doesn't truncate the file
1211        f.write("foo\x1abar\n")
1212        f.seek(0)
1213        self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
1214        self.assertTrue(f._rolled)
1215        self.assertEqual(f.mode, 'w+')
1216        self.assertIsNotNone(f.name)
1217        self.assertEqual(f.newlines, os.linesep)
1218        self.assertEqual(f.encoding, "utf-8")
1219        self.assertEqual(f.errors, "strict")
1220
1221    def test_text_newline_and_encoding(self):
1222        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
1223                                          newline='', encoding='utf-8',
1224                                          errors='ignore')
1225        f.write("\u039B\r\n")
1226        f.seek(0)
1227        self.assertEqual(f.read(), "\u039B\r\n")
1228        self.assertFalse(f._rolled)
1229        self.assertEqual(f.mode, 'w+')
1230        self.assertIsNone(f.name)
1231        self.assertIsNotNone(f.newlines)
1232        self.assertEqual(f.encoding, "utf-8")
1233        self.assertEqual(f.errors, "ignore")
1234
1235        f.write("\u039C" * 10 + "\r\n")
1236        f.write("\u039D" * 20)
1237        f.seek(0)
1238        self.assertEqual(f.read(),
1239                "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
1240        self.assertTrue(f._rolled)
1241        self.assertEqual(f.mode, 'w+')
1242        self.assertIsNotNone(f.name)
1243        self.assertIsNotNone(f.newlines)
1244        self.assertEqual(f.encoding, 'utf-8')
1245        self.assertEqual(f.errors, 'ignore')
1246
1247    def test_context_manager_before_rollover(self):
1248        # A SpooledTemporaryFile can be used as a context manager
1249        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1250            self.assertFalse(f._rolled)
1251            self.assertFalse(f.closed)
1252        self.assertTrue(f.closed)
1253        def use_closed():
1254            with f:
1255                pass
1256        self.assertRaises(ValueError, use_closed)
1257
1258    def test_context_manager_during_rollover(self):
1259        # A SpooledTemporaryFile can be used as a context manager
1260        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1261            self.assertFalse(f._rolled)
1262            f.write(b'abc\n')
1263            f.flush()
1264            self.assertTrue(f._rolled)
1265            self.assertFalse(f.closed)
1266        self.assertTrue(f.closed)
1267        def use_closed():
1268            with f:
1269                pass
1270        self.assertRaises(ValueError, use_closed)
1271
1272    def test_context_manager_after_rollover(self):
1273        # A SpooledTemporaryFile can be used as a context manager
1274        f = tempfile.SpooledTemporaryFile(max_size=1)
1275        f.write(b'abc\n')
1276        f.flush()
1277        self.assertTrue(f._rolled)
1278        with f:
1279            self.assertFalse(f.closed)
1280        self.assertTrue(f.closed)
1281        def use_closed():
1282            with f:
1283                pass
1284        self.assertRaises(ValueError, use_closed)
1285
1286    def test_truncate_with_size_parameter(self):
1287        # A SpooledTemporaryFile can be truncated to zero size
1288        f = tempfile.SpooledTemporaryFile(max_size=10)
1289        f.write(b'abcdefg\n')
1290        f.seek(0)
1291        f.truncate()
1292        self.assertFalse(f._rolled)
1293        self.assertEqual(f._file.getvalue(), b'')
1294        # A SpooledTemporaryFile can be truncated to a specific size
1295        f = tempfile.SpooledTemporaryFile(max_size=10)
1296        f.write(b'abcdefg\n')
1297        f.truncate(4)
1298        self.assertFalse(f._rolled)
1299        self.assertEqual(f._file.getvalue(), b'abcd')
1300        # A SpooledTemporaryFile rolls over if truncated to large size
1301        f = tempfile.SpooledTemporaryFile(max_size=10)
1302        f.write(b'abcdefg\n')
1303        f.truncate(20)
1304        self.assertTrue(f._rolled)
1305        self.assertEqual(os.fstat(f.fileno()).st_size, 20)
1306
1307    def test_class_getitem(self):
1308        self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes],
1309                      types.GenericAlias)
1310
1311if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
1312
1313    class TestTemporaryFile(BaseTestCase):
1314        """Test TemporaryFile()."""
1315
1316        def test_basic(self):
1317            # TemporaryFile can create files
1318            # No point in testing the name params - the file has no name.
1319            tempfile.TemporaryFile()
1320
1321        def test_has_no_name(self):
1322            # TemporaryFile creates files with no names (on this system)
1323            dir = tempfile.mkdtemp()
1324            f = tempfile.TemporaryFile(dir=dir)
1325            f.write(b'blat')
1326
1327            # Sneaky: because this file has no name, it should not prevent
1328            # us from removing the directory it was created in.
1329            try:
1330                os.rmdir(dir)
1331            except:
1332                # cleanup
1333                f.close()
1334                os.rmdir(dir)
1335                raise
1336
1337        def test_multiple_close(self):
1338            # A TemporaryFile can be closed many times without error
1339            f = tempfile.TemporaryFile()
1340            f.write(b'abc\n')
1341            f.close()
1342            f.close()
1343            f.close()
1344
1345        # How to test the mode and bufsize parameters?
1346        def test_mode_and_encoding(self):
1347
1348            def roundtrip(input, *args, **kwargs):
1349                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
1350                    fileobj.write(input)
1351                    fileobj.seek(0)
1352                    self.assertEqual(input, fileobj.read())
1353
1354            roundtrip(b"1234", "w+b")
1355            roundtrip("abdc\n", "w+")
1356            roundtrip("\u039B", "w+", encoding="utf-16")
1357            roundtrip("foo\r\n", "w+", newline="")
1358
1359        def test_no_leak_fd(self):
1360            # Issue #21058: don't leak file descriptor when io.open() fails
1361            closed = []
1362            os_close = os.close
1363            def close(fd):
1364                closed.append(fd)
1365                os_close(fd)
1366
1367            with mock.patch('os.close', side_effect=close):
1368                with mock.patch('io.open', side_effect=ValueError):
1369                    self.assertRaises(ValueError, tempfile.TemporaryFile)
1370                    self.assertEqual(len(closed), 1)
1371
1372
1373
1374# Helper for test_del_on_shutdown
1375class NulledModules:
1376    def __init__(self, *modules):
1377        self.refs = [mod.__dict__ for mod in modules]
1378        self.contents = [ref.copy() for ref in self.refs]
1379
1380    def __enter__(self):
1381        for d in self.refs:
1382            for key in d:
1383                d[key] = None
1384
1385    def __exit__(self, *exc_info):
1386        for d, c in zip(self.refs, self.contents):
1387            d.clear()
1388            d.update(c)
1389
1390
1391class TestTemporaryDirectory(BaseTestCase):
1392    """Test TemporaryDirectory()."""
1393
1394    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1,
1395                  ignore_cleanup_errors=False):
1396        if dir is None:
1397            dir = tempfile.gettempdir()
1398        tmp = tempfile.TemporaryDirectory(
1399            dir=dir, prefix=pre, suffix=suf,
1400            ignore_cleanup_errors=ignore_cleanup_errors)
1401        self.nameCheck(tmp.name, dir, pre, suf)
1402        self.do_create2(tmp.name, recurse, dirs, files)
1403        return tmp
1404
1405    def do_create2(self, path, recurse=1, dirs=1, files=1):
1406        # Create subdirectories and some files
1407        if recurse:
1408            for i in range(dirs):
1409                name = os.path.join(path, "dir%d" % i)
1410                os.mkdir(name)
1411                self.do_create2(name, recurse-1, dirs, files)
1412        for i in range(files):
1413            with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
1414                f.write(b"Hello world!")
1415
1416    def test_mkdtemp_failure(self):
1417        # Check no additional exception if mkdtemp fails
1418        # Previously would raise AttributeError instead
1419        # (noted as part of Issue #10188)
1420        with tempfile.TemporaryDirectory() as nonexistent:
1421            pass
1422        with self.assertRaises(FileNotFoundError) as cm:
1423            tempfile.TemporaryDirectory(dir=nonexistent)
1424        self.assertEqual(cm.exception.errno, errno.ENOENT)
1425
1426    def test_explicit_cleanup(self):
1427        # A TemporaryDirectory is deleted when cleaned up
1428        dir = tempfile.mkdtemp()
1429        try:
1430            d = self.do_create(dir=dir)
1431            self.assertTrue(os.path.exists(d.name),
1432                            "TemporaryDirectory %s does not exist" % d.name)
1433            d.cleanup()
1434            self.assertFalse(os.path.exists(d.name),
1435                        "TemporaryDirectory %s exists after cleanup" % d.name)
1436        finally:
1437            os.rmdir(dir)
1438
1439    def test_explict_cleanup_ignore_errors(self):
1440        """Test that cleanup doesn't return an error when ignoring them."""
1441        with tempfile.TemporaryDirectory() as working_dir:
1442            temp_dir = self.do_create(
1443                dir=working_dir, ignore_cleanup_errors=True)
1444            temp_path = pathlib.Path(temp_dir.name)
1445            self.assertTrue(temp_path.exists(),
1446                            f"TemporaryDirectory {temp_path!s} does not exist")
1447            with open(temp_path / "a_file.txt", "w+t") as open_file:
1448                open_file.write("Hello world!\n")
1449                temp_dir.cleanup()
1450            self.assertEqual(len(list(temp_path.glob("*"))),
1451                             int(sys.platform.startswith("win")),
1452                             "Unexpected number of files in "
1453                             f"TemporaryDirectory {temp_path!s}")
1454            self.assertEqual(
1455                temp_path.exists(),
1456                sys.platform.startswith("win"),
1457                f"TemporaryDirectory {temp_path!s} existence state unexpected")
1458            temp_dir.cleanup()
1459            self.assertFalse(
1460                temp_path.exists(),
1461                f"TemporaryDirectory {temp_path!s} exists after cleanup")
1462
1463    @os_helper.skip_unless_symlink
1464    def test_cleanup_with_symlink_to_a_directory(self):
1465        # cleanup() should not follow symlinks to directories (issue #12464)
1466        d1 = self.do_create()
1467        d2 = self.do_create(recurse=0)
1468
1469        # Symlink d1/foo -> d2
1470        os.symlink(d2.name, os.path.join(d1.name, "foo"))
1471
1472        # This call to cleanup() should not follow the "foo" symlink
1473        d1.cleanup()
1474
1475        self.assertFalse(os.path.exists(d1.name),
1476                         "TemporaryDirectory %s exists after cleanup" % d1.name)
1477        self.assertTrue(os.path.exists(d2.name),
1478                        "Directory pointed to by a symlink was deleted")
1479        self.assertEqual(os.listdir(d2.name), ['test0.txt'],
1480                         "Contents of the directory pointed to by a symlink "
1481                         "were deleted")
1482        d2.cleanup()
1483
1484    @support.cpython_only
1485    def test_del_on_collection(self):
1486        # A TemporaryDirectory is deleted when garbage collected
1487        dir = tempfile.mkdtemp()
1488        try:
1489            d = self.do_create(dir=dir)
1490            name = d.name
1491            del d # Rely on refcounting to invoke __del__
1492            self.assertFalse(os.path.exists(name),
1493                        "TemporaryDirectory %s exists after __del__" % name)
1494        finally:
1495            os.rmdir(dir)
1496
1497    @support.cpython_only
1498    def test_del_on_collection_ignore_errors(self):
1499        """Test that ignoring errors works when TemporaryDirectory is gced."""
1500        with tempfile.TemporaryDirectory() as working_dir:
1501            temp_dir = self.do_create(
1502                dir=working_dir, ignore_cleanup_errors=True)
1503            temp_path = pathlib.Path(temp_dir.name)
1504            self.assertTrue(temp_path.exists(),
1505                            f"TemporaryDirectory {temp_path!s} does not exist")
1506            with open(temp_path / "a_file.txt", "w+t") as open_file:
1507                open_file.write("Hello world!\n")
1508                del temp_dir
1509            self.assertEqual(len(list(temp_path.glob("*"))),
1510                             int(sys.platform.startswith("win")),
1511                             "Unexpected number of files in "
1512                             f"TemporaryDirectory {temp_path!s}")
1513            self.assertEqual(
1514                temp_path.exists(),
1515                sys.platform.startswith("win"),
1516                f"TemporaryDirectory {temp_path!s} existence state unexpected")
1517
1518    def test_del_on_shutdown(self):
1519        # A TemporaryDirectory may be cleaned up during shutdown
1520        with self.do_create() as dir:
1521            for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
1522                code = """if True:
1523                    import builtins
1524                    import os
1525                    import shutil
1526                    import sys
1527                    import tempfile
1528                    import warnings
1529
1530                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
1531                    sys.stdout.buffer.write(tmp.name.encode())
1532
1533                    tmp2 = os.path.join(tmp.name, 'test_dir')
1534                    os.mkdir(tmp2)
1535                    with open(os.path.join(tmp2, "test0.txt"), "w") as f:
1536                        f.write("Hello world!")
1537
1538                    {mod}.tmp = tmp
1539
1540                    warnings.filterwarnings("always", category=ResourceWarning)
1541                    """.format(dir=dir, mod=mod)
1542                rc, out, err = script_helper.assert_python_ok("-c", code)
1543                tmp_name = out.decode().strip()
1544                self.assertFalse(os.path.exists(tmp_name),
1545                            "TemporaryDirectory %s exists after cleanup" % tmp_name)
1546                err = err.decode('utf-8', 'backslashreplace')
1547                self.assertNotIn("Exception ", err)
1548                self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1549
1550    def test_del_on_shutdown_ignore_errors(self):
1551        """Test ignoring errors works when a tempdir is gc'ed on shutdown."""
1552        with tempfile.TemporaryDirectory() as working_dir:
1553            code = """if True:
1554                import pathlib
1555                import sys
1556                import tempfile
1557                import warnings
1558
1559                temp_dir = tempfile.TemporaryDirectory(
1560                    dir={working_dir!r}, ignore_cleanup_errors=True)
1561                sys.stdout.buffer.write(temp_dir.name.encode())
1562
1563                temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir"
1564                temp_dir_2.mkdir()
1565                with open(temp_dir_2 / "test0.txt", "w") as test_file:
1566                    test_file.write("Hello world!")
1567                open_file = open(temp_dir_2 / "open_file.txt", "w")
1568                open_file.write("Hello world!")
1569
1570                warnings.filterwarnings("always", category=ResourceWarning)
1571                """.format(working_dir=working_dir)
1572            __, out, err = script_helper.assert_python_ok("-c", code)
1573            temp_path = pathlib.Path(out.decode().strip())
1574            self.assertEqual(len(list(temp_path.glob("*"))),
1575                             int(sys.platform.startswith("win")),
1576                             "Unexpected number of files in "
1577                             f"TemporaryDirectory {temp_path!s}")
1578            self.assertEqual(
1579                temp_path.exists(),
1580                sys.platform.startswith("win"),
1581                f"TemporaryDirectory {temp_path!s} existence state unexpected")
1582            err = err.decode('utf-8', 'backslashreplace')
1583            self.assertNotIn("Exception", err)
1584            self.assertNotIn("Error", err)
1585            self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1586
1587    def test_exit_on_shutdown(self):
1588        # Issue #22427
1589        with self.do_create() as dir:
1590            code = """if True:
1591                import sys
1592                import tempfile
1593                import warnings
1594
1595                def generator():
1596                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
1597                        yield tmp
1598                g = generator()
1599                sys.stdout.buffer.write(next(g).encode())
1600
1601                warnings.filterwarnings("always", category=ResourceWarning)
1602                """.format(dir=dir)
1603            rc, out, err = script_helper.assert_python_ok("-c", code)
1604            tmp_name = out.decode().strip()
1605            self.assertFalse(os.path.exists(tmp_name),
1606                        "TemporaryDirectory %s exists after cleanup" % tmp_name)
1607            err = err.decode('utf-8', 'backslashreplace')
1608            self.assertNotIn("Exception ", err)
1609            self.assertIn("ResourceWarning: Implicitly cleaning up", err)
1610
1611    def test_warnings_on_cleanup(self):
1612        # ResourceWarning will be triggered by __del__
1613        with self.do_create() as dir:
1614            d = self.do_create(dir=dir, recurse=3)
1615            name = d.name
1616
1617            # Check for the resource warning
1618            with warnings_helper.check_warnings(('Implicitly',
1619                                                 ResourceWarning),
1620                                                quiet=False):
1621                warnings.filterwarnings("always", category=ResourceWarning)
1622                del d
1623                support.gc_collect()
1624            self.assertFalse(os.path.exists(name),
1625                        "TemporaryDirectory %s exists after __del__" % name)
1626
1627    def test_multiple_close(self):
1628        # Can be cleaned-up many times without error
1629        d = self.do_create()
1630        d.cleanup()
1631        d.cleanup()
1632        d.cleanup()
1633
1634    def test_context_manager(self):
1635        # Can be used as a context manager
1636        d = self.do_create()
1637        with d as name:
1638            self.assertTrue(os.path.exists(name))
1639            self.assertEqual(name, d.name)
1640        self.assertFalse(os.path.exists(name))
1641
1642    def test_modes(self):
1643        for mode in range(8):
1644            mode <<= 6
1645            with self.subTest(mode=format(mode, '03o')):
1646                d = self.do_create(recurse=3, dirs=2, files=2)
1647                with d:
1648                    # Change files and directories mode recursively.
1649                    for root, dirs, files in os.walk(d.name, topdown=False):
1650                        for name in files:
1651                            os.chmod(os.path.join(root, name), mode)
1652                        os.chmod(root, mode)
1653                    d.cleanup()
1654                self.assertFalse(os.path.exists(d.name))
1655
1656    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags')
1657    def test_flags(self):
1658        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
1659        d = self.do_create(recurse=3, dirs=2, files=2)
1660        with d:
1661            # Change files and directories flags recursively.
1662            for root, dirs, files in os.walk(d.name, topdown=False):
1663                for name in files:
1664                    os.chflags(os.path.join(root, name), flags)
1665                os.chflags(root, flags)
1666            d.cleanup()
1667        self.assertFalse(os.path.exists(d.name))
1668
1669
1670if __name__ == "__main__":
1671    unittest.main()
1672