1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import asynchat
6import asyncore
7import codecs
8import contextlib
9import decimal
10import errno
11import fnmatch
12import fractions
13import itertools
14import locale
15import mmap
16import os
17import pickle
18import shutil
19import signal
20import socket
21import stat
22import subprocess
23import sys
24import sysconfig
25import tempfile
26import threading
27import time
28import unittest
29import uuid
30import warnings
31from test import support
32from platform import win32_is_iot
33
34try:
35    import resource
36except ImportError:
37    resource = None
38try:
39    import fcntl
40except ImportError:
41    fcntl = None
42try:
43    import _winapi
44except ImportError:
45    _winapi = None
46try:
47    import pwd
48    all_users = [u.pw_uid for u in pwd.getpwall()]
49except (ImportError, AttributeError):
50    all_users = []
51try:
52    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
53except ImportError:
54    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
55
56from test.support.script_helper import assert_python_ok
57from test.support import unix_shell, FakePath
58
59
# True when running on a POSIX system with an effective uid of 0 (root).
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
# sys.thread_info may be missing and its version may be empty/None; both
# cases are treated as "not linuxthreads".
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
if sys.platform.startswith('freebsd'):
    HAVE_WHEEL_GROUP = (os.getgid() == 0)
else:
    HAVE_WHEEL_GROUP = False
75
76
def requires_os_func(name):
    """Return a decorator that skips a test unless ``os`` has attribute *name*.

    The skip reason handed to unittest is ``'requires os.<name>'``.
    """
    is_available = hasattr(os, name)
    reason = 'requires os.%s' % name
    return unittest.skipUnless(is_available, reason)
79
80
def create_file(filename, content=b'content'):
    """Create a new file *filename* holding the bytes *content*.

    The file is opened unbuffered in exclusive-creation mode ("xb"), so the
    call fails if *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
84
85
class MiscTests(unittest.TestCase):
    """Tests for os.getcwd() and os.getcwdb()."""

    def test_getcwd(self):
        # The text flavour of the current directory is a str.
        self.assertIsInstance(os.getcwd(), str)

    def test_getcwd_long_path(self):
        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
        # longer path if longer paths support is enabled. Internally, the os
        # module uses MAXPATHLEN which is at least 1024.
        #
        # Each nested directory name is 200 characters long so that a single
        # component fits into the Windows MAX_PATH limit.
        #
        # On Windows, the test can stop when trying to create a path longer
        # than MAX_PATH if long paths support is disabled:
        # see RtlAreLongPathsEnabled().
        target_len = 2000      # characters
        component_len = 200    # characters
        prefix = 'python_test_dir_'
        component = prefix + 'a' * (component_len - len(prefix))

        with tempfile.TemporaryDirectory() as tmpdir, \
                support.change_cwd(tmpdir) as current:
            expected = current

            while True:
                cwd = os.getcwd()
                self.assertEqual(cwd, expected)

                # Characters still missing to reach the target length, once
                # the separator in front of the next component is counted.
                missing = target_len - (len(cwd) + len(os.path.sep))
                if missing <= 0:
                    break
                # Shorten the component so the path does not overshoot.
                if missing < len(component):
                    component = component[:missing]

                current = os.path.join(current, component)
                try:
                    os.mkdir(current)
                    # On Windows, chdir() can fail
                    # even if mkdir() succeeded
                    os.chdir(current)
                except FileNotFoundError:
                    # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
                    # ERROR_FILENAME_EXCED_RANGE (206) errors
                    # ("The filename or extension is too long")
                    break
                except OSError as exc:
                    if exc.errno != errno.ENAMETOOLONG:
                        raise
                    break

                expected = current

            if support.verbose:
                print(f"Tested current directory length: {len(cwd)}")

    def test_getcwdb(self):
        # The bytes flavour must decode to the same path as os.getcwd().
        raw_cwd = os.getcwdb()
        self.assertIsInstance(raw_cwd, bytes)
        self.assertEqual(os.fsdecode(raw_cwd), os.getcwd())
148
149
150# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for file-descriptor level os functions that create TESTFN."""

    def setUp(self):
        # Remove a leftover TESTFN.  lexists() is used so that a symlink
        # whose target does not exist is removed as well.  The same function
        # is reused as tearDown so every test also cleans up after itself.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call (wrong type for the destination) must
        # leave the reference count of the source path unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly a bytes object holding the file content.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertIs(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Do not leak the descriptor when a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command line *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(),
        # forwarding *args* (mode, buffering, ...), then close the result.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create an empty TESTFN first: fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination file.
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        # A negative byte count is rejected.
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        # Cleanups run in reverse order: the file is closed before it is
        # unlinked.
        out_file = open(TESTFN2, 'w+b')
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'),
                         'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        # Cleanups run in reverse order: the file is closed before it is
        # unlinked.
        out_file = open(TESTFN4, 'w+b')
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
373
374# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the attributes of os.stat() and os.statvfs() results."""

    def setUp(self):
        # Every test works on a 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        # Shared checks for os.stat(fname); fname may be str or bytes.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are compared after truncation to int.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in ('st_atime', 'st_mtime', 'st_ctime'):
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the result must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but not required here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks with the file name encoded to bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        # A stat result must round-trip through every pickle protocol.
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but not required here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        # A statvfs result must round-trip through every pickle protocol.
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        # Once closed, the descriptor must be rejected with EBADF.
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_stat_block_device(self):
        # bpo-38030: os.stat fails for block devices
        # Test a filename like "//./C:"
        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
        result = os.stat(fname)
        self.assertEqual(result.st_mode, stat.S_IFBLK)
601
602
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a scratch file and its parent directory."""

    def setUp(self):
        # Work inside a scratch directory holding a single file "f1"; the
        # whole tree is removed after the test.
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[stat.ST_ATIME])
                or (st.st_mtime != st[stat.ST_MTIME])
                or (st.st_ctime != st[stat.ST_CTIME]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and check that
        # os.stat() reports these timestamps.  filename defaults to the
        # scratch file.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Call set_time(self.fname), which is expected to set the file
        # timestamps to "now", and check st_mtime against time.time().

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms.
            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
            #
            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
            # also 50 ms on other platforms.
            delta = 0.050
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the filesystem name reported by GetVolumeInformationW for
        # the drive holding *path*.  Return None when the call fails or when
        # not running on Windows.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
        # times and ns must be tuples of exactly two items
        with self.assertRaises(TypeError):
            os.utime(self.fname, [5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, (5, 5, 5))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=[5, 5])
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5,))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(5, 5, 5))

        # features the platform lacks must be rejected explicitly
        if os.utime not in os.supports_follow_symlinks:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), follow_symlinks=False)
        if os.utime not in os.supports_fd:
            with open(self.fname, 'wb', 0) as fp:
                with self.assertRaises(TypeError):
                    os.utime(fp.fileno(), (5, 5))
        if os.utime not in os.supports_dir_fd:
            with self.assertRaises(NotImplementedError):
                os.utime(self.fname, (5, 5), dir_fd=0)

    @support.cpython_only
    def test_issue31577(self):
        # The interpreter shouldn't crash in case utime() received a bad
        # ns argument.
        def get_bad_int(divmod_ret_val):
            # Build an object whose __divmod__ returns divmod_ret_val.
            class BadInt:
                def __divmod__(*args):
                    return divmod_ret_val
            return BadInt()
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(42), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(()), 1))
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
823
824
825from test import mapping_tests
826
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (str view and, where supported, bytes
        # view) so tearDown() can restore it, then install the reference
        # entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, text in self._reference().items():
            os.environ[name] = text

    def tearDown(self):
        # Put back the snapshot taken by setUp().
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        # The mapping under test is os.environ itself, so handing out an
        # "empty mapping" means clearing the real environment.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Keys and values that os.environ received from the OS must be of the
    # exact str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        # Every reference entry installed by setUp() must be readable back.
        for name, text in self._reference().items():
            self.assertEqual(os.environ.get(name), text)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        formatted_items = ', '.join(
            '{!r}: {!r}'.format(name, text) for name, text in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(formatted_items))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        env_with_path = {'PATH': os.pathsep.join(search_dirs)}

        # Without an explicit env (or with env=None) os.environ is used, so
        # temporarily swap a plain dict in for it.
        original_environ = os.environ
        try:
            os.environ = dict(env_with_path)
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs,
                                 os.get_exec_path(env_with_path))

        if not os.supports_bytes_environ:
            return

        # env cannot contain both the 'PATH' and the b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        bytes_envs = (
            {b'PATH': b'abc'},
            {b'PATH': 'abc'},
            {'PATH': b'abc'},
        )
        for bytes_env in bytes_envs:
            self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            expected_error, key = ValueError, 'x' * 50000
        else:
            # "=" is not allowed in a variable name
            expected_error, key = OSError, 'key='
        self.assertRaises(expected_error, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        def check_key_error(exc):
            # The KeyError must carry the very key object that was used and
            # must have its exception context suppressed.
            self.assertIs(exc.args[0], missing)
            self.assertTrue(exc.__suppress_context__)

        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        check_key_error(cm.exception)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        check_key_error(cm.exception)

    def _test_environ_iteration(self, collection):
        new_key = "__new_key__"
        new_value = "test_environ_iteration"
        it = iter(collection)

        next(it)  # start iterating over the collection

        # add a new key to the os.environ mapping while iterating
        os.environ[new_key] = new_value

        try:
            next(it)  # force iteration over the modified mapping
            self.assertEqual(os.environ[new_key], new_value)
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
1005
1006
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Thin adapter hiding the minor differences between os.walk and
    # os.fwalk, so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5          (created on disk under the name "tmp3")
        #           link/           a symlink to TESTFN.2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        top = support.TESTFN
        self.walk_path = join(top, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        self.link_path = join(sub2_path, "link")
        t2_path = join(top, "TEST2")
        tmp5_path = join(sub21_path, "tmp3")
        regular_files = (
            join(self.walk_path, "tmp1"),
            join(self.sub1_path, "tmp2"),
            join(sub2_path, "tmp3"),
            join(top, "TEST2", "tmp4"),
            tmp5_path,
        )

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, sub21_path, t2_path):
            os.makedirs(directory)

        for path in regular_files:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            dangling_links = (
                ('broken', "broken_link"),
                (join('tmp3', 'broken'), "broken_link2"),
                (join('SUB21', 'tmp5'), "broken_link3"),
            )
            for target, link_name in dangling_links:
                os.symlink(target, join(sub2_path, link_name), True)
            sub2_dirs = ["SUB21", "link"]
            sub2_files = ["broken_link", "broken_link2", "broken_link3",
                          "tmp3"]
        else:
            sub2_dirs = ["SUB21"]
            sub2_files = ["tmp3"]
        self.sub2_tree = (sub2_path, sub2_dirs, sub2_files)

        # Try to make SUB21 unreadable.  If listing it still succeeds, the
        # directory is removed altogether and dropped from the expected
        # SUB2 entry; otherwise its permissions are restored at cleanup.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        entries = list(self.walk(self.walk_path))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        entries[0][1].sort()
        entries[sub2_index][-1].sort()
        entries[sub2_index][1].sort()
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(entries[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for root, dirs, files in self.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs appended to visited!
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry = visited[1]
        sub2_entry[-1].sort()
        sub2_entry[1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_file_like_path(self):
        # Same as the prune test, but starting from a path-like object.
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        entries = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(entries), 4, entries)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        entries[3][1].sort()
        entries[sub2_index][-1].sort()
        entries[sub2_index][1].sort()
        self.assertEqual(entries[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walker = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walker:
            if root != self.link_path:
                continue
            self.assertEqual(dirs, [])
            self.assertEqual(files, ["tmp4"])
            break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walker = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walker)
        self.assertEqual(errors, [])

        # Rename SUB1 after it has been listed but before it is visited.
        renamed = 'SUB1'
        old_path = os.path.join(root, renamed)
        new_path = os.path.join(root, renamed + '.new')
        os.rename(old_path, new_path)
        try:
            visited_roots = [entry[0] for entry in walker]
            self.assertTrue(errors)
            self.assertNotIn(old_path, visited_roots)
            self.assertNotIn(new_path, visited_roots)
            # The remaining sibling directories must still be walked.
            for other in dirs:
                if other == renamed:
                    continue
                self.assertIn(os.path.join(root, other), visited_roots)
        finally:
            os.rename(new_path, old_path)

    def test_walk_many_open_files(self):
        depth = 30
        base = os.path.join(support.TESTFN, 'deep')
        deepest = os.path.join(base, *(['d']*depth))
        os.makedirs(deepest)

        # Advance 100 bottom-up walkers in lock step, from the deepest
        # directory back up to base.
        walkers = [self.walk(base, topdown=False) for _ in range(100)]
        current = deepest
        for level in range(depth + 1):
            expected = (current, ['d'] if level else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.dirname(current)

        # Same again top-down, from base to the deepest directory.
        walkers = [self.walk(base, topdown=True) for _ in range(100)]
        current = base
        for level in range(depth + 1):
            expected = (current, ['d'] if level < depth else [], [])
            for walker in walkers:
                self.assertEqual(next(walker), expected)
            current = os.path.join(current, 'd')
1201
1202
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples that the inherited
        # WalkTests methods expect, by dropping the directory descriptor.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        # Single place where os.fwalk() is invoked; BytesFwalkTests
        # overrides it to walk with bytes paths.
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        For every combination of topdown and follow-symlinks, walk the tree
        with os.walk(**walk_kwargs) and check that each entry produced by
        self.fwalk(**fwalk_kwargs) is known to os.walk() and lists the same
        directories and files (compared as sets).
        """
        # Work on copies: the caller may pass the same dict for both.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if os.open()
        # itself failed inside it, the finally clause would hit an unbound
        # 'fd' and mask the real OSError with a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    # fwalk() keeps file descriptors open
    test_walk_many_open_files = None
1273
1274
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""
    def walk(self, top, **kwargs):
        # Walk with a bytes path, but hand str names to the inherited
        # tests.  After each step the (possibly pruned) str lists are
        # written back into the bytes lists owned by os.walk().
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1287
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Tests for os.fwalk() with bytes."""
    def fwalk(self, top='.', *args, **kwargs):
        # Walk with a bytes path, but hand str names to the inherited
        # tests.  After each step the (possibly pruned) str lists are
        # written back into the bytes lists owned by os.fwalk().
        walker = os.fwalk(os.fsencode(top), *args, **kwargs)
        for broot, bdirs, bfiles, topfd in walker:
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1299
1300
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                # The requested mode applies to the leaf directory only; the
                # intermediate directory gets the default mode filtered by
                # the umask set above.
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        # temp_umask() restores the process umask even when an assertion
        # fails, so a failure here cannot leak the umask into later tests.
        with support.temp_umask(0o022):
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            # A regular file in the way is an error whatever exist_ok says.
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown() uses os.removedirs(), which
            # cannot delete a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1393
1394
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), applied to a TESTFN directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        # Numbers that are not integers must be rejected for both the uid
        # and the gid argument.
        non_index_values = (
            -1.0,
            -1j,
            decimal.Decimal(-1),
            fractions.Fraction(-2, 2),
        )
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
    def test_chown_gid(self):
        groups = os.getgroups()
        if len(groups) < 2:
            self.skipTest("test needs at least 2 groups")

        owner = os.stat(support.TESTFN).st_uid

        # Switch the group to each of the first two groups in turn and
        # check that the change is visible through stat().
        for wanted_gid in groups[:2]:
            os.chown(support.TESTFN, owner, wanted_gid)
            actual_gid = os.stat(support.TESTFN).st_gid
            self.assertEqual(actual_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        group = os.stat(support.TESTFN).st_gid
        # Hand the directory over to each of the first two users in turn.
        for wanted_uid in all_users[:2]:
            os.chown(support.TESTFN, wanted_uid, group)
            actual_uid = os.stat(support.TESTFN).st_uid
            self.assertEqual(actual_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        group = os.stat(support.TESTFN).st_gid
        # Both calls share one assertRaises block: the block passes as soon
        # as either call raises PermissionError.
        # NOTE(review): presumably this allows for one of the two uids being
        # the current user, for which chown would succeed -- confirm.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, group)
            os.chown(support.TESTFN, second_uid, group)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1453
1454
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb hierarchy."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain goes away.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the upward removal right after dirb.
        outer, inner = self._make_nested_dirs()
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        self.assertTrue(os.path.exists(outer))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        # A file in dirb itself makes removedirs() fail without removing
        # anything.
        outer, inner = self._make_nested_dirs()
        create_file(os.path.join(inner, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1494
1495
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        # Writing to the null device must succeed; the with statement
        # closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from it yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
1503
1504
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result always has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the bytes
        # it wrote to its standard output.
        script_lines = (
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        )
        result = assert_python_ok('-c', '\n'.join(script_lines))
        stdout = result[1]
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same data.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
1534
1535
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        sample = os.getrandom(16)
        self.assertIsInstance(sample, bytes)
        self.assertEqual(len(sample), 16)

    def test_getrandom0(self):
        # Asking for zero bytes gives an empty bytes object.
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        # Only check that the flag exists: os.getrandom(1, os.GRND_RANDOM)
        # is not called, to avoid consuming the rare resource /dev/random.
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        # Two consecutive reads must differ.
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1577
1578
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as any one of the corresponding build-time flags equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
1585
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    # Tests for the os.urandom() implementation that goes through a file
    # descriptor.  Every scenario runs in a child interpreter because it
    # manipulates process-wide state (fd limits, open descriptors).

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here; its output is not
        # inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # First run: two 4-byte reads that must differ from one another.
        # If urandom() kept reading the hijacked fd it would return the
        # constant b"x" bytes of TESTFN instead.
        out = assert_python_ok('-Sc', code)[1]
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # Second run: the output must also differ from the first run.
        out2 = assert_python_ok('-Sc', code)[1]
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1665
1666
1667@contextlib.contextmanager
1668def _execvpe_mockup(defpath=None):
1669    """
1670    Stubs out execv and execve functions when used as context manager.
1671    Records exec calls. The mock execv and execve functions always raise an
1672    exception as they would normally never return.
1673    """
1674    # A list of tuples containing (function name, first arg, args)
1675    # of calls to execv or execve that have been made.
1676    calls = []
1677
1678    def mock_execv(name, *args):
1679        calls.append(('execv', name, args))
1680        raise RuntimeError("execv called")
1681
1682    def mock_execve(name, *args):
1683        calls.append(('execve', name, args))
1684        raise OSError(errno.ENOTDIR, "execve called")
1685
1686    try:
1687        orig_execv = os.execv
1688        orig_execve = os.execve
1689        orig_defpath = os.defpath
1690        os.execv = mock_execv
1691        os.execve = mock_execve
1692        if defpath is not None:
1693            os.defpath = defpath
1694        yield calls
1695    finally:
1696        os.execv = orig_execv
1697        os.execve = orig_execve
1698        os.defpath = orig_defpath
1699
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    # Argument validation and path resolution of the os.exec*() family.
    # Only failing calls are made, except where the real execv/execve are
    # swapped for stubs by _execvpe_mockup().

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        missing = 'no such app-'
        self.assertRaises(OSError, os.execvpe, missing, [missing], None)

    def test_execv_with_bad_arglist(self):
        # Empty argument sequences and an empty first argument are rejected.
        for bad_args in ((), [], ('',), ['']):
            self.assertRaises(ValueError, os.execv, 'notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, env in (([], None), ([], {}), ([''], {})):
            self.assertRaises(ValueError, os.execvpe, 'notepad', bad_args, env)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the str and bytes variants: *test_type* selects
        # whether the program name is passed as bytes or as str.
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside Windows the path handed to execve is expected as bytes.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            expected = ('execv', fullpath, (arguments,))
            self.assertEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside Windows.
        kinds = [str] if os.name == "nt" else [str, bytes]
        for kind in kinds:
            self._test_internal_execvpe(kind)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        bad_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for key, value in bad_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            with self.assertRaises(ValueError):
                os.execve(args[0], args, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            winerror = e.winerror
            self.assertTrue(winerror is None or winerror != 0)
        else:
            self.fail('No OSError raised')
1804
1805
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test expects an OSError from an os function; all but test_mkdir
    # operate on a path that does not exist.

    def setUp(self):
        # Precondition for every test: support.TESTFN must not exist.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            return
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        backup = support.TESTFN + ".bak"
        self.assertRaises(OSError, os.rename, support.TESTFN, backup)

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create TESTFN as a regular file, then try to mkdir() over it while
        # the file is still open.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
1840
1841
class TestInvalidFD(unittest.TestCase):
    # Hand an invalid file descriptor to various os functions.  Most tests
    # go through check(), which expects OSError with errno EBADF.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Factory used while the class body executes: returns a test method
        # for the os function named *f*.  The generated test silently does
        # nothing when os has no such attribute.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    # Install one generated "test_<name>" method per entry of singles.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False instead of raising.
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            # fstat() succeeded: fd+i is a live descriptor, stop here.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        result = os.closerange(fd, fd + i-1)
        self.assertEqual(result, None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        # 20 is the target descriptor passed as dup2()'s second argument.
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based function get the bad fd.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based function get the bad fd.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        # Getter first, then the setter with True as its extra argument.
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        # Getter first, then the setter with True as its extra argument.
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1940
1941
class LinkTests(unittest.TestCase):
    # Tests for os.link() with str, bytes and non-ASCII file names.

    def setUp(self):
        # file1 is the link source, file2 the hard link to be created.
        self.file1 = support.TESTFN
        self.file2 = os.path.join(support.TESTFN + "2")

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # refer to the same open file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as src, open(file2, "r") as dst:
            same = os.path.sameopenfile(src.fileno(), dst.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # Same scenario with both names encoded to bytes.
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent "\xf1".
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1978
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    # Argument validation of the os.set*uid() / os.set*gid() functions:
    # each one must reject a non-integer (TypeError) and an out-of-range
    # value (OverflowError).  When not running as uid 0, switching to id 0
    # must additionally fail with OSError.

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        # The OSError check is also skipped when HAVE_WHEEL_GROUP is set.
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # Exercise seteuid itself here (not setegid, which has its own test
        # and may not even exist when only seteuid is available).
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # Each of the two positions is validated independently.
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        # Each of the two positions is validated independently.
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
2046
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # File-system calls on names that are non-ASCII or not encodable in the
    # filesystem encoding.  setUp() builds a directory holding one empty
    # file per usable test name and records the decoded names in
    # self.unicodefn.

    def setUp(self):
        # Prefer the most exotic directory name the platform offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the candidates os.fsencode() can encode.
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # Remove the half-built directory on any failure, then re-raise.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Every created file can be opened (and is closed right away).
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                            "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            target = os.path.join(self.dir, fn)
            os.stat(target)
2118
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows, both with plain integer "signals" and
    # with console control events.

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1s apart.  The else clause runs when the
        # loop ends without a break: either the attempts ran out or the
        # child exited before announcing itself.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Launch win_console_handler.py in its own process group, wait until
        # it flags readiness through a shared one-byte mmap, send it *event*
        # and require that it has exited shortly afterwards.  *name* is only
        # used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running.  Test
        # for None explicitly so that an exit status of 0 is not mistaken
        # for "still alive".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2233
2234
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, SUB1 (directories) and FILE0, FILE1
        # (files); the sorted entry names are kept in self.created_paths.
        self.created_paths = []
        for index in range(2):
            dir_name, file_name = 'SUB%d' % index, 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
2281
2282
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    # Tests for os.readlink() with str, bytes and path-like arguments.
    # The str and bytes spellings of the link name and of its target (this
    # very file) are prepared once as class attributes.
    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def assertPathEqual(self, left, right):
        # Compare two paths after os.path.normcase(); on Windows a leading
        # '\\?\' prefix is dropped from either side first.
        left = os.path.normcase(left)
        right = os.path.normcase(right)
        if sys.platform == 'win32':
            # Bad practice to blindly strip the prefix as it may be required to
            # correctly refer to the file, but we're only comparing paths here.
            def strip_prefix(p):
                prefix = b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\'
                return p[4:] if p.startswith(prefix) else p
            left = strip_prefix(left)
            right = strip_prefix(right)
        self.assertEqual(left, right)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        # readlink() on the (non-link) target file raises OSError, for the
        # plain path as well as for its FakePath wrapper.
        wrapped_target = FakePath(self.filelink_target)
        self.assertRaises(OSError, os.readlink, self.filelink_target)
        self.assertRaises(OSError, os.readlink, wrapped_target)

    def test_missing_link(self):
        missing = 'missing-link'
        self.assertRaises(FileNotFoundError, os.readlink, missing)
        self.assertRaises(FileNotFoundError, os.readlink,
                          FakePath(missing))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertPathEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertPathEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2342
2343
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Windows-only tests for links created with os.symlink()."""

    # Link names are relative, so the links are created in the current
    # working directory; the targets are this file and its directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link name is taken yet.
        # unittest assertions are used instead of bare ``assert`` statements
        # because the latter are compiled away under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): this link is dangling, so exists() would report False.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        # Checked with a unittest assertion so it also runs under -O.
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        #  directory status and call RemoveDirectory if the symlink
        #  was created with target_is_dir==True.
        os.remove(self.missing_link)

    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        self.assertFalse(os.path.isdir(self.missing_link))

    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            # The link is created with a target relative to level2.
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                            and os.path.exists(r'C:\ProgramData'),
                            'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            # Also test with bytes, since that is a separate code path.
            attempts = [
                (src, dest),
                (os.fsencode(src), os.fsencode(dest)),
            ]
            for link_src, link_dest in attempts:
                try:
                    os.symlink(link_src, link_dest)
                except FileNotFoundError:
                    pass
                else:
                    # Best-effort cleanup of a link that did get created.
                    try:
                        os.remove(dest)
                    except OSError:
                        pass

    def test_appexeclink(self):
        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
        if not os.path.isdir(root):
            self.skipTest("test requires a WindowsApps directory")

        aliases = [os.path.join(root, a)
                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
        if not aliases:
            self.skipTest("test requires an app execution alias")

        # testing the first one we see is sufficient
        alias = aliases[0]
        if support.verbose:
            print()
            print("Testing with", alias)
        st = os.lstat(alias)
        self.assertEqual(st, os.stat(alias))
        self.assertFalse(stat.S_ISLNK(st.st_mode))
        self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2520
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Windows-only tests for junctions created with _winapi.CreateJunction()."""

    # The junction is created in the current working directory and points
    # at the directory containing this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions (unlike bare ``assert`` statements) are not
        # removed under ``python -O``, so the preconditions always run.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.lexists(self.junction))

    def tearDown(self):
        if os.path.lexists(self.junction):
            os.unlink(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.lexists(self.junction))
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))
        # stat() follows the junction to its target, lstat() does not.
        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))

        # bpo-37834: Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))
        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
                         os.path.normcase(os.readlink(self.junction)))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.lexists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2554
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    def test_getfinalpathname_handles(self):
        # Calling nt._getfinalpathname() and os.stat() repeatedly, whether
        # they succeed or fail, must not change the process handle count.
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes
        wintypes = ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = wintypes.HANDLE
        kernel.GetProcessHandleCount.restype = wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (wintypes.HANDLE,
                                                 wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        def current_handle_count():
            # Ask the kernel for the handle count and check the call worked.
            counter = wintypes.DWORD()
            ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(counter))
            self.assertEqual(1, ok)
            return counter.value

        before_count = current_handle_count()

        # The first two test the error path, __file__ tests the success path
        # NOTE(review): three entries precede __file__ -- confirm which of
        # them are expected to fail.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]
        probes = (nt._getfinalpathname, os.stat)

        for _ in range(10):
            for name in filenames:
                for probe in probes:
                    try:
                        probe(name)
                    except Exception:
                        # Failure is expected
                        pass

        handle_delta = current_handle_count() - before_count

        self.assertEqual(0, handle_delta)
2604
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check a symlink whose target is relative to the link's directory."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Link base/some_link -> base/some_dir, giving the target as plain
        'some_dir', and ensure that some_link is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # assertTrue() rather than a bare ``assert``: the statement form is
        # removed under ``python -O`` and the test would check nothing.
        self.assertTrue(os.path.isdir(src))
2635
2636
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in samples:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Cannot be encoded here, so there is nothing to round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
2650
2651
2652
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here (see the skip condition), so an encoding that
        # the codecs module can look up is expected.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2666
2667
class PidTests(unittest.TestCase):
    """Tests for os.getppid() and os.waitpid() using child interpreters."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def check_waitpid(self, code, exitcode):
        """Run *code* in a spawned interpreter and check os.waitpid().

        The child must be reported as having exited with *exitcode*, and
        waitpid() must return the pid that os.spawnv() handed out.
        """
        on_windows = (sys.platform == 'win32')
        if on_windows:
            # On Windows, os.spawnv() simply joins arguments with spaces:
            # arguments need to be quoted
            argv = [f'"{sys.executable}"', '-c', f'"{code}"']
        else:
            argv = [sys.executable, '-c', code]
        spawned_pid = os.spawnv(os.P_NOWAIT, sys.executable, argv)

        waited_pid, status = os.waitpid(spawned_pid, 0)
        if on_windows:
            self.assertEqual(status, exitcode << 8)
        else:
            self.assertTrue(os.WIFEXITED(status), status)
            self.assertEqual(os.WEXITSTATUS(status), exitcode)
        self.assertEqual(waited_pid, spawned_pid)

    def test_waitpid(self):
        self.check_waitpid(code='pass', exitcode=0)

    def test_waitpid_exitcode(self):
        expected = 23
        self.check_waitpid(f'import sys; sys.exit({expected})',
                           exitcode=expected)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_waitpid_windows(self):
        # bpo-40138: test os.waitpid() with exit code larger than INT_MAX.
        STATUS_CONTROL_C_EXIT = 0xC000013A
        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2709
2710
class SpawnTests(unittest.TestCase):
    """Tests for the os.spawn*() family of functions."""

    def create_args(self, *, with_env=False, use_bytes=False):
        """Write a script to support.TESTFN and return the argv that runs it.

        The script exits with self.exitcode.  When *with_env* is true,
        self.env is set to a copy of os.environ plus a unique key, and the
        script also reads that key from its own environment.  When
        *use_bytes* is true, the returned arguments (and self.env, if it was
        created) are encoded with os.fsencode().
        """
        self.exitcode = 17

        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)

        if not with_env:
            code = 'import sys; sys.exit(%s)' % self.exitcode
        else:
            self.env = dict(os.environ)
            # create an unique key
            self.key = str(uuid.uuid4())
            self.env[self.key] = self.key
            # read the variable from os.environ to check that it exists
            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                    % (self.key, self.exitcode))

        with open(filename, "w") as fp:
            fp.write(code)

        args = [sys.executable, filename]
        if use_bytes:
            args = [os.fsencode(a) for a in args]
            # self.env only exists when an environment was requested;
            # encoding it unconditionally raised AttributeError otherwise.
            if with_env:
                self.env = {os.fsencode(k): os.fsencode(v)
                            for k, v in self.env.items()}

        return args

    @requires_os_func('spawnl')
    def test_spawnl(self):
        args = self.create_args()
        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnle')
    def test_spawnle(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlp')
    def test_spawnlp(self):
        args = self.create_args()
        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnlpe')
    def test_spawnlpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_spawnv(self):
        args = self.create_args()
        exitcode = os.spawnv(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnve')
    def test_spawnve(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvp')
    def test_spawnvp(self):
        args = self.create_args()
        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnvpe')
    def test_spawnvpe(self):
        args = self.create_args(with_env=True)
        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnv')
    def test_nowait(self):
        args = self.create_args()
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        result = os.waitpid(pid, 0)
        self.assertEqual(result[0], pid)
        status = result[1]
        if hasattr(os, 'WIFEXITED'):
            self.assertTrue(os.WIFEXITED(status))
            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
        else:
            # Without the W* helpers the exit code is expected in the
            # high byte of the status.
            self.assertEqual(status, self.exitcode << 8)

    @requires_os_func('spawnve')
    def test_spawnve_bytes(self):
        # Test bytes handling in parse_arglist and parse_envlist (#28114)
        args = self.create_args(with_env=True, use_bytes=True)
        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
        self.assertEqual(exitcode, self.exitcode)

    @requires_os_func('spawnl')
    def test_spawnl_noargs(self):
        # A missing or empty first argument must be rejected.
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')

    @requires_os_func('spawnle')
    def test_spawnle_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})

    @requires_os_func('spawnv')
    def test_spawnv_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])

    @requires_os_func('spawnve')
    def test_spawnve_noargs(self):
        args = self.create_args()
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})

    def _test_invalid_env(self, spawn):
        """Check how *spawn* (os.spawnve or os.spawnvpe) treats odd env entries.

        Entries with a NUL in the name or value, or '=' in the name, must
        either raise ValueError or make the child exit with status 127.
        An '=' inside a value is legal and must reach the child intact.
        """
        args = [sys.executable, '-c', 'pass']

        invalid_entries = [
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        ]
        for name, value in invalid_entries:
            with self.subTest(name=name, value=value):
                newenv = os.environ.copy()
                newenv[name] = value
                try:
                    exitcode = spawn(os.P_WAIT, args[0], args, newenv)
                except ValueError:
                    pass
                else:
                    self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = support.TESTFN
        self.addCleanup(support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @requires_os_func('spawnve')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @requires_os_func('spawnvpe')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
2889
2890
# Introducing this TestCase caused at least two different errors on *nix
# buildbots, so it is skipped unconditionally for now to let the buildbots
# move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin()."""

    def test_getlogin(self):
        # The login name must not be empty.
        login_name = os.getlogin()
        self.assertNotEqual(len(login_name), 0)
2899
2900
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise this process's priority value by one, read it back, and
        # then try to restore the original value.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # Only EACCES is tolerated when restoring the old value.
                if err.errno != errno.EACCES:
                    raise
2923
2924
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening TCP socket that runs an asyncore loop in its own thread.

    The handler created for an accepted connection is stored in
    ``handler_instance`` and records whatever the client sends.
    """

    class Handler(asynchat.async_chat):
        """Per-connection handler that accumulates the received bytes."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            self.accumulate = True
            self.closed = False
            # Greeting the client waits for before it starts sending.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return everything received so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Address actually bound (the caller may have asked for port 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and return once its loop is entered."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop in run() to finish and join the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # ``with`` releases the lock even when a handler raises out of
            # asyncore.loop(); a bare acquire()/release() pair would not.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        # Never report the listening socket as writable.
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
3009
3010
3011@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3012class TestSendfile(unittest.TestCase):
3013
3014    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
3015    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
3016                               not sys.platform.startswith("solaris") and \
3017                               not sys.platform.startswith("sunos")
3018    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3019            'requires headers and trailers support')
3020    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3021            'test is only meaningful on 32-bit builds')
3022
    @classmethod
    def setUpClass(cls):
        # The value returned by support.threading_setup() is kept so that
        # tearDownClass can pass it to support.threading_cleanup().
        cls.key = support.threading_setup()
        # Create the file (support.TESTFN, holding cls.DATA) that setUp
        # opens for every test.
        create_file(support.TESTFN, cls.DATA)
3027
    @classmethod
    def tearDownClass(cls):
        # Hand back the value saved by setUpClass, then remove the data file
        # created there.
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)
3032
3033    def setUp(self):
3034        self.server = SendfileTestServer((support.HOST, 0))
3035        self.server.start()
3036        self.client = socket.socket()
3037        self.client.connect((self.server.host, self.server.port))
3038        self.client.settimeout(1)
3039        # synchronize by waiting for "220 ready" response
3040        self.client.recv(1024)
3041        self.sockno = self.client.fileno()
3042        self.file = open(support.TESTFN, 'rb')
3043        self.fileno = self.file.fileno()
3044
3045    def tearDown(self):
3046        self.file.close()
3047        self.client.close()
3048        if self.server.running:
3049            self.server.stop()
3050        self.server = None
3051
3052    def sendfile_wrapper(self, *args, **kwargs):
3053        """A higher level wrapper representing how an application is
3054        supposed to use sendfile().
3055        """
3056        while True:
3057            try:
3058                return os.sendfile(*args, **kwargs)
3059            except OSError as err:
3060                if err.errno == errno.ECONNRESET:
3061                    # disconnected
3062                    raise
3063                elif err.errno in (errno.EAGAIN, errno.EBUSY):
3064                    # we have to retry send data
3065                    continue
3066                else:
3067                    raise
3068
3069    def test_send_whole_file(self):
3070        # normal send
3071        total_sent = 0
3072        offset = 0
3073        nbytes = 4096
3074        while total_sent < len(self.DATA):
3075            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3076            if sent == 0:
3077                break
3078            offset += sent
3079            total_sent += sent
3080            self.assertTrue(sent <= nbytes)
3081            self.assertEqual(offset, total_sent)
3082
3083        self.assertEqual(total_sent, len(self.DATA))
3084        self.client.shutdown(socket.SHUT_RDWR)
3085        self.client.close()
3086        self.server.wait()
3087        data = self.server.handler_instance.get_data()
3088        self.assertEqual(len(data), len(self.DATA))
3089        self.assertEqual(data, self.DATA)
3090
3091    def test_send_at_certain_offset(self):
3092        # start sending a file at a certain offset
3093        total_sent = 0
3094        offset = len(self.DATA) // 2
3095        must_send = len(self.DATA) - offset
3096        nbytes = 4096
3097        while total_sent < must_send:
3098            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3099            if sent == 0:
3100                break
3101            offset += sent
3102            total_sent += sent
3103            self.assertTrue(sent <= nbytes)
3104
3105        self.client.shutdown(socket.SHUT_RDWR)
3106        self.client.close()
3107        self.server.wait()
3108        data = self.server.handler_instance.get_data()
3109        expected = self.DATA[len(self.DATA) // 2:]
3110        self.assertEqual(total_sent, len(expected))
3111        self.assertEqual(len(data), len(expected))
3112        self.assertEqual(data, expected)
3113
3114    def test_offset_overflow(self):
3115        # specify an offset > file size
3116        offset = len(self.DATA) + 4096
3117        try:
3118            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3119        except OSError as e:
3120            # Solaris can raise EINVAL if offset >= file length, ignore.
3121            if e.errno != errno.EINVAL:
3122                raise
3123        else:
3124            self.assertEqual(sent, 0)
3125        self.client.shutdown(socket.SHUT_RDWR)
3126        self.client.close()
3127        self.server.wait()
3128        data = self.server.handler_instance.get_data()
3129        self.assertEqual(data, b'')
3130
3131    def test_invalid_offset(self):
3132        with self.assertRaises(OSError) as cm:
3133            os.sendfile(self.sockno, self.fileno, -1, 4096)
3134        self.assertEqual(cm.exception.errno, errno.EINVAL)
3135
3136    def test_keywords(self):
3137        # Keyword arguments should be supported
3138        os.sendfile(out=self.sockno, offset=0, count=4096,
3139            **{'in': self.fileno})
3140        if self.SUPPORT_HEADERS_TRAILERS:
3141            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
3142                headers=(), trailers=(), flags=0)
3143
3144    # --- headers / trailers tests
3145
3146    @requires_headers_trailers
3147    def test_headers(self):
3148        total_sent = 0
3149        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
3150        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
3151                            headers=[b"x" * 512, b"y" * 256])
3152        self.assertLessEqual(sent, 512 + 256 + 4096)
3153        total_sent += sent
3154        offset = 4096
3155        while total_sent < len(expected_data):
3156            nbytes = min(len(expected_data) - total_sent, 4096)
3157            sent = self.sendfile_wrapper(self.sockno, self.fileno,
3158                                                    offset, nbytes)
3159            if sent == 0:
3160                break
3161            self.assertLessEqual(sent, nbytes)
3162            total_sent += sent
3163            offset += sent
3164
3165        self.assertEqual(total_sent, len(expected_data))
3166        self.client.close()
3167        self.server.wait()
3168        data = self.server.handler_instance.get_data()
3169        self.assertEqual(hash(data), hash(expected_data))
3170
3171    @requires_headers_trailers
3172    def test_trailers(self):
3173        TESTFN2 = support.TESTFN + "2"
3174        file_data = b"abcdef"
3175
3176        self.addCleanup(support.unlink, TESTFN2)
3177        create_file(TESTFN2, file_data)
3178
3179        with open(TESTFN2, 'rb') as f:
3180            os.sendfile(self.sockno, f.fileno(), 0, 5,
3181                        trailers=[b"123456", b"789"])
3182            self.client.close()
3183            self.server.wait()
3184            data = self.server.handler_instance.get_data()
3185            self.assertEqual(data, b"abcde123456789")
3186
3187    @requires_headers_trailers
3188    @requires_32b
3189    def test_headers_overflow_32bits(self):
3190        self.server.handler_instance.accumulate = False
3191        with self.assertRaises(OSError) as cm:
3192            os.sendfile(self.sockno, self.fileno, 0, 0,
3193                        headers=[b"x" * 2**16] * 2**15)
3194        self.assertEqual(cm.exception.errno, errno.EINVAL)
3195
3196    @requires_headers_trailers
3197    @requires_32b
3198    def test_trailers_overflow_32bits(self):
3199        self.server.handler_instance.accumulate = False
3200        with self.assertRaises(OSError) as cm:
3201            os.sendfile(self.sockno, self.fileno, 0, 0,
3202                        trailers=[b"x" * 2**16] * 2**15)
3203        self.assertEqual(cm.exception.errno, errno.EINVAL)
3204
3205    @requires_headers_trailers
3206    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3207                         'test needs os.SF_NODISKIO')
3208    def test_flags(self):
3209        try:
3210            os.sendfile(self.sockno, self.fileno, 0, 4096,
3211                        flags=os.SF_NODISKIO)
3212        except OSError as err:
3213            if err.errno not in (errno.EBUSY, errno.EAGAIN):
3214                raise
3215
3216
def supports_extended_attributes():
    """Return True if a "user." extended attribute can be set on a new file.

    Returns False when os.setxattr is missing or when setting the
    attribute on a freshly created support.TESTFN raises OSError.  The
    probe file is always unlinked afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        # "x" mode: the probe file must not exist yet.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        support.unlink(support.TESTFN)

    return usable
3231
3232
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the extended-attribute scenario on a fresh support.TESTFN.

        *s* converts an attribute name given as str into the type under
        test (str itself or os.fsencode).  *getxattr*, *setxattr*,
        *removexattr* and *listxattr* are the callables being exercised;
        **kwargs is forwarded to the first three.
        """
        fn = support.TESTFN
        self.addCleanup(support.unlink, fn)
        create_file(fn)

        # Reading an attribute that was never set fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        # Attributes already present on the new file are tolerated and
        # used as the baseline for every later listing comparison.
        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        setxattr(fn, s("user.test"), b"", **kwargs)
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses to overwrite an existing attribute ...
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)

        # ... and XATTR_REPLACE refuses to create a missing one.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn)), xattr)
        removexattr(fn, s("user.test"), **kwargs)

        # Once removed, the attribute is gone again.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)

        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
        # A 1024-byte value round-trips unchanged.
        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
        removexattr(fn, s("user.test"), **kwargs)
        # Set 100 attributes and check that the listing reports all of
        # them.  The names go through s() like every other name here, so
        # the os.fsencode pass really sets them as bytes.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, s(thing), b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then again with bytes names."""
        self._check_xattrs_str(str, *args, **kwargs)
        # Remove the file so the second pass starts from a clean slate.
        support.unlink(support.TESTFN)

        self._check_xattrs_str(os.fsencode, *args, **kwargs)
        support.unlink(support.TESTFN)

    def test_simple(self):
        # Path-based functions with default arguments.
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        # Same scenario with follow_symlinks=False passed to the get,
        # set and remove functions.
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call receives a file descriptor
        # instead of a path.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3316
3317
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    @contextlib.contextmanager
    def _skip_if_unqueryable(self):
        """Skip the test if the wrapped code cannot query the terminal size.

        An OSError raised inside the ``with`` body skips the test on
        win32, or elsewhere when its errno is EINVAL or ENOTTY.  Any
        other OSError is re-raised unchanged.
        """
        try:
            yield
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        with self._skip_if_unqueryable():
            size = os.get_terminal_size()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # Discard stty's stderr: when stdin is not a terminal its
            # complaint would otherwise leak into the test output, and
            # the non-zero exit status already triggers the skip below.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        with self._skip_if_unqueryable():
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
3361
3362
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    # os.memfd_create() must return a usable, non-inheritable descriptor.

    def test_memfd_create(self):
        # Explicit MFD_CLOEXEC: valid fd, not inheritable, writable.
        flagged = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged, -1)
        self.addCleanup(os.close, flagged)
        self.assertFalse(os.get_inheritable(flagged))
        payload = b'memfd_create'
        # closefd=False: the cleanup registered above owns the fd.
        with open(flagged, "wb", closefd=False) as stream:
            stream.write(payload)
            self.assertEqual(stream.tell(), 12)

        # Without explicit flags the descriptor is still not inheritable.
        unflagged = os.memfd_create("Hi")
        self.addCleanup(os.close, unflagged)
        self.assertFalse(os.get_inheritable(unflagged))
3378
3379
class OSErrorTests(unittest.TestCase):
    # OSError raised by path-taking os functions must carry, as its
    # filename attribute, the very object that was passed in.

    def setUp(self):
        class Str(str):
            pass

        # Use the special names from test.support when they are set,
        # otherwise fall back to the plain TESTFN.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        # str plus a str subclass; bytes plus two bytes-like wrappers.
        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each case is (names to try, function, *extra positional args).
        cases = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            # On win32 the second argument's type matches the name's
            # type, and listdir is only tried with the str names.
            cases += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir),
            ]
        else:
            cases += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms, with their extra
        # arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((every, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                cases.append((as_bytes, os.link, b"dst"))
                cases.append((as_text, os.link, "dst"))
            else:
                cases.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            cases.append((every, os.readlink))

        for names, func, *extra in cases:
            for name in names:
                try:
                    if isinstance(name, (str, bytes)):
                        func(name, *extra)
                    else:
                        # Names that are neither str nor bytes must also
                        # trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *extra)
                except OSError as err:
                    # Identity, not equality: the original object is kept.
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3472
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None or a positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3481
3482
class FDInheritanceTests(unittest.TestCase):
    # Checks of the inheritable flag on descriptors created by various
    # os functions, and of os.get_inheritable()/os.set_inheritable().

    def _open_this_file(self, flags=os.O_RDONLY):
        """Open this source file with *flags*; the fd is closed on cleanup."""
        fd = os.open(__file__, flags)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        handle = self._open_this_file()
        self.assertEqual(os.get_inheritable(handle), False)

        os.set_inheritable(handle, True)
        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        handle = self._open_this_file()
        self.assertEqual(os.get_inheritable(handle), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(handle, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(handle, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        handle = self._open_this_file()

        def cloexec_bit():
            # Current FD_CLOEXEC bit of the descriptor flags.
            return fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(handle, True)
        self.assertEqual(cloexec_bit(), 0)

    @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
    def test_get_set_inheritable_o_path(self):
        handle = self._open_this_file(os.O_PATH)
        self.assertEqual(os.get_inheritable(handle), False)

        # Toggle the flag on and back off again.
        for wanted in (True, False):
            os.set_inheritable(handle, wanted)
            self.assertEqual(os.get_inheritable(handle), wanted)

    def test_get_set_inheritable_badf(self):
        bad = support.make_bad_fd()

        # Every accessor must fail with EBADF on the bad descriptor.
        attempts = (
            lambda: os.get_inheritable(bad),
            lambda: os.set_inheritable(bad, True),
            lambda: os.set_inheritable(bad, False),
        )
        for attempt in attempts:
            with self.assertRaises(OSError) as ctx:
                attempt()
            self.assertEqual(ctx.exception.errno, errno.EBADF)

    def test_open(self):
        handle = self._open_this_file()
        self.assertEqual(os.get_inheritable(handle), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        for end in (read_end, write_end):
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        source = self._open_this_file()

        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    def test_dup_standard_stream(self):
        # Duplicating fd 1 must yield a new, positive descriptor.
        duplicate = os.dup(1)
        self.addCleanup(os.close, duplicate)
        self.assertGreater(duplicate, 0)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        source = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, source)
        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertFalse(os.get_inheritable(duplicate))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_this_file()

        # inheritable by default
        target = self._open_this_file()
        returned = os.dup2(source, target)
        self.assertEqual(returned, target)
        self.assertTrue(os.get_inheritable(target))

        # force non-inheritable
        private = self._open_this_file()
        returned = os.dup2(source, private, inheritable=False)
        self.assertEqual(returned, private)
        self.assertFalse(os.get_inheritable(private))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for end in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(end), False)
3602
3603
class PathTConverterTests(unittest.TestCase):
    # Each entry: (name of the os function, whether it accepts an fd,
    # extra positional arguments, function applied to the result or None).
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        text_name = support.TESTFN
        # Bytes paths are left out on Windows.
        bytes_name = bytes_wrapped = None
        if os.name != 'nt':
            bytes_name = support.TESTFN.encode('ascii')
            bytes_wrapped = FakePath(bytes_name)
        fd = os.open(FakePath(text_name), os.O_WRONLY|os.O_CREAT)
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        fd_wrapped = FakePath(fd)
        text_wrapped = FakePath(text_name)
        accepted = [candidate
                    for candidate in (text_name, bytes_name,
                                      text_wrapped, bytes_wrapped)
                    if candidate is not None]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                fn = getattr(os, name, None)
                if fn is None:
                    # Not available on this platform.
                    continue

                # str, bytes and path-like wrappers of both must work.
                for path in accepted:
                    with self.subTest(name=name, path=path):
                        outcome = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(outcome)

                # A path-like object wrapping an int is always rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(fd_wrapped, *extra_args)

                if not allow_fd:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
                else:
                    outcome = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(outcome)

    def test_path_t_converter_and_custom_class(self):
        # The TypeError message names the type of the wrapped value.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        wrong_values = (
            (2, r'int'),
            (2.34, r'float'),
            (object(), r'object'),
        )
        for wrapped, type_name in wrong_values:
            with self.assertRaisesRegex(TypeError, msg % type_name):
                os.stat(FakePath(wrapped))
3668
3669
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    def test_blocking(self):
        # A freshly opened descriptor is blocking; the mode can then be
        # switched off and back on again.
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        self.assertEqual(os.get_blocking(handle), True)

        for wanted in (False, True):
            os.set_blocking(handle, wanted)
            self.assertEqual(os.get_blocking(handle), wanted)
3683
3684
3685
class ExportsTests(unittest.TestCase):
    def test_os_all(self):
        # Both names must be listed in os.__all__.
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
3690
3691
3692class TestScandir(unittest.TestCase):
3693    check_no_resource_warning = support.check_no_resource_warning
3694
3695    def setUp(self):
3696        self.path = os.path.realpath(support.TESTFN)
3697        self.bytes_path = os.fsencode(self.path)
3698        self.addCleanup(support.rmtree, self.path)
3699        os.mkdir(self.path)
3700
3701    def create_file(self, name="file.txt"):
3702        path = self.bytes_path if isinstance(name, bytes) else self.path
3703        filename = os.path.join(path, name)
3704        create_file(filename, b'python')
3705        return filename
3706
3707    def get_entries(self, names):
3708        entries = dict((entry.name, entry)
3709                       for entry in os.scandir(self.path))
3710        self.assertEqual(sorted(entries.keys()), names)
3711        return entries
3712
3713    def assert_stat_equal(self, stat1, stat2, skip_fields):
3714        if skip_fields:
3715            for attr in dir(stat1):
3716                if not attr.startswith("st_"):
3717                    continue
3718                if attr in ("st_dev", "st_ino", "st_nlink"):
3719                    continue
3720                self.assertEqual(getattr(stat1, attr),
3721                                 getattr(stat2, attr),
3722                                 (stat1, stat2, attr))
3723        else:
3724            self.assertEqual(stat1, stat2)
3725
3726    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
3727        self.assertIsInstance(entry, os.DirEntry)
3728        self.assertEqual(entry.name, name)
3729        self.assertEqual(entry.path, os.path.join(self.path, name))
3730        self.assertEqual(entry.inode(),
3731                         os.stat(entry.path, follow_symlinks=False).st_ino)
3732
3733        entry_stat = os.stat(entry.path)
3734        self.assertEqual(entry.is_dir(),
3735                         stat.S_ISDIR(entry_stat.st_mode))
3736        self.assertEqual(entry.is_file(),
3737                         stat.S_ISREG(entry_stat.st_mode))
3738        self.assertEqual(entry.is_symlink(),
3739                         os.path.islink(entry.path))
3740
3741        entry_lstat = os.stat(entry.path, follow_symlinks=False)
3742        self.assertEqual(entry.is_dir(follow_symlinks=False),
3743                         stat.S_ISDIR(entry_lstat.st_mode))
3744        self.assertEqual(entry.is_file(follow_symlinks=False),
3745                         stat.S_ISREG(entry_lstat.st_mode))
3746
3747        self.assert_stat_equal(entry.stat(),
3748                               entry_stat,
3749                               os.name == 'nt' and not is_symlink)
3750        self.assert_stat_equal(entry.stat(follow_symlinks=False),
3751                               entry_lstat,
3752                               os.name == 'nt')
3753
3754    def test_attributes(self):
3755        link = hasattr(os, 'link')
3756        symlink = support.can_symlink()
3757
3758        dirname = os.path.join(self.path, "dir")
3759        os.mkdir(dirname)
3760        filename = self.create_file("file.txt")
3761        if link:
3762            try:
3763                os.link(filename, os.path.join(self.path, "link_file.txt"))
3764            except PermissionError as e:
3765                self.skipTest('os.link(): %s' % e)
3766        if symlink:
3767            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3768                       target_is_directory=True)
3769            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3770
3771        names = ['dir', 'file.txt']
3772        if link:
3773            names.append('link_file.txt')
3774        if symlink:
3775            names.extend(('symlink_dir', 'symlink_file.txt'))
3776        entries = self.get_entries(names)
3777
3778        entry = entries['dir']
3779        self.check_entry(entry, 'dir', True, False, False)
3780
3781        entry = entries['file.txt']
3782        self.check_entry(entry, 'file.txt', False, True, False)
3783
3784        if link:
3785            entry = entries['link_file.txt']
3786            self.check_entry(entry, 'link_file.txt', False, True, False)
3787
3788        if symlink:
3789            entry = entries['symlink_dir']
3790            self.check_entry(entry, 'symlink_dir', True, False, True)
3791
3792            entry = entries['symlink_file.txt']
3793            self.check_entry(entry, 'symlink_file.txt', False, True, True)
3794
3795    def get_entry(self, name):
3796        path = self.bytes_path if isinstance(name, bytes) else self.path
3797        entries = list(os.scandir(path))
3798        self.assertEqual(len(entries), 1)
3799
3800        entry = entries[0]
3801        self.assertEqual(entry.name, name)
3802        return entry
3803
3804    def create_file_entry(self, name='file.txt'):
3805        filename = self.create_file(name=name)
3806        return self.get_entry(os.path.basename(filename))
3807
3808    def test_current_directory(self):
3809        filename = self.create_file()
3810        old_dir = os.getcwd()
3811        try:
3812            os.chdir(self.path)
3813
3814            # call scandir() without parameter: it must list the content
3815            # of the current directory
3816            entries = dict((entry.name, entry) for entry in os.scandir())
3817            self.assertEqual(sorted(entries.keys()),
3818                             [os.path.basename(filename)])
3819        finally:
3820            os.chdir(old_dir)
3821
3822    def test_repr(self):
3823        entry = self.create_file_entry()
3824        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3825
3826    def test_fspath_protocol(self):
3827        entry = self.create_file_entry()
3828        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3829
3830    def test_fspath_protocol_bytes(self):
3831        bytes_filename = os.fsencode('bytesfile.txt')
3832        bytes_entry = self.create_file_entry(name=bytes_filename)
3833        fspath = os.fspath(bytes_entry)
3834        self.assertIsInstance(fspath, bytes)
3835        self.assertEqual(fspath,
3836                         os.path.join(os.fsencode(self.path),bytes_filename))
3837
3838    def test_removed_dir(self):
3839        path = os.path.join(self.path, 'dir')
3840
3841        os.mkdir(path)
3842        entry = self.get_entry('dir')
3843        os.rmdir(path)
3844
3845        # On POSIX, is_dir() result depends if scandir() filled d_type or not
3846        if os.name == 'nt':
3847            self.assertTrue(entry.is_dir())
3848        self.assertFalse(entry.is_file())
3849        self.assertFalse(entry.is_symlink())
3850        if os.name == 'nt':
3851            self.assertRaises(FileNotFoundError, entry.inode)
3852            # don't fail
3853            entry.stat()
3854            entry.stat(follow_symlinks=False)
3855        else:
3856            self.assertGreater(entry.inode(), 0)
3857            self.assertRaises(FileNotFoundError, entry.stat)
3858            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3859
3860    def test_removed_file(self):
3861        entry = self.create_file_entry()
3862        os.unlink(entry.path)
3863
3864        self.assertFalse(entry.is_dir())
3865        # On POSIX, is_dir() result depends if scandir() filled d_type or not
3866        if os.name == 'nt':
3867            self.assertTrue(entry.is_file())
3868        self.assertFalse(entry.is_symlink())
3869        if os.name == 'nt':
3870            self.assertRaises(FileNotFoundError, entry.inode)
3871            # don't fail
3872            entry.stat()
3873            entry.stat(follow_symlinks=False)
3874        else:
3875            self.assertGreater(entry.inode(), 0)
3876            self.assertRaises(FileNotFoundError, entry.stat)
3877            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3878
3879    def test_broken_symlink(self):
3880        if not support.can_symlink():
3881            return self.skipTest('cannot create symbolic link')
3882
3883        filename = self.create_file("file.txt")
3884        os.symlink(filename,
3885                   os.path.join(self.path, "symlink.txt"))
3886        entries = self.get_entries(['file.txt', 'symlink.txt'])
3887        entry = entries['symlink.txt']
3888        os.unlink(filename)
3889
3890        self.assertGreater(entry.inode(), 0)
3891        self.assertFalse(entry.is_dir())
3892        self.assertFalse(entry.is_file())  # broken symlink returns False
3893        self.assertFalse(entry.is_dir(follow_symlinks=False))
3894        self.assertFalse(entry.is_file(follow_symlinks=False))
3895        self.assertTrue(entry.is_symlink())
3896        self.assertRaises(FileNotFoundError, entry.stat)
3897        # don't fail
3898        entry.stat(follow_symlinks=False)
3899
3900    def test_bytes(self):
3901        self.create_file("file.txt")
3902
3903        path_bytes = os.fsencode(self.path)
3904        entries = list(os.scandir(path_bytes))
3905        self.assertEqual(len(entries), 1, entries)
3906        entry = entries[0]
3907
3908        self.assertEqual(entry.name, b'file.txt')
3909        self.assertEqual(entry.path,
3910                         os.fsencode(os.path.join(self.path, 'file.txt')))
3911
3912    def test_bytes_like(self):
3913        self.create_file("file.txt")
3914
3915        for cls in bytearray, memoryview:
3916            path_bytes = cls(os.fsencode(self.path))
3917            with self.assertWarns(DeprecationWarning):
3918                entries = list(os.scandir(path_bytes))
3919            self.assertEqual(len(entries), 1, entries)
3920            entry = entries[0]
3921
3922            self.assertEqual(entry.name, b'file.txt')
3923            self.assertEqual(entry.path,
3924                             os.fsencode(os.path.join(self.path, 'file.txt')))
3925            self.assertIs(type(entry.name), bytes)
3926            self.assertIs(type(entry.path), bytes)
3927
3928    @unittest.skipUnless(os.listdir in os.supports_fd,
3929                         'fd support for listdir required for this test.')
3930    def test_fd(self):
3931        self.assertIn(os.scandir, os.supports_fd)
3932        self.create_file('file.txt')
3933        expected_names = ['file.txt']
3934        if support.can_symlink():
3935            os.symlink('file.txt', os.path.join(self.path, 'link'))
3936            expected_names.append('link')
3937
3938        fd = os.open(self.path, os.O_RDONLY)
3939        try:
3940            with os.scandir(fd) as it:
3941                entries = list(it)
3942            names = [entry.name for entry in entries]
3943            self.assertEqual(sorted(names), expected_names)
3944            self.assertEqual(names, os.listdir(fd))
3945            for entry in entries:
3946                self.assertEqual(entry.path, entry.name)
3947                self.assertEqual(os.fspath(entry), entry.name)
3948                self.assertEqual(entry.is_symlink(), entry.name == 'link')
3949                if os.stat in os.supports_dir_fd:
3950                    st = os.stat(entry.name, dir_fd=fd)
3951                    self.assertEqual(entry.stat(), st)
3952                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3953                    self.assertEqual(entry.stat(follow_symlinks=False), st)
3954        finally:
3955            os.close(fd)
3956
3957    def test_empty_path(self):
3958        self.assertRaises(FileNotFoundError, os.scandir, '')
3959
3960    def test_consume_iterator_twice(self):
3961        self.create_file("file.txt")
3962        iterator = os.scandir(self.path)
3963
3964        entries = list(iterator)
3965        self.assertEqual(len(entries), 1, entries)
3966
3967        # check than consuming the iterator twice doesn't raise exception
3968        entries2 = list(iterator)
3969        self.assertEqual(len(entries2), 0, entries2)
3970
3971    def test_bad_path_type(self):
3972        for obj in [1.234, {}, []]:
3973            self.assertRaises(TypeError, os.scandir, obj)
3974
3975    def test_close(self):
3976        self.create_file("file.txt")
3977        self.create_file("file2.txt")
3978        iterator = os.scandir(self.path)
3979        next(iterator)
3980        iterator.close()
3981        # multiple closes
3982        iterator.close()
3983        with self.check_no_resource_warning():
3984            del iterator
3985
3986    def test_context_manager(self):
3987        self.create_file("file.txt")
3988        self.create_file("file2.txt")
3989        with os.scandir(self.path) as iterator:
3990            next(iterator)
3991        with self.check_no_resource_warning():
3992            del iterator
3993
3994    def test_context_manager_close(self):
3995        self.create_file("file.txt")
3996        self.create_file("file2.txt")
3997        with os.scandir(self.path) as iterator:
3998            next(iterator)
3999            iterator.close()
4000
4001    def test_context_manager_exception(self):
4002        self.create_file("file.txt")
4003        self.create_file("file2.txt")
4004        with self.assertRaises(ZeroDivisionError):
4005            with os.scandir(self.path) as iterator:
4006                next(iterator)
4007                1/0
4008        with self.check_no_resource_warning():
4009            del iterator
4010
4011    def test_resource_warning(self):
4012        self.create_file("file.txt")
4013        self.create_file("file2.txt")
4014        iterator = os.scandir(self.path)
4015        next(iterator)
4016        with self.assertWarns(ResourceWarning):
4017            del iterator
4018            support.gc_collect()
4019        # exhausted iterator
4020        iterator = os.scandir(self.path)
4021        list(iterator)
4022        with self.check_no_resource_warning():
4023            del iterator
4024
4025
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol (PEP 519)."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes arguments come back equal to the input.
        for b in b'hello', b'goodbye', b'some/path/and/file':
            # subTest reports which input failed.
            with self.subTest(path=b):
                self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        # str arguments come back equal to the input.
        for s in 'hello', 'goodbye', 'some/path/and/file':
            with self.subTest(path=s):
                self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        # A path-like object wrapping either str or bytes must be accepted
        # by fspath(), os.fsencode() and os.fsdecode().
        for p in "path/like/object", b"path/like/object":
            with self.subTest(path=p):
                pathlike = FakePath(p)

                self.assertEqual(p, self.fspath(pathlike))
                self.assertEqual(b"path/like/object", os.fsencode(pathlike))
                self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        # assertIsInstance shows the offending object if the check fails.
        self.assertIsInstance(FakePath('x'), os.PathLike)

    def test_garbage_in_exception_out(self):
        # Objects that are neither str, bytes nor path-like are rejected.
        vapor = type('blah', (), {})
        for o in int, type, os, vapor():
            with self.subTest(obj=o):
                self.assertRaises(TypeError, self.fspath, o)

    def test_argument_required(self):
        # Calling fspath() without an argument is a TypeError.
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, FakePath(42))
        # __fspath__ attribute that is not callable.
        c = type('foo', (), {})
        c.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, c())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          FakePath(ZeroDivisionError()))

    def test_pathlike_subclasshook(self):
        # bpo-38878: subclasshook causes subclass checks
        # true on abstract implementation.
        class A(os.PathLike):
            pass
        self.assertFalse(issubclass(FakePath, A))
        self.assertTrue(issubclass(FakePath, os.PathLike))
4079
4080
class TimesTests(unittest.TestCase):
    def test_times(self):
        # os.times() must return an os.times_result whose five named
        # fields are all floats.
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for name in field_names:
            self.assertIsInstance(getattr(result, name), float)

        if os.name != 'nt':
            return

        # On Windows the child times and the elapsed time are expected
        # to be zero.
        for name in ('children_user', 'children_system', 'elapsed'):
            self.assertEqual(getattr(result, name), 0)
4095
4096
# os._fspath only exists when os.fspath is provided by the C version; in
# that case re-run the TestPEP519 tests against os._fspath.  Otherwise
# TestPEP519 has already tested the pure Python implementation.
if hasattr(os, '_fspath'):
    class TestPEP519PurePython(TestPEP519):
        """Run the TestPEP519 tests against the pure Python os._fspath()."""

        fspath = staticmethod(os._fspath)
4105
4106
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
4109