1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import asynchat
6import asyncore
7import codecs
8import contextlib
9import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
15import mmap
16import os
17import pickle
18import shutil
19import signal
20import socket
21import stat
22import subprocess
23import sys
24import sysconfig
25import threading
26import time
27import unittest
28import uuid
29import warnings
30from test import support
31
32try:
33    import resource
34except ImportError:
35    resource = None
36try:
37    import fcntl
38except ImportError:
39    fcntl = None
40try:
41    import _winapi
42except ImportError:
43    _winapi = None
44try:
45    import pwd
46    all_users = [u.pw_uid for u in pwd.getpwall()]
47except (ImportError, AttributeError):
48    all_users = []
49try:
50    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
51except ImportError:
52    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
53
54from test.support.script_helper import assert_python_ok
55from test.support import unix_shell, FakePath
56
57
58root_in_posix = False
59if hasattr(os, 'geteuid'):
60    root_in_posix = (os.geteuid() == 0)
61
62# Detect whether we're on a Linux system that uses the (now outdated
63# and unmaintained) linuxthreads threading library.  There's an issue
64# when combining linuxthreads with a failed execv call: see
65# http://bugs.python.org/issue4970.
66if hasattr(sys, 'thread_info') and sys.thread_info.version:
67    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
68else:
69    USING_LINUXTHREADS = False
70
71# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
72HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
73
74
75def requires_os_func(name):
76    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
77
78
79def create_file(filename, content=b'content'):
80    with open(filename, "xb", 0) as fp:
81        fp.write(content)
82
83
84# Tests creating TESTFN
85class FileTests(unittest.TestCase):
86    def setUp(self):
87        if os.path.lexists(support.TESTFN):
88            os.unlink(support.TESTFN)
89    tearDown = setUp
90
91    def test_access(self):
92        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
93        os.close(f)
94        self.assertTrue(os.access(support.TESTFN, os.W_OK))
95
96    def test_closerange(self):
97        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
98        # We must allocate two consecutive file descriptors, otherwise
99        # it will mess up other file descriptors (perhaps even the three
100        # standard ones).
101        second = os.dup(first)
102        try:
103            retries = 0
104            while second != first + 1:
105                os.close(first)
106                retries += 1
107                if retries > 10:
108                    # XXX test skipped
109                    self.skipTest("couldn't allocate two consecutive fds")
110                first, second = second, os.dup(second)
111        finally:
112            os.close(second)
113        # close a fd that is open, and one that isn't
114        os.closerange(first, first + 2)
115        self.assertRaises(OSError, os.write, first, b"a")
116
117    @support.cpython_only
118    def test_rename(self):
119        path = support.TESTFN
120        old = sys.getrefcount(path)
121        self.assertRaises(TypeError, os.rename, path, 0)
122        new = sys.getrefcount(path)
123        self.assertEqual(old, new)
124
125    def test_read(self):
126        with open(support.TESTFN, "w+b") as fobj:
127            fobj.write(b"spam")
128            fobj.flush()
129            fd = fobj.fileno()
130            os.lseek(fd, 0, 0)
131            s = os.read(fd, 4)
132            self.assertEqual(type(s), bytes)
133            self.assertEqual(s, b"spam")
134
135    @support.cpython_only
136    # Skip the test on 32-bit platforms: the number of bytes must fit in a
137    # Py_ssize_t type
138    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
139                         "needs INT_MAX < PY_SSIZE_T_MAX")
140    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
141    def test_large_read(self, size):
142        self.addCleanup(support.unlink, support.TESTFN)
143        create_file(support.TESTFN, b'test')
144
145        # Issue #21932: Make sure that os.read() does not raise an
146        # OverflowError for size larger than INT_MAX
147        with open(support.TESTFN, "rb") as fp:
148            data = os.read(fp.fileno(), size)
149
150        # The test does not try to read more than 2 GiB at once because the
151        # operating system is free to return less bytes than requested.
152        self.assertEqual(data, b'test')
153
154    def test_write(self):
155        # os.write() accepts bytes- and buffer-like objects but not strings
156        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
157        self.assertRaises(TypeError, os.write, fd, "beans")
158        os.write(fd, b"bacon\n")
159        os.write(fd, bytearray(b"eggs\n"))
160        os.write(fd, memoryview(b"spam\n"))
161        os.close(fd)
162        with open(support.TESTFN, "rb") as fobj:
163            self.assertEqual(fobj.read().splitlines(),
164                [b"bacon", b"eggs", b"spam"])
165
166    def write_windows_console(self, *args):
167        retcode = subprocess.call(args,
168            # use a new console to not flood the test output
169            creationflags=subprocess.CREATE_NEW_CONSOLE,
170            # use a shell to hide the console window (SW_HIDE)
171            shell=True)
172        self.assertEqual(retcode, 0)
173
174    @unittest.skipUnless(sys.platform == 'win32',
175                         'test specific to the Windows console')
176    def test_write_windows_console(self):
177        # Issue #11395: the Windows console returns an error (12: not enough
178        # space error) on writing into stdout if stdout mode is binary and the
179        # length is greater than 66,000 bytes (or less, depending on heap
180        # usage).
181        code = "print('x' * 100000)"
182        self.write_windows_console(sys.executable, "-c", code)
183        self.write_windows_console(sys.executable, "-u", "-c", code)
184
185    def fdopen_helper(self, *args):
186        fd = os.open(support.TESTFN, os.O_RDONLY)
187        f = os.fdopen(fd, *args)
188        f.close()
189
190    def test_fdopen(self):
191        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
192        os.close(fd)
193
194        self.fdopen_helper()
195        self.fdopen_helper('r')
196        self.fdopen_helper('r', 100)
197
198    def test_replace(self):
199        TESTFN2 = support.TESTFN + ".2"
200        self.addCleanup(support.unlink, support.TESTFN)
201        self.addCleanup(support.unlink, TESTFN2)
202
203        create_file(support.TESTFN, b"1")
204        create_file(TESTFN2, b"2")
205
206        os.replace(support.TESTFN, TESTFN2)
207        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
208        with open(TESTFN2, 'r') as f:
209            self.assertEqual(f.read(), "1")
210
211    def test_open_keywords(self):
212        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
213            dir_fd=None)
214        os.close(f)
215
216    def test_symlink_keywords(self):
217        symlink = support.get_attribute(os, "symlink")
218        try:
219            symlink(src='target', dst=support.TESTFN,
220                target_is_directory=False, dir_fd=None)
221        except (NotImplementedError, OSError):
222            pass  # No OS support or unprivileged user
223
224
225# Test attributes on return values from os.*stat* family.
226class StatAttributeTests(unittest.TestCase):
227    def setUp(self):
228        self.fname = support.TESTFN
229        self.addCleanup(support.unlink, self.fname)
230        create_file(self.fname, b"ABC")
231
232    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
233    def check_stat_attributes(self, fname):
234        result = os.stat(fname)
235
236        # Make sure direct access works
237        self.assertEqual(result[stat.ST_SIZE], 3)
238        self.assertEqual(result.st_size, 3)
239
240        # Make sure all the attributes are there
241        members = dir(result)
242        for name in dir(stat):
243            if name[:3] == 'ST_':
244                attr = name.lower()
245                if name.endswith("TIME"):
246                    def trunc(x): return int(x)
247                else:
248                    def trunc(x): return x
249                self.assertEqual(trunc(getattr(result, attr)),
250                                  result[getattr(stat, name)])
251                self.assertIn(attr, members)
252
253        # Make sure that the st_?time and st_?time_ns fields roughly agree
254        # (they should always agree up to around tens-of-microseconds)
255        for name in 'st_atime st_mtime st_ctime'.split():
256            floaty = int(getattr(result, name) * 100000)
257            nanosecondy = getattr(result, name + "_ns") // 10000
258            self.assertAlmostEqual(floaty, nanosecondy, delta=2)
259
260        try:
261            result[200]
262            self.fail("No exception raised")
263        except IndexError:
264            pass
265
266        # Make sure that assignment fails
267        try:
268            result.st_mode = 1
269            self.fail("No exception raised")
270        except AttributeError:
271            pass
272
273        try:
274            result.st_rdev = 1
275            self.fail("No exception raised")
276        except (AttributeError, TypeError):
277            pass
278
279        try:
280            result.parrot = 1
281            self.fail("No exception raised")
282        except AttributeError:
283            pass
284
285        # Use the stat_result constructor with a too-short tuple.
286        try:
287            result2 = os.stat_result((10,))
288            self.fail("No exception raised")
289        except TypeError:
290            pass
291
292        # Use the constructor with a too-long tuple.
293        try:
294            result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
295        except TypeError:
296            pass
297
298    def test_stat_attributes(self):
299        self.check_stat_attributes(self.fname)
300
301    def test_stat_attributes_bytes(self):
302        try:
303            fname = self.fname.encode(sys.getfilesystemencoding())
304        except UnicodeEncodeError:
305            self.skipTest("cannot encode %a for the filesystem" % self.fname)
306        self.check_stat_attributes(fname)
307
308    def test_stat_result_pickle(self):
309        result = os.stat(self.fname)
310        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
311            p = pickle.dumps(result, proto)
312            self.assertIn(b'stat_result', p)
313            if proto < 4:
314                self.assertIn(b'cos\nstat_result\n', p)
315            unpickled = pickle.loads(p)
316            self.assertEqual(result, unpickled)
317
318    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
319    def test_statvfs_attributes(self):
320        result = os.statvfs(self.fname)
321
322        # Make sure direct access works
323        self.assertEqual(result.f_bfree, result[3])
324
325        # Make sure all the attributes are there.
326        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
327                    'ffree', 'favail', 'flag', 'namemax')
328        for value, member in enumerate(members):
329            self.assertEqual(getattr(result, 'f_' + member), result[value])
330
331        self.assertTrue(isinstance(result.f_fsid, int))
332
333        # Test that the size of the tuple doesn't change
334        self.assertEqual(len(result), 10)
335
336        # Make sure that assignment really fails
337        try:
338            result.f_bfree = 1
339            self.fail("No exception raised")
340        except AttributeError:
341            pass
342
343        try:
344            result.parrot = 1
345            self.fail("No exception raised")
346        except AttributeError:
347            pass
348
349        # Use the constructor with a too-short tuple.
350        try:
351            result2 = os.statvfs_result((10,))
352            self.fail("No exception raised")
353        except TypeError:
354            pass
355
356        # Use the constructor with a too-long tuple.
357        try:
358            result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
359        except TypeError:
360            pass
361
362    @unittest.skipUnless(hasattr(os, 'statvfs'),
363                         "need os.statvfs()")
364    def test_statvfs_result_pickle(self):
365        result = os.statvfs(self.fname)
366
367        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
368            p = pickle.dumps(result, proto)
369            self.assertIn(b'statvfs_result', p)
370            if proto < 4:
371                self.assertIn(b'cos\nstatvfs_result\n', p)
372            unpickled = pickle.loads(p)
373            self.assertEqual(result, unpickled)
374
375    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
376    def test_1686475(self):
377        # Verify that an open file can be stat'ed
378        try:
379            os.stat(r"c:\pagefile.sys")
380        except FileNotFoundError:
381            self.skipTest(r'c:\pagefile.sys does not exist')
382        except OSError as e:
383            self.fail("Could not stat pagefile.sys")
384
385    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
386    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
387    def test_15261(self):
388        # Verify that stat'ing a closed fd does not cause crash
389        r, w = os.pipe()
390        try:
391            os.stat(r)          # should not raise error
392        finally:
393            os.close(r)
394            os.close(w)
395        with self.assertRaises(OSError) as ctx:
396            os.stat(r)
397        self.assertEqual(ctx.exception.errno, errno.EBADF)
398
399    def check_file_attributes(self, result):
400        self.assertTrue(hasattr(result, 'st_file_attributes'))
401        self.assertTrue(isinstance(result.st_file_attributes, int))
402        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
403
404    @unittest.skipUnless(sys.platform == "win32",
405                         "st_file_attributes is Win32 specific")
406    def test_file_attributes(self):
407        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
408        result = os.stat(self.fname)
409        self.check_file_attributes(result)
410        self.assertEqual(
411            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
412            0)
413
414        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
415        dirname = support.TESTFN + "dir"
416        os.mkdir(dirname)
417        self.addCleanup(os.rmdir, dirname)
418
419        result = os.stat(dirname)
420        self.check_file_attributes(result)
421        self.assertEqual(
422            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
423            stat.FILE_ATTRIBUTE_DIRECTORY)
424
425    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
426    def test_access_denied(self):
427        # Default to FindFirstFile WIN32_FIND_DATA when access is
428        # denied. See issue 28075.
429        # os.environ['TEMP'] should be located on a volume that
430        # supports file ACLs.
431        fname = os.path.join(os.environ['TEMP'], self.fname)
432        self.addCleanup(support.unlink, fname)
433        create_file(fname, b'ABC')
434        # Deny the right to [S]YNCHRONIZE on the file to
435        # force CreateFile to fail with ERROR_ACCESS_DENIED.
436        DETACHED_PROCESS = 8
437        subprocess.check_call(
438            # bpo-30584: Use security identifier *S-1-5-32-545 instead
439            # of localized "Users" to not depend on the locale.
440            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
441            creationflags=DETACHED_PROCESS
442        )
443        result = os.stat(fname)
444        self.assertNotEqual(result.st_size, 0)
445
446
447class UtimeTests(unittest.TestCase):
448    def setUp(self):
449        self.dirname = support.TESTFN
450        self.fname = os.path.join(self.dirname, "f1")
451
452        self.addCleanup(support.rmtree, self.dirname)
453        os.mkdir(self.dirname)
454        create_file(self.fname)
455
456    def support_subsecond(self, filename):
457        # Heuristic to check if the filesystem supports timestamp with
458        # subsecond resolution: check if float and int timestamps are different
459        st = os.stat(filename)
460        return ((st.st_atime != st[7])
461                or (st.st_mtime != st[8])
462                or (st.st_ctime != st[9]))
463
464    def _test_utime(self, set_time, filename=None):
465        if not filename:
466            filename = self.fname
467
468        support_subsecond = self.support_subsecond(filename)
469        if support_subsecond:
470            # Timestamp with a resolution of 1 microsecond (10^-6).
471            #
472            # The resolution of the C internal function used by os.utime()
473            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
474            # test with a resolution of 1 ns requires more work:
475            # see the issue #15745.
476            atime_ns = 1002003000   # 1.002003 seconds
477            mtime_ns = 4005006000   # 4.005006 seconds
478        else:
479            # use a resolution of 1 second
480            atime_ns = 5 * 10**9
481            mtime_ns = 8 * 10**9
482
483        set_time(filename, (atime_ns, mtime_ns))
484        st = os.stat(filename)
485
486        if support_subsecond:
487            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
488            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
489        else:
490            self.assertEqual(st.st_atime, atime_ns * 1e-9)
491            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
492        self.assertEqual(st.st_atime_ns, atime_ns)
493        self.assertEqual(st.st_mtime_ns, mtime_ns)
494
495    def test_utime(self):
496        def set_time(filename, ns):
497            # test the ns keyword parameter
498            os.utime(filename, ns=ns)
499        self._test_utime(set_time)
500
501    @staticmethod
502    def ns_to_sec(ns):
503        # Convert a number of nanosecond (int) to a number of seconds (float).
504        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
505        # issue, os.utime() rounds towards minus infinity.
506        return (ns * 1e-9) + 0.5e-9
507
508    def test_utime_by_indexed(self):
509        # pass times as floating point seconds as the second indexed parameter
510        def set_time(filename, ns):
511            atime_ns, mtime_ns = ns
512            atime = self.ns_to_sec(atime_ns)
513            mtime = self.ns_to_sec(mtime_ns)
514            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
515            # or utime(time_t)
516            os.utime(filename, (atime, mtime))
517        self._test_utime(set_time)
518
519    def test_utime_by_times(self):
520        def set_time(filename, ns):
521            atime_ns, mtime_ns = ns
522            atime = self.ns_to_sec(atime_ns)
523            mtime = self.ns_to_sec(mtime_ns)
524            # test the times keyword parameter
525            os.utime(filename, times=(atime, mtime))
526        self._test_utime(set_time)
527
528    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
529                         "follow_symlinks support for utime required "
530                         "for this test.")
531    def test_utime_nofollow_symlinks(self):
532        def set_time(filename, ns):
533            # use follow_symlinks=False to test utimensat(timespec)
534            # or lutimes(timeval)
535            os.utime(filename, ns=ns, follow_symlinks=False)
536        self._test_utime(set_time)
537
538    @unittest.skipUnless(os.utime in os.supports_fd,
539                         "fd support for utime required for this test.")
540    def test_utime_fd(self):
541        def set_time(filename, ns):
542            with open(filename, 'wb', 0) as fp:
543                # use a file descriptor to test futimens(timespec)
544                # or futimes(timeval)
545                os.utime(fp.fileno(), ns=ns)
546        self._test_utime(set_time)
547
548    @unittest.skipUnless(os.utime in os.supports_dir_fd,
549                         "dir_fd support for utime required for this test.")
550    def test_utime_dir_fd(self):
551        def set_time(filename, ns):
552            dirname, name = os.path.split(filename)
553            dirfd = os.open(dirname, os.O_RDONLY)
554            try:
555                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
556                os.utime(name, dir_fd=dirfd, ns=ns)
557            finally:
558                os.close(dirfd)
559        self._test_utime(set_time)
560
561    def test_utime_directory(self):
562        def set_time(filename, ns):
563            # test calling os.utime() on a directory
564            os.utime(filename, ns=ns)
565        self._test_utime(set_time, filename=self.dirname)
566
567    def _test_utime_current(self, set_time):
568        # Get the system clock
569        current = time.time()
570
571        # Call os.utime() to set the timestamp to the current system clock
572        set_time(self.fname)
573
574        if not self.support_subsecond(self.fname):
575            delta = 1.0
576        else:
577            # On Windows, the usual resolution of time.time() is 15.6 ms.
578            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
579            #
580            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
581            # also 50 ms on other platforms.
582            delta = 0.050
583        st = os.stat(self.fname)
584        msg = ("st_time=%r, current=%r, dt=%r"
585               % (st.st_mtime, current, st.st_mtime - current))
586        self.assertAlmostEqual(st.st_mtime, current,
587                               delta=delta, msg=msg)
588
589    def test_utime_current(self):
590        def set_time(filename):
591            # Set to the current time in the new way
592            os.utime(self.fname)
593        self._test_utime_current(set_time)
594
595    def test_utime_current_old(self):
596        def set_time(filename):
597            # Set to the current time in the old explicit way.
598            os.utime(self.fname, None)
599        self._test_utime_current(set_time)
600
601    def get_file_system(self, path):
602        if sys.platform == 'win32':
603            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
604            import ctypes
605            kernel32 = ctypes.windll.kernel32
606            buf = ctypes.create_unicode_buffer("", 100)
607            ok = kernel32.GetVolumeInformationW(root, None, 0,
608                                                None, None, None,
609                                                buf, len(buf))
610            if ok:
611                return buf.value
612        # return None if the filesystem is unknown
613
614    def test_large_time(self):
615        # Many filesystems are limited to the year 2038. At least, the test
616        # pass with NTFS filesystem.
617        if self.get_file_system(self.dirname) != "NTFS":
618            self.skipTest("requires NTFS")
619
620        large = 5000000000   # some day in 2128
621        os.utime(self.fname, (large, large))
622        self.assertEqual(os.stat(self.fname).st_mtime, large)
623
624    def test_utime_invalid_arguments(self):
625        # seconds and nanoseconds parameters are mutually exclusive
626        with self.assertRaises(ValueError):
627            os.utime(self.fname, (5, 5), ns=(5, 5))
628        with self.assertRaises(TypeError):
629            os.utime(self.fname, [5, 5])
630        with self.assertRaises(TypeError):
631            os.utime(self.fname, (5,))
632        with self.assertRaises(TypeError):
633            os.utime(self.fname, (5, 5, 5))
634        with self.assertRaises(TypeError):
635            os.utime(self.fname, ns=[5, 5])
636        with self.assertRaises(TypeError):
637            os.utime(self.fname, ns=(5,))
638        with self.assertRaises(TypeError):
639            os.utime(self.fname, ns=(5, 5, 5))
640
641        if os.utime not in os.supports_follow_symlinks:
642            with self.assertRaises(NotImplementedError):
643                os.utime(self.fname, (5, 5), follow_symlinks=False)
644        if os.utime not in os.supports_fd:
645            with open(self.fname, 'wb', 0) as fp:
646                with self.assertRaises(TypeError):
647                    os.utime(fp.fileno(), (5, 5))
648        if os.utime not in os.supports_dir_fd:
649            with self.assertRaises(NotImplementedError):
650                os.utime(self.fname, (5, 5), dir_fd=0)
651
652    @support.cpython_only
653    def test_issue31577(self):
654        # The interpreter shouldn't crash in case utime() received a bad
655        # ns argument.
656        def get_bad_int(divmod_ret_val):
657            class BadInt:
658                def __divmod__(*args):
659                    return divmod_ret_val
660            return BadInt()
661        with self.assertRaises(TypeError):
662            os.utime(self.fname, ns=(get_bad_int(42), 1))
663        with self.assertRaises(TypeError):
664            os.utime(self.fname, ns=(get_bad_int(()), 1))
665        with self.assertRaises(TypeError):
666            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
667
668
669from test import mapping_tests
670
671class EnvironTests(mapping_tests.BasicTestMappingProtocol):
672    """check that os.environ object conform to mapping protocol"""
673    type2test = None
674
675    def setUp(self):
676        self.__save = dict(os.environ)
677        if os.supports_bytes_environ:
678            self.__saveb = dict(os.environb)
679        for key, value in self._reference().items():
680            os.environ[key] = value
681
682    def tearDown(self):
683        os.environ.clear()
684        os.environ.update(self.__save)
685        if os.supports_bytes_environ:
686            os.environb.clear()
687            os.environb.update(self.__saveb)
688
689    def _reference(self):
690        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
691
692    def _empty_mapping(self):
693        os.environ.clear()
694        return os.environ
695
696    # Bug 1110478
697    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
698                         'requires a shell')
699    def test_update2(self):
700        os.environ.clear()
701        os.environ.update(HELLO="World")
702        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
703            value = popen.read().strip()
704            self.assertEqual(value, "World")
705
706    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
707                         'requires a shell')
708    def test_os_popen_iter(self):
709        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
710                      % unix_shell) as popen:
711            it = iter(popen)
712            self.assertEqual(next(it), "line1\n")
713            self.assertEqual(next(it), "line2\n")
714            self.assertEqual(next(it), "line3\n")
715            self.assertRaises(StopIteration, next, it)
716
717    # Verify environ keys and values from the OS are of the
718    # correct str type.
719    def test_keyvalue_types(self):
720        for key, val in os.environ.items():
721            self.assertEqual(type(key), str)
722            self.assertEqual(type(val), str)
723
724    def test_items(self):
725        for key, value in self._reference().items():
726            self.assertEqual(os.environ.get(key), value)
727
728    # Issue 7310
729    def test___repr__(self):
730        """Check that the repr() of os.environ looks like environ({...})."""
731        env = os.environ
732        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
733            '{!r}: {!r}'.format(key, value)
734            for key, value in env.items())))
735
736    def test_get_exec_path(self):
737        defpath_list = os.defpath.split(os.pathsep)
738        test_path = ['/monty', '/python', '', '/flying/circus']
739        test_env = {'PATH': os.pathsep.join(test_path)}
740
741        saved_environ = os.environ
742        try:
743            os.environ = dict(test_env)
744            # Test that defaulting to os.environ works.
745            self.assertSequenceEqual(test_path, os.get_exec_path())
746            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
747        finally:
748            os.environ = saved_environ
749
750        # No PATH environment variable
751        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
752        # Empty PATH environment variable
753        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
754        # Supplied PATH environment variable
755        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
756
757        if os.supports_bytes_environ:
758            # env cannot contain 'PATH' and b'PATH' keys
759            try:
760                # ignore BytesWarning warning
761                with warnings.catch_warnings(record=True):
762                    mixed_env = {'PATH': '1', b'PATH': b'2'}
763            except BytesWarning:
764                # mixed_env cannot be created with python -bb
765                pass
766            else:
767                self.assertRaises(ValueError, os.get_exec_path, mixed_env)
768
769            # bytes key and/or value
770            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
771                ['abc'])
772            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
773                ['abc'])
774            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
775                ['abc'])
776
777    @unittest.skipUnless(os.supports_bytes_environ,
778                         "os.environb required for this test.")
779    def test_environb(self):
780        # os.environ -> os.environb
781        value = 'euro\u20ac'
782        try:
783            value_bytes = value.encode(sys.getfilesystemencoding(),
784                                       'surrogateescape')
785        except UnicodeEncodeError:
786            msg = "U+20AC character is not encodable to %s" % (
787                sys.getfilesystemencoding(),)
788            self.skipTest(msg)
789        os.environ['unicode'] = value
790        self.assertEqual(os.environ['unicode'], value)
791        self.assertEqual(os.environb[b'unicode'], value_bytes)
792
793        # os.environb -> os.environ
794        value = b'\xff'
795        os.environb[b'bytes'] = value
796        self.assertEqual(os.environb[b'bytes'], value)
797        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
798        self.assertEqual(os.environ['bytes'], value_str)
799
800    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
801    @support.requires_mac_ver(10, 6)
802    def test_unset_error(self):
803        if sys.platform == "win32":
804            # an environment variable is limited to 32,767 characters
805            key = 'x' * 50000
806            self.assertRaises(ValueError, os.environ.__delitem__, key)
807        else:
808            # "=" is not allowed in a variable name
809            key = 'key='
810            self.assertRaises(OSError, os.environ.__delitem__, key)
811
812    def test_key_type(self):
813        missing = 'missingkey'
814        self.assertNotIn(missing, os.environ)
815
816        with self.assertRaises(KeyError) as cm:
817            os.environ[missing]
818        self.assertIs(cm.exception.args[0], missing)
819        self.assertTrue(cm.exception.__suppress_context__)
820
821        with self.assertRaises(KeyError) as cm:
822            del os.environ[missing]
823        self.assertIs(cm.exception.args[0], missing)
824        self.assertTrue(cm.exception.__suppress_context__)
825
826    def _test_environ_iteration(self, collection):
827        iterator = iter(collection)
828        new_key = "__new_key__"
829
830        next(iterator)  # start iteration over os.environ.items
831
832        # add a new key in os.environ mapping
833        os.environ[new_key] = "test_environ_iteration"
834
835        try:
836            next(iterator)  # force iteration over modified mapping
837            self.assertEqual(os.environ[new_key], "test_environ_iteration")
838        finally:
839            del os.environ[new_key]
840
841    def test_iter_error_when_changing_os_environ(self):
842        self._test_environ_iteration(os.environ)
843
844    def test_iter_error_when_changing_os_environ_items(self):
845        self._test_environ_iteration(os.environ.items())
846
847    def test_iter_error_when_changing_os_environ_values(self):
848        self._test_environ_iteration(os.environ.values())
849
850
851class WalkTests(unittest.TestCase):
852    """Tests for os.walk()."""
853
854    # Wrapper to hide minor differences between os.walk and os.fwalk
855    # to tests both functions with the same code base
856    def walk(self, top, **kwargs):
857        if 'follow_symlinks' in kwargs:
858            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
859        return os.walk(top, **kwargs)
860
861    def setUp(self):
862        join = os.path.join
863        self.addCleanup(support.rmtree, support.TESTFN)
864
865        # Build:
866        #     TESTFN/
867        #       TEST1/              a file kid and two directory kids
868        #         tmp1
869        #         SUB1/             a file kid and a directory kid
870        #           tmp2
871        #           SUB11/          no kids
872        #         SUB2/             a file kid and a dirsymlink kid
873        #           tmp3
874        #           SUB21/          not readable
875        #             tmp5
876        #           link/           a symlink to TESTFN.2
877        #           broken_link
878        #           broken_link2
879        #           broken_link3
880        #       TEST2/
881        #         tmp4              a lone file
882        self.walk_path = join(support.TESTFN, "TEST1")
883        self.sub1_path = join(self.walk_path, "SUB1")
884        self.sub11_path = join(self.sub1_path, "SUB11")
885        sub2_path = join(self.walk_path, "SUB2")
886        sub21_path = join(sub2_path, "SUB21")
887        tmp1_path = join(self.walk_path, "tmp1")
888        tmp2_path = join(self.sub1_path, "tmp2")
889        tmp3_path = join(sub2_path, "tmp3")
890        tmp5_path = join(sub21_path, "tmp3")
891        self.link_path = join(sub2_path, "link")
892        t2_path = join(support.TESTFN, "TEST2")
893        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
894        broken_link_path = join(sub2_path, "broken_link")
895        broken_link2_path = join(sub2_path, "broken_link2")
896        broken_link3_path = join(sub2_path, "broken_link3")
897
898        # Create stuff.
899        os.makedirs(self.sub11_path)
900        os.makedirs(sub2_path)
901        os.makedirs(sub21_path)
902        os.makedirs(t2_path)
903
904        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
905            with open(path, "x") as f:
906                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
907
908        if support.can_symlink():
909            os.symlink(os.path.abspath(t2_path), self.link_path)
910            os.symlink('broken', broken_link_path, True)
911            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
912            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
913            self.sub2_tree = (sub2_path, ["SUB21", "link"],
914                              ["broken_link", "broken_link2", "broken_link3",
915                               "tmp3"])
916        else:
917            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
918
919        os.chmod(sub21_path, 0)
920        try:
921            os.listdir(sub21_path)
922        except PermissionError:
923            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
924        else:
925            os.chmod(sub21_path, stat.S_IRWXU)
926            os.unlink(tmp5_path)
927            os.rmdir(sub21_path)
928            del self.sub2_tree[1][:1]
929
930    def test_walk_topdown(self):
931        # Walk top-down.
932        all = list(self.walk(self.walk_path))
933
934        self.assertEqual(len(all), 4)
935        # We can't know which order SUB1 and SUB2 will appear in.
936        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
937        #     flipped:  TESTFN, SUB2, SUB1, SUB11
938        flipped = all[0][1][0] != "SUB1"
939        all[0][1].sort()
940        all[3 - 2 * flipped][-1].sort()
941        all[3 - 2 * flipped][1].sort()
942        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
943        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
944        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
945        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
946
947    def test_walk_prune(self, walk_path=None):
948        if walk_path is None:
949            walk_path = self.walk_path
950        # Prune the search.
951        all = []
952        for root, dirs, files in self.walk(walk_path):
953            all.append((root, dirs, files))
954            # Don't descend into SUB1.
955            if 'SUB1' in dirs:
956                # Note that this also mutates the dirs we appended to all!
957                dirs.remove('SUB1')
958
959        self.assertEqual(len(all), 2)
960        self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
961
962        all[1][-1].sort()
963        all[1][1].sort()
964        self.assertEqual(all[1], self.sub2_tree)
965
966    def test_file_like_path(self):
967        self.test_walk_prune(FakePath(self.walk_path))
968
969    def test_walk_bottom_up(self):
970        # Walk bottom-up.
971        all = list(self.walk(self.walk_path, topdown=False))
972
973        self.assertEqual(len(all), 4, all)
974        # We can't know which order SUB1 and SUB2 will appear in.
975        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
976        #     flipped:  SUB2, SUB11, SUB1, TESTFN
977        flipped = all[3][1][0] != "SUB1"
978        all[3][1].sort()
979        all[2 - 2 * flipped][-1].sort()
980        all[2 - 2 * flipped][1].sort()
981        self.assertEqual(all[3],
982                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
983        self.assertEqual(all[flipped],
984                         (self.sub11_path, [], []))
985        self.assertEqual(all[flipped + 1],
986                         (self.sub1_path, ["SUB11"], ["tmp2"]))
987        self.assertEqual(all[2 - 2 * flipped],
988                         self.sub2_tree)
989
990    def test_walk_symlink(self):
991        if not support.can_symlink():
992            self.skipTest("need symlink support")
993
994        # Walk, following symlinks.
995        walk_it = self.walk(self.walk_path, follow_symlinks=True)
996        for root, dirs, files in walk_it:
997            if root == self.link_path:
998                self.assertEqual(dirs, [])
999                self.assertEqual(files, ["tmp4"])
1000                break
1001        else:
1002            self.fail("Didn't follow symlink with followlinks=True")
1003
1004    def test_walk_bad_dir(self):
1005        # Walk top-down.
1006        errors = []
1007        walk_it = self.walk(self.walk_path, onerror=errors.append)
1008        root, dirs, files = next(walk_it)
1009        self.assertEqual(errors, [])
1010        dir1 = 'SUB1'
1011        path1 = os.path.join(root, dir1)
1012        path1new = os.path.join(root, dir1 + '.new')
1013        os.rename(path1, path1new)
1014        try:
1015            roots = [r for r, d, f in walk_it]
1016            self.assertTrue(errors)
1017            self.assertNotIn(path1, roots)
1018            self.assertNotIn(path1new, roots)
1019            for dir2 in dirs:
1020                if dir2 != dir1:
1021                    self.assertIn(os.path.join(root, dir2), roots)
1022        finally:
1023            os.rename(path1new, path1)
1024
1025
1026@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1027class FwalkTests(WalkTests):
1028    """Tests for os.fwalk()."""
1029
1030    def walk(self, top, **kwargs):
1031        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
1032            yield (root, dirs, files)
1033
1034    def fwalk(self, *args, **kwargs):
1035        return os.fwalk(*args, **kwargs)
1036
1037    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1038        """
1039        compare with walk() results.
1040        """
1041        walk_kwargs = walk_kwargs.copy()
1042        fwalk_kwargs = fwalk_kwargs.copy()
1043        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1044            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1045            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
1046
1047            expected = {}
1048            for root, dirs, files in os.walk(**walk_kwargs):
1049                expected[root] = (set(dirs), set(files))
1050
1051            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
1052                self.assertIn(root, expected)
1053                self.assertEqual(expected[root], (set(dirs), set(files)))
1054
1055    def test_compare_to_walk(self):
1056        kwargs = {'top': support.TESTFN}
1057        self._compare_to_walk(kwargs, kwargs)
1058
1059    def test_dir_fd(self):
1060        try:
1061            fd = os.open(".", os.O_RDONLY)
1062            walk_kwargs = {'top': support.TESTFN}
1063            fwalk_kwargs = walk_kwargs.copy()
1064            fwalk_kwargs['dir_fd'] = fd
1065            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1066        finally:
1067            os.close(fd)
1068
1069    def test_yields_correct_dir_fd(self):
1070        # check returned file descriptors
1071        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1072            args = support.TESTFN, topdown, None
1073            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
1074                # check that the FD is valid
1075                os.fstat(rootfd)
1076                # redundant check
1077                os.stat(rootfd)
1078                # check that listdir() returns consistent information
1079                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
1080
1081    def test_fd_leak(self):
1082        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1083        # we both check that calling fwalk() a large number of times doesn't
1084        # yield EMFILE, and that the minimum allocated FD hasn't changed.
1085        minfd = os.dup(1)
1086        os.close(minfd)
1087        for i in range(256):
1088            for x in self.fwalk(support.TESTFN):
1089                pass
1090        newfd = os.dup(1)
1091        self.addCleanup(os.close, newfd)
1092        self.assertEqual(newfd, minfd)
1093
1094class BytesWalkTests(WalkTests):
1095    """Tests for os.walk() with bytes."""
1096    def walk(self, top, **kwargs):
1097        if 'follow_symlinks' in kwargs:
1098            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1099        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1100            root = os.fsdecode(broot)
1101            dirs = list(map(os.fsdecode, bdirs))
1102            files = list(map(os.fsdecode, bfiles))
1103            yield (root, dirs, files)
1104            bdirs[:] = list(map(os.fsencode, dirs))
1105            bfiles[:] = list(map(os.fsencode, files))
1106
1107@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1108class BytesFwalkTests(FwalkTests):
1109    """Tests for os.walk() with bytes."""
1110    def fwalk(self, top='.', *args, **kwargs):
1111        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1112            root = os.fsdecode(broot)
1113            dirs = list(map(os.fsdecode, bdirs))
1114            files = list(map(os.fsdecode, bfiles))
1115            yield (root, dirs, files, topfd)
1116            bdirs[:] = list(map(os.fsencode, dirs))
1117            bfiles[:] = list(map(os.fsencode, files))
1118
1119
1120class MakedirTests(unittest.TestCase):
1121    def setUp(self):
1122        os.mkdir(support.TESTFN)
1123
1124    def test_makedir(self):
1125        base = support.TESTFN
1126        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1127        os.makedirs(path)             # Should work
1128        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1129        os.makedirs(path)
1130
1131        # Try paths with a '.' in them
1132        self.assertRaises(OSError, os.makedirs, os.curdir)
1133        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1134        os.makedirs(path)
1135        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1136                            'dir5', 'dir6')
1137        os.makedirs(path)
1138
1139    def test_mode(self):
1140        with support.temp_umask(0o002):
1141            base = support.TESTFN
1142            parent = os.path.join(base, 'dir1')
1143            path = os.path.join(parent, 'dir2')
1144            os.makedirs(path, 0o555)
1145            self.assertTrue(os.path.exists(path))
1146            self.assertTrue(os.path.isdir(path))
1147            if os.name != 'nt':
1148                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1149                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
1150
1151    def test_exist_ok_existing_directory(self):
1152        path = os.path.join(support.TESTFN, 'dir1')
1153        mode = 0o777
1154        old_mask = os.umask(0o022)
1155        os.makedirs(path, mode)
1156        self.assertRaises(OSError, os.makedirs, path, mode)
1157        self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
1158        os.makedirs(path, 0o776, exist_ok=True)
1159        os.makedirs(path, mode=mode, exist_ok=True)
1160        os.umask(old_mask)
1161
1162        # Issue #25583: A drive root could raise PermissionError on Windows
1163        os.makedirs(os.path.abspath('/'), exist_ok=True)
1164
1165    def test_exist_ok_s_isgid_directory(self):
1166        path = os.path.join(support.TESTFN, 'dir1')
1167        S_ISGID = stat.S_ISGID
1168        mode = 0o777
1169        old_mask = os.umask(0o022)
1170        try:
1171            existing_testfn_mode = stat.S_IMODE(
1172                    os.lstat(support.TESTFN).st_mode)
1173            try:
1174                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
1175            except PermissionError:
1176                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
1177            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1178                raise unittest.SkipTest('No support for S_ISGID dir mode.')
1179            # The os should apply S_ISGID from the parent dir for us, but
1180            # this test need not depend on that behavior.  Be explicit.
1181            os.makedirs(path, mode | S_ISGID)
1182            # http://bugs.python.org/issue14992
1183            # Should not fail when the bit is already set.
1184            os.makedirs(path, mode, exist_ok=True)
1185            # remove the bit.
1186            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
1187            # May work even when the bit is not already set when demanded.
1188            os.makedirs(path, mode | S_ISGID, exist_ok=True)
1189        finally:
1190            os.umask(old_mask)
1191
1192    def test_exist_ok_existing_regular_file(self):
1193        base = support.TESTFN
1194        path = os.path.join(support.TESTFN, 'dir1')
1195        f = open(path, 'w')
1196        f.write('abc')
1197        f.close()
1198        self.assertRaises(OSError, os.makedirs, path)
1199        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1200        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1201        os.remove(path)
1202
1203    def tearDown(self):
1204        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
1205                            'dir4', 'dir5', 'dir6')
1206        # If the tests failed, the bottom-most directory ('../dir6')
1207        # may not have been created, so we look for the outermost directory
1208        # that exists.
1209        while not os.path.exists(path) and path != support.TESTFN:
1210            path = os.path.dirname(path)
1211
1212        os.removedirs(path)
1213
1214
1215@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1216class ChownFileTests(unittest.TestCase):
1217
1218    @classmethod
1219    def setUpClass(cls):
1220        os.mkdir(support.TESTFN)
1221
1222    def test_chown_uid_gid_arguments_must_be_index(self):
1223        stat = os.stat(support.TESTFN)
1224        uid = stat.st_uid
1225        gid = stat.st_gid
1226        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1227            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1228            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1229        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1230        self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1231
1232    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1233    def test_chown_gid(self):
1234        groups = os.getgroups()
1235        if len(groups) < 2:
1236            self.skipTest("test needs at least 2 groups")
1237
1238        gid_1, gid_2 = groups[:2]
1239        uid = os.stat(support.TESTFN).st_uid
1240
1241        os.chown(support.TESTFN, uid, gid_1)
1242        gid = os.stat(support.TESTFN).st_gid
1243        self.assertEqual(gid, gid_1)
1244
1245        os.chown(support.TESTFN, uid, gid_2)
1246        gid = os.stat(support.TESTFN).st_gid
1247        self.assertEqual(gid, gid_2)
1248
1249    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1250                         "test needs root privilege and more than one user")
1251    def test_chown_with_root(self):
1252        uid_1, uid_2 = all_users[:2]
1253        gid = os.stat(support.TESTFN).st_gid
1254        os.chown(support.TESTFN, uid_1, gid)
1255        uid = os.stat(support.TESTFN).st_uid
1256        self.assertEqual(uid, uid_1)
1257        os.chown(support.TESTFN, uid_2, gid)
1258        uid = os.stat(support.TESTFN).st_uid
1259        self.assertEqual(uid, uid_2)
1260
1261    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1262                         "test needs non-root account and more than one user")
1263    def test_chown_without_permission(self):
1264        uid_1, uid_2 = all_users[:2]
1265        gid = os.stat(support.TESTFN).st_gid
1266        with self.assertRaises(PermissionError):
1267            os.chown(support.TESTFN, uid_1, gid)
1268            os.chown(support.TESTFN, uid_2, gid)
1269
1270    @classmethod
1271    def tearDownClass(cls):
1272        os.rmdir(support.TESTFN)
1273
1274
1275class RemoveDirsTests(unittest.TestCase):
1276    def setUp(self):
1277        os.makedirs(support.TESTFN)
1278
1279    def tearDown(self):
1280        support.rmtree(support.TESTFN)
1281
1282    def test_remove_all(self):
1283        dira = os.path.join(support.TESTFN, 'dira')
1284        os.mkdir(dira)
1285        dirb = os.path.join(dira, 'dirb')
1286        os.mkdir(dirb)
1287        os.removedirs(dirb)
1288        self.assertFalse(os.path.exists(dirb))
1289        self.assertFalse(os.path.exists(dira))
1290        self.assertFalse(os.path.exists(support.TESTFN))
1291
1292    def test_remove_partial(self):
1293        dira = os.path.join(support.TESTFN, 'dira')
1294        os.mkdir(dira)
1295        dirb = os.path.join(dira, 'dirb')
1296        os.mkdir(dirb)
1297        create_file(os.path.join(dira, 'file.txt'))
1298        os.removedirs(dirb)
1299        self.assertFalse(os.path.exists(dirb))
1300        self.assertTrue(os.path.exists(dira))
1301        self.assertTrue(os.path.exists(support.TESTFN))
1302
1303    def test_remove_nothing(self):
1304        dira = os.path.join(support.TESTFN, 'dira')
1305        os.mkdir(dira)
1306        dirb = os.path.join(dira, 'dirb')
1307        os.mkdir(dirb)
1308        create_file(os.path.join(dirb, 'file.txt'))
1309        with self.assertRaises(OSError):
1310            os.removedirs(dirb)
1311        self.assertTrue(os.path.exists(dirb))
1312        self.assertTrue(os.path.exists(dira))
1313        self.assertTrue(os.path.exists(support.TESTFN))
1314
1315
1316class DevNullTests(unittest.TestCase):
1317    def test_devnull(self):
1318        with open(os.devnull, 'wb', 0) as f:
1319            f.write(b'hello')
1320            f.close()
1321        with open(os.devnull, 'rb') as f:
1322            self.assertEqual(f.read(), b'')
1323
1324
1325class URandomTests(unittest.TestCase):
1326    def test_urandom_length(self):
1327        self.assertEqual(len(os.urandom(0)), 0)
1328        self.assertEqual(len(os.urandom(1)), 1)
1329        self.assertEqual(len(os.urandom(10)), 10)
1330        self.assertEqual(len(os.urandom(100)), 100)
1331        self.assertEqual(len(os.urandom(1000)), 1000)
1332
1333    def test_urandom_value(self):
1334        data1 = os.urandom(16)
1335        self.assertIsInstance(data1, bytes)
1336        data2 = os.urandom(16)
1337        self.assertNotEqual(data1, data2)
1338
1339    def get_urandom_subprocess(self, count):
1340        code = '\n'.join((
1341            'import os, sys',
1342            'data = os.urandom(%s)' % count,
1343            'sys.stdout.buffer.write(data)',
1344            'sys.stdout.buffer.flush()'))
1345        out = assert_python_ok('-c', code)
1346        stdout = out[1]
1347        self.assertEqual(len(stdout), count)
1348        return stdout
1349
1350    def test_urandom_subprocess(self):
1351        data1 = self.get_urandom_subprocess(16)
1352        data2 = self.get_urandom_subprocess(16)
1353        self.assertNotEqual(data1, data2)
1354
1355
1356@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1357class GetRandomTests(unittest.TestCase):
1358    @classmethod
1359    def setUpClass(cls):
1360        try:
1361            os.getrandom(1)
1362        except OSError as exc:
1363            if exc.errno == errno.ENOSYS:
1364                # Python compiled on a more recent Linux version
1365                # than the current Linux kernel
1366                raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1367            else:
1368                raise
1369
1370    def test_getrandom_type(self):
1371        data = os.getrandom(16)
1372        self.assertIsInstance(data, bytes)
1373        self.assertEqual(len(data), 16)
1374
1375    def test_getrandom0(self):
1376        empty = os.getrandom(0)
1377        self.assertEqual(empty, b'')
1378
1379    def test_getrandom_random(self):
1380        self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1381
1382        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1383        # resource /dev/random
1384
1385    def test_getrandom_nonblock(self):
1386        # The call must not fail. Check also that the flag exists
1387        try:
1388            os.getrandom(1, os.GRND_NONBLOCK)
1389        except BlockingIOError:
1390            # System urandom is not initialized yet
1391            pass
1392
1393    def test_getrandom_value(self):
1394        data1 = os.getrandom(16)
1395        data2 = os.getrandom(16)
1396        self.assertNotEqual(data1, data2)
1397
1398
1399# os.urandom() doesn't use a file descriptor when it is implemented with the
1400# getentropy() function, the getrandom() function or the getrandom() syscall
1401OS_URANDOM_DONT_USE_FD = (
1402    sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1403    or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1404    or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
1405
1406@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1407                 "os.random() does not use a file descriptor")
1408class URandomFDTests(unittest.TestCase):
1409    @unittest.skipUnless(resource, "test requires the resource module")
1410    def test_urandom_failure(self):
1411        # Check urandom() failing when it is not able to open /dev/random.
1412        # We spawn a new process to make the test more robust (if getrlimit()
1413        # failed to restore the file descriptor limit after this, the whole
1414        # test suite would crash; this actually happened on the OS X Tiger
1415        # buildbot).
1416        code = """if 1:
1417            import errno
1418            import os
1419            import resource
1420
1421            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1422            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1423            try:
1424                os.urandom(16)
1425            except OSError as e:
1426                assert e.errno == errno.EMFILE, e.errno
1427            else:
1428                raise AssertionError("OSError not raised")
1429            """
1430        assert_python_ok('-c', code)
1431
1432    def test_urandom_fd_closed(self):
1433        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1434        # closed.
1435        code = """if 1:
1436            import os
1437            import sys
1438            import test.support
1439            os.urandom(4)
1440            with test.support.SuppressCrashReport():
1441                os.closerange(3, 256)
1442            sys.stdout.buffer.write(os.urandom(4))
1443            """
1444        rc, out, err = assert_python_ok('-Sc', code)
1445
1446    def test_urandom_fd_reopened(self):
1447        # Issue #21207: urandom() should detect its fd to /dev/urandom
1448        # changed to something else, and reopen it.
1449        self.addCleanup(support.unlink, support.TESTFN)
1450        create_file(support.TESTFN, b"x" * 256)
1451
1452        code = """if 1:
1453            import os
1454            import sys
1455            import test.support
1456            os.urandom(4)
1457            with test.support.SuppressCrashReport():
1458                for fd in range(3, 256):
1459                    try:
1460                        os.close(fd)
1461                    except OSError:
1462                        pass
1463                    else:
1464                        # Found the urandom fd (XXX hopefully)
1465                        break
1466                os.closerange(3, 256)
1467            with open({TESTFN!r}, 'rb') as f:
1468                new_fd = f.fileno()
1469                # Issue #26935: posix allows new_fd and fd to be equal but
1470                # some libc implementations have dup2 return an error in this
1471                # case.
1472                if new_fd != fd:
1473                    os.dup2(new_fd, fd)
1474                sys.stdout.buffer.write(os.urandom(4))
1475                sys.stdout.buffer.write(os.urandom(4))
1476            """.format(TESTFN=support.TESTFN)
1477        rc, out, err = assert_python_ok('-Sc', code)
1478        self.assertEqual(len(out), 8)
1479        self.assertNotEqual(out[0:4], out[4:8])
1480        rc, out2, err2 = assert_python_ok('-Sc', code)
1481        self.assertEqual(len(out2), 8)
1482        self.assertNotEqual(out2, out)
1483
1484
1485@contextlib.contextmanager
1486def _execvpe_mockup(defpath=None):
1487    """
1488    Stubs out execv and execve functions when used as context manager.
1489    Records exec calls. The mock execv and execve functions always raise an
1490    exception as they would normally never return.
1491    """
1492    # A list of tuples containing (function name, first arg, args)
1493    # of calls to execv or execve that have been made.
1494    calls = []
1495
1496    def mock_execv(name, *args):
1497        calls.append(('execv', name, args))
1498        raise RuntimeError("execv called")
1499
1500    def mock_execve(name, *args):
1501        calls.append(('execve', name, args))
1502        raise OSError(errno.ENOTDIR, "execve called")
1503
1504    try:
1505        orig_execv = os.execv
1506        orig_execve = os.execve
1507        orig_defpath = os.defpath
1508        os.execv = mock_execv
1509        os.execve = mock_execve
1510        if defpath is not None:
1511            os.defpath = defpath
1512        yield calls
1513    finally:
1514        os.execv = orig_execv
1515        os.execve = orig_execve
1516        os.defpath = orig_defpath
1517
1518
1519class ExecTests(unittest.TestCase):
1520    @unittest.skipIf(USING_LINUXTHREADS,
1521                     "avoid triggering a linuxthreads bug: see issue #4970")
1522    def test_execvpe_with_bad_program(self):
1523        self.assertRaises(OSError, os.execvpe, 'no such app-',
1524                          ['no such app-'], None)
1525
1526    def test_execv_with_bad_arglist(self):
1527        self.assertRaises(ValueError, os.execv, 'notepad', ())
1528        self.assertRaises(ValueError, os.execv, 'notepad', [])
1529        self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1530        self.assertRaises(ValueError, os.execv, 'notepad', [''])
1531
1532    def test_execvpe_with_bad_arglist(self):
1533        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1534        self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1535        self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
1536
1537    @unittest.skipUnless(hasattr(os, '_execvpe'),
1538                         "No internal os._execvpe function to test.")
1539    def _test_internal_execvpe(self, test_type):
1540        program_path = os.sep + 'absolutepath'
1541        if test_type is bytes:
1542            program = b'executable'
1543            fullpath = os.path.join(os.fsencode(program_path), program)
1544            native_fullpath = fullpath
1545            arguments = [b'progname', 'arg1', 'arg2']
1546        else:
1547            program = 'executable'
1548            arguments = ['progname', 'arg1', 'arg2']
1549            fullpath = os.path.join(program_path, program)
1550            if os.name != "nt":
1551                native_fullpath = os.fsencode(fullpath)
1552            else:
1553                native_fullpath = fullpath
1554        env = {'spam': 'beans'}
1555
1556        # test os._execvpe() with an absolute path
1557        with _execvpe_mockup() as calls:
1558            self.assertRaises(RuntimeError,
1559                os._execvpe, fullpath, arguments)
1560            self.assertEqual(len(calls), 1)
1561            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1562
1563        # test os._execvpe() with a relative path:
1564        # os.get_exec_path() returns defpath
1565        with _execvpe_mockup(defpath=program_path) as calls:
1566            self.assertRaises(OSError,
1567                os._execvpe, program, arguments, env=env)
1568            self.assertEqual(len(calls), 1)
1569            self.assertSequenceEqual(calls[0],
1570                ('execve', native_fullpath, (arguments, env)))
1571
1572        # test os._execvpe() with a relative path:
1573        # os.get_exec_path() reads the 'PATH' variable
1574        with _execvpe_mockup() as calls:
1575            env_path = env.copy()
1576            if test_type is bytes:
1577                env_path[b'PATH'] = program_path
1578            else:
1579                env_path['PATH'] = program_path
1580            self.assertRaises(OSError,
1581                os._execvpe, program, arguments, env=env_path)
1582            self.assertEqual(len(calls), 1)
1583            self.assertSequenceEqual(calls[0],
1584                ('execve', native_fullpath, (arguments, env_path)))
1585
1586    def test_internal_execvpe_str(self):
1587        self._test_internal_execvpe(str)
1588        if os.name != "nt":
1589            self._test_internal_execvpe(bytes)
1590
1591    def test_execve_invalid_env(self):
1592        args = [sys.executable, '-c', 'pass']
1593
1594        # null character in the environment variable name
1595        newenv = os.environ.copy()
1596        newenv["FRUIT\0VEGETABLE"] = "cabbage"
1597        with self.assertRaises(ValueError):
1598            os.execve(args[0], args, newenv)
1599
1600        # null character in the environment variable value
1601        newenv = os.environ.copy()
1602        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1603        with self.assertRaises(ValueError):
1604            os.execve(args[0], args, newenv)
1605
1606        # equal character in the environment variable name
1607        newenv = os.environ.copy()
1608        newenv["FRUIT=ORANGE"] = "lemon"
1609        with self.assertRaises(ValueError):
1610            os.execve(args[0], args, newenv)
1611
1612    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1613    def test_execve_with_empty_path(self):
1614        # bpo-32890: Check GetLastError() misuse
1615        try:
1616            os.execve('', ['arg'], {})
1617        except OSError as e:
1618            self.assertTrue(e.winerror is None or e.winerror != 0)
1619        else:
1620            self.fail('No OSError raised')
1621
1622
1623@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1624class Win32ErrorTests(unittest.TestCase):
1625    def setUp(self):
1626        try:
1627            os.stat(support.TESTFN)
1628        except FileNotFoundError:
1629            exists = False
1630        except OSError as exc:
1631            exists = True
1632            self.fail("file %s must not exist; os.stat failed with %s"
1633                      % (support.TESTFN, exc))
1634        else:
1635            self.fail("file %s must not exist" % support.TESTFN)
1636
1637    def test_rename(self):
1638        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
1639
1640    def test_remove(self):
1641        self.assertRaises(OSError, os.remove, support.TESTFN)
1642
1643    def test_chdir(self):
1644        self.assertRaises(OSError, os.chdir, support.TESTFN)
1645
1646    def test_mkdir(self):
1647        self.addCleanup(support.unlink, support.TESTFN)
1648
1649        with open(support.TESTFN, "x") as f:
1650            self.assertRaises(OSError, os.mkdir, support.TESTFN)
1651
1652    def test_utime(self):
1653        self.assertRaises(OSError, os.utime, support.TESTFN, None)
1654
1655    def test_chmod(self):
1656        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
1657
1658
1659class TestInvalidFD(unittest.TestCase):
1660    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
1661               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1662    #singles.append("close")
1663    #We omit close because it doesn't raise an exception on some platforms
1664    def get_single(f):
1665        def helper(self):
1666            if  hasattr(os, f):
1667                self.check(getattr(os, f))
1668        return helper
1669    for f in singles:
1670        locals()["test_"+f] = get_single(f)
1671
1672    def check(self, f, *args):
1673        try:
1674            f(support.make_bad_fd(), *args)
1675        except OSError as e:
1676            self.assertEqual(e.errno, errno.EBADF)
1677        else:
1678            self.fail("%r didn't raise an OSError with a bad file descriptor"
1679                      % f)
1680
1681    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
1682    def test_isatty(self):
1683        self.assertEqual(os.isatty(support.make_bad_fd()), False)
1684
1685    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
1686    def test_closerange(self):
1687        fd = support.make_bad_fd()
1688        # Make sure none of the descriptors we are about to close are
1689        # currently valid (issue 6542).
1690        for i in range(10):
1691            try: os.fstat(fd+i)
1692            except OSError:
1693                pass
1694            else:
1695                break
1696        if i < 2:
1697            raise unittest.SkipTest(
1698                "Unable to acquire a range of invalid file descriptors")
1699        self.assertEqual(os.closerange(fd, fd + i-1), None)
1700
1701    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
1702    def test_dup2(self):
1703        self.check(os.dup2, 20)
1704
1705    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
1706    def test_fchmod(self):
1707        self.check(os.fchmod, 0)
1708
1709    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
1710    def test_fchown(self):
1711        self.check(os.fchown, -1, -1)
1712
1713    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
1714    def test_fpathconf(self):
1715        self.check(os.pathconf, "PC_NAME_MAX")
1716        self.check(os.fpathconf, "PC_NAME_MAX")
1717
1718    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
1719    def test_ftruncate(self):
1720        self.check(os.truncate, 0)
1721        self.check(os.ftruncate, 0)
1722
1723    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
1724    def test_lseek(self):
1725        self.check(os.lseek, 0, 0)
1726
1727    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
1728    def test_read(self):
1729        self.check(os.read, 1)
1730
1731    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1732    def test_readv(self):
1733        buf = bytearray(10)
1734        self.check(os.readv, [buf])
1735
1736    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
1737    def test_tcsetpgrpt(self):
1738        self.check(os.tcsetpgrp, 0)
1739
1740    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
1741    def test_write(self):
1742        self.check(os.write, b" ")
1743
1744    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1745    def test_writev(self):
1746        self.check(os.writev, [b'abc'])
1747
1748    def test_inheritable(self):
1749        self.check(os.get_inheritable)
1750        self.check(os.set_inheritable, True)
1751
1752    @unittest.skipUnless(hasattr(os, 'get_blocking'),
1753                         'needs os.get_blocking() and os.set_blocking()')
1754    def test_blocking(self):
1755        self.check(os.get_blocking)
1756        self.check(os.set_blocking, True)
1757
1758
1759class LinkTests(unittest.TestCase):
1760    def setUp(self):
1761        self.file1 = support.TESTFN
1762        self.file2 = os.path.join(support.TESTFN + "2")
1763
1764    def tearDown(self):
1765        for file in (self.file1, self.file2):
1766            if os.path.exists(file):
1767                os.unlink(file)
1768
1769    def _test_link(self, file1, file2):
1770        create_file(file1)
1771
1772        try:
1773            os.link(file1, file2)
1774        except PermissionError as e:
1775            self.skipTest('os.link(): %s' % e)
1776        with open(file1, "r") as f1, open(file2, "r") as f2:
1777            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1778
1779    def test_link(self):
1780        self._test_link(self.file1, self.file2)
1781
1782    def test_link_bytes(self):
1783        self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1784                        bytes(self.file2, sys.getfilesystemencoding()))
1785
1786    def test_unicode_name(self):
1787        try:
1788            os.fsencode("\xf1")
1789        except UnicodeError:
1790            raise unittest.SkipTest("Unable to encode for this platform.")
1791
1792        self.file1 += "\xf1"
1793        self.file2 = self.file1 + "2"
1794        self._test_link(self.file1, self.file2)
1795
1796@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1797class PosixUidGidTests(unittest.TestCase):
1798    # uid_t and gid_t are 32-bit unsigned integers on Linux
1799    UID_OVERFLOW = (1 << 32)
1800    GID_OVERFLOW = (1 << 32)
1801
1802    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1803    def test_setuid(self):
1804        if os.getuid() != 0:
1805            self.assertRaises(OSError, os.setuid, 0)
1806        self.assertRaises(TypeError, os.setuid, 'not an int')
1807        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
1808
1809    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1810    def test_setgid(self):
1811        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1812            self.assertRaises(OSError, os.setgid, 0)
1813        self.assertRaises(TypeError, os.setgid, 'not an int')
1814        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
1815
1816    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1817    def test_seteuid(self):
1818        if os.getuid() != 0:
1819            self.assertRaises(OSError, os.seteuid, 0)
1820        self.assertRaises(TypeError, os.setegid, 'not an int')
1821        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
1822
1823    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1824    def test_setegid(self):
1825        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1826            self.assertRaises(OSError, os.setegid, 0)
1827        self.assertRaises(TypeError, os.setegid, 'not an int')
1828        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
1829
1830    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1831    def test_setreuid(self):
1832        if os.getuid() != 0:
1833            self.assertRaises(OSError, os.setreuid, 0, 0)
1834        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
1835        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
1836        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
1837        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
1838
1839    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1840    def test_setreuid_neg1(self):
1841        # Needs to accept -1.  We run this in a subprocess to avoid
1842        # altering the test runner's process state (issue8045).
1843        subprocess.check_call([
1844                sys.executable, '-c',
1845                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
1846
1847    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1848    def test_setregid(self):
1849        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1850            self.assertRaises(OSError, os.setregid, 0, 0)
1851        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
1852        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
1853        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
1854        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
1855
1856    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1857    def test_setregid_neg1(self):
1858        # Needs to accept -1.  We run this in a subprocess to avoid
1859        # altering the test runner's process state (issue8045).
1860        subprocess.check_call([
1861                sys.executable, '-c',
1862                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
1863
1864@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1865class Pep383Tests(unittest.TestCase):
1866    def setUp(self):
1867        if support.TESTFN_UNENCODABLE:
1868            self.dir = support.TESTFN_UNENCODABLE
1869        elif support.TESTFN_NONASCII:
1870            self.dir = support.TESTFN_NONASCII
1871        else:
1872            self.dir = support.TESTFN
1873        self.bdir = os.fsencode(self.dir)
1874
1875        bytesfn = []
1876        def add_filename(fn):
1877            try:
1878                fn = os.fsencode(fn)
1879            except UnicodeEncodeError:
1880                return
1881            bytesfn.append(fn)
1882        add_filename(support.TESTFN_UNICODE)
1883        if support.TESTFN_UNENCODABLE:
1884            add_filename(support.TESTFN_UNENCODABLE)
1885        if support.TESTFN_NONASCII:
1886            add_filename(support.TESTFN_NONASCII)
1887        if not bytesfn:
1888            self.skipTest("couldn't create any non-ascii filename")
1889
1890        self.unicodefn = set()
1891        os.mkdir(self.dir)
1892        try:
1893            for fn in bytesfn:
1894                support.create_empty_file(os.path.join(self.bdir, fn))
1895                fn = os.fsdecode(fn)
1896                if fn in self.unicodefn:
1897                    raise ValueError("duplicate filename")
1898                self.unicodefn.add(fn)
1899        except:
1900            shutil.rmtree(self.dir)
1901            raise
1902
1903    def tearDown(self):
1904        shutil.rmtree(self.dir)
1905
1906    def test_listdir(self):
1907        expected = self.unicodefn
1908        found = set(os.listdir(self.dir))
1909        self.assertEqual(found, expected)
1910        # test listdir without arguments
1911        current_directory = os.getcwd()
1912        try:
1913            os.chdir(os.sep)
1914            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1915        finally:
1916            os.chdir(current_directory)
1917
1918    def test_open(self):
1919        for fn in self.unicodefn:
1920            f = open(os.path.join(self.dir, fn), 'rb')
1921            f.close()
1922
1923    @unittest.skipUnless(hasattr(os, 'statvfs'),
1924                            "need os.statvfs()")
1925    def test_statvfs(self):
1926        # issue #9645
1927        for fn in self.unicodefn:
1928            # should not fail with file not found error
1929            fullname = os.path.join(self.dir, fn)
1930            os.statvfs(fullname)
1931
1932    def test_stat(self):
1933        for fn in self.unicodefn:
1934            os.stat(os.path.join(self.dir, fn))
1935
1936@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1937class Win32KillTests(unittest.TestCase):
1938    def _kill(self, sig):
1939        # Start sys.executable as a subprocess and communicate from the
1940        # subprocess to the parent that the interpreter is ready. When it
1941        # becomes ready, send *sig* via os.kill to the subprocess and check
1942        # that the return code is equal to *sig*.
1943        import ctypes
1944        from ctypes import wintypes
1945        import msvcrt
1946
1947        # Since we can't access the contents of the process' stdout until the
1948        # process has exited, use PeekNamedPipe to see what's inside stdout
1949        # without waiting. This is done so we can tell that the interpreter
1950        # is started and running at a point where it could handle a signal.
1951        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1952        PeekNamedPipe.restype = wintypes.BOOL
1953        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1954                                  ctypes.POINTER(ctypes.c_char), # stdout buf
1955                                  wintypes.DWORD, # Buffer size
1956                                  ctypes.POINTER(wintypes.DWORD), # bytes read
1957                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
1958                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
1959        msg = "running"
1960        proc = subprocess.Popen([sys.executable, "-c",
1961                                 "import sys;"
1962                                 "sys.stdout.write('{}');"
1963                                 "sys.stdout.flush();"
1964                                 "input()".format(msg)],
1965                                stdout=subprocess.PIPE,
1966                                stderr=subprocess.PIPE,
1967                                stdin=subprocess.PIPE)
1968        self.addCleanup(proc.stdout.close)
1969        self.addCleanup(proc.stderr.close)
1970        self.addCleanup(proc.stdin.close)
1971
1972        count, max = 0, 100
1973        while count < max and proc.poll() is None:
1974            # Create a string buffer to store the result of stdout from the pipe
1975            buf = ctypes.create_string_buffer(len(msg))
1976            # Obtain the text currently in proc.stdout
1977            # Bytes read/avail/left are left as NULL and unused
1978            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1979                                 buf, ctypes.sizeof(buf), None, None, None)
1980            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1981            if buf.value:
1982                self.assertEqual(msg, buf.value.decode())
1983                break
1984            time.sleep(0.1)
1985            count += 1
1986        else:
1987            self.fail("Did not receive communication from the subprocess")
1988
1989        os.kill(proc.pid, sig)
1990        self.assertEqual(proc.wait(), sig)
1991
1992    def test_kill_sigterm(self):
1993        # SIGTERM doesn't mean anything special, but make sure it works
1994        self._kill(signal.SIGTERM)
1995
1996    def test_kill_int(self):
1997        # os.kill on Windows can take an int which gets set as the exit code
1998        self._kill(100)
1999
2000    def _kill_with_event(self, event, name):
2001        tagname = "test_os_%s" % uuid.uuid1()
2002        m = mmap.mmap(-1, 1, tagname)
2003        m[0] = 0
2004        # Run a script which has console control handling enabled.
2005        proc = subprocess.Popen([sys.executable,
2006                   os.path.join(os.path.dirname(__file__),
2007                                "win_console_handler.py"), tagname],
2008                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2009        # Let the interpreter startup before we send signals. See #3137.
2010        count, max = 0, 100
2011        while count < max and proc.poll() is None:
2012            if m[0] == 1:
2013                break
2014            time.sleep(0.1)
2015            count += 1
2016        else:
2017            # Forcefully kill the process if we weren't able to signal it.
2018            os.kill(proc.pid, signal.SIGINT)
2019            self.fail("Subprocess didn't finish initialization")
2020        os.kill(proc.pid, event)
2021        # proc.send_signal(event) could also be done here.
2022        # Allow time for the signal to be passed and the process to exit.
2023        time.sleep(0.5)
2024        if not proc.poll():
2025            # Forcefully kill the process if we weren't able to signal it.
2026            os.kill(proc.pid, signal.SIGINT)
2027            self.fail("subprocess did not stop on {}".format(name))
2028
2029    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
2030    def test_CTRL_C_EVENT(self):
2031        from ctypes import wintypes
2032        import ctypes
2033
2034        # Make a NULL value by creating a pointer with no argument.
2035        NULL = ctypes.POINTER(ctypes.c_int)()
2036        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2037        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2038                                          wintypes.BOOL)
2039        SetConsoleCtrlHandler.restype = wintypes.BOOL
2040
2041        # Calling this with NULL and FALSE causes the calling process to
2042        # handle Ctrl+C, rather than ignore it. This property is inherited
2043        # by subprocesses.
2044        SetConsoleCtrlHandler(NULL, 0)
2045
2046        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2047
2048    def test_CTRL_BREAK_EVENT(self):
2049        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2050
2051
2052@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2053class Win32ListdirTests(unittest.TestCase):
2054    """Test listdir on Windows."""
2055
2056    def setUp(self):
2057        self.created_paths = []
2058        for i in range(2):
2059            dir_name = 'SUB%d' % i
2060            dir_path = os.path.join(support.TESTFN, dir_name)
2061            file_name = 'FILE%d' % i
2062            file_path = os.path.join(support.TESTFN, file_name)
2063            os.makedirs(dir_path)
2064            with open(file_path, 'w') as f:
2065                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2066            self.created_paths.extend([dir_name, file_name])
2067        self.created_paths.sort()
2068
2069    def tearDown(self):
2070        shutil.rmtree(support.TESTFN)
2071
2072    def test_listdir_no_extended_path(self):
2073        """Test when the path is not an "extended" path."""
2074        # unicode
2075        self.assertEqual(
2076                sorted(os.listdir(support.TESTFN)),
2077                self.created_paths)
2078
2079        # bytes
2080        self.assertEqual(
2081                sorted(os.listdir(os.fsencode(support.TESTFN))),
2082                [os.fsencode(path) for path in self.created_paths])
2083
2084    def test_listdir_extended_path(self):
2085        """Test when the path starts with '\\\\?\\'."""
2086        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
2087        # unicode
2088        path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2089        self.assertEqual(
2090                sorted(os.listdir(path)),
2091                self.created_paths)
2092
2093        # bytes
2094        path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2095        self.assertEqual(
2096                sorted(os.listdir(path)),
2097                [os.fsencode(path) for path in self.created_paths])
2098
2099
2100@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2101@support.skip_unless_symlink
2102class Win32SymlinkTests(unittest.TestCase):
2103    filelink = 'filelinktest'
2104    filelink_target = os.path.abspath(__file__)
2105    dirlink = 'dirlinktest'
2106    dirlink_target = os.path.dirname(filelink_target)
2107    missing_link = 'missing link'
2108
2109    def setUp(self):
2110        assert os.path.exists(self.dirlink_target)
2111        assert os.path.exists(self.filelink_target)
2112        assert not os.path.exists(self.dirlink)
2113        assert not os.path.exists(self.filelink)
2114        assert not os.path.exists(self.missing_link)
2115
2116    def tearDown(self):
2117        if os.path.exists(self.filelink):
2118            os.remove(self.filelink)
2119        if os.path.exists(self.dirlink):
2120            os.rmdir(self.dirlink)
2121        if os.path.lexists(self.missing_link):
2122            os.remove(self.missing_link)
2123
2124    def test_directory_link(self):
2125        os.symlink(self.dirlink_target, self.dirlink)
2126        self.assertTrue(os.path.exists(self.dirlink))
2127        self.assertTrue(os.path.isdir(self.dirlink))
2128        self.assertTrue(os.path.islink(self.dirlink))
2129        self.check_stat(self.dirlink, self.dirlink_target)
2130
2131    def test_file_link(self):
2132        os.symlink(self.filelink_target, self.filelink)
2133        self.assertTrue(os.path.exists(self.filelink))
2134        self.assertTrue(os.path.isfile(self.filelink))
2135        self.assertTrue(os.path.islink(self.filelink))
2136        self.check_stat(self.filelink, self.filelink_target)
2137
2138    def _create_missing_dir_link(self):
2139        'Create a "directory" link to a non-existent target'
2140        linkname = self.missing_link
2141        if os.path.lexists(linkname):
2142            os.remove(linkname)
2143        target = r'c:\\target does not exist.29r3c740'
2144        assert not os.path.exists(target)
2145        target_is_dir = True
2146        os.symlink(target, linkname, target_is_dir)
2147
2148    def test_remove_directory_link_to_missing_target(self):
2149        self._create_missing_dir_link()
2150        # For compatibility with Unix, os.remove will check the
2151        #  directory status and call RemoveDirectory if the symlink
2152        #  was created with target_is_dir==True.
2153        os.remove(self.missing_link)
2154
2155    @unittest.skip("currently fails; consider for improvement")
2156    def test_isdir_on_directory_link_to_missing_target(self):
2157        self._create_missing_dir_link()
2158        # consider having isdir return true for directory links
2159        self.assertTrue(os.path.isdir(self.missing_link))
2160
2161    @unittest.skip("currently fails; consider for improvement")
2162    def test_rmdir_on_directory_link_to_missing_target(self):
2163        self._create_missing_dir_link()
2164        # consider allowing rmdir to remove directory links
2165        os.rmdir(self.missing_link)
2166
2167    def check_stat(self, link, target):
2168        self.assertEqual(os.stat(link), os.stat(target))
2169        self.assertNotEqual(os.lstat(link), os.stat(link))
2170
2171        bytes_link = os.fsencode(link)
2172        self.assertEqual(os.stat(bytes_link), os.stat(target))
2173        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
2174
2175    def test_12084(self):
2176        level1 = os.path.abspath(support.TESTFN)
2177        level2 = os.path.join(level1, "level2")
2178        level3 = os.path.join(level2, "level3")
2179        self.addCleanup(support.rmtree, level1)
2180
2181        os.mkdir(level1)
2182        os.mkdir(level2)
2183        os.mkdir(level3)
2184
2185        file1 = os.path.abspath(os.path.join(level1, "file1"))
2186        create_file(file1)
2187
2188        orig_dir = os.getcwd()
2189        try:
2190            os.chdir(level2)
2191            link = os.path.join(level2, "link")
2192            os.symlink(os.path.relpath(file1), "link")
2193            self.assertIn("link", os.listdir(os.getcwd()))
2194
2195            # Check os.stat calls from the same dir as the link
2196            self.assertEqual(os.stat(file1), os.stat("link"))
2197
2198            # Check os.stat calls from a dir below the link
2199            os.chdir(level1)
2200            self.assertEqual(os.stat(file1),
2201                             os.stat(os.path.relpath(link)))
2202
2203            # Check os.stat calls from a dir above the link
2204            os.chdir(level3)
2205            self.assertEqual(os.stat(file1),
2206                             os.stat(os.path.relpath(link)))
2207        finally:
2208            os.chdir(orig_dir)
2209
2210    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2211                            and os.path.exists(r'C:\ProgramData'),
2212                            'Test directories not found')
2213    def test_29248(self):
2214        # os.symlink() calls CreateSymbolicLink, which creates
2215        # the reparse data buffer with the print name stored
2216        # first, so the offset is always 0. CreateSymbolicLink
2217        # stores the "PrintName" DOS path (e.g. "C:\") first,
2218        # with an offset of 0, followed by the "SubstituteName"
2219        # NT path (e.g. "\??\C:\"). The "All Users" link, on
2220        # the other hand, seems to have been created manually
2221        # with an inverted order.
2222        target = os.readlink(r'C:\Users\All Users')
2223        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2224
2225    def test_buffer_overflow(self):
2226        # Older versions would have a buffer overflow when detecting
2227        # whether a link source was a directory. This test ensures we
2228        # no longer crash, but does not otherwise validate the behavior
2229        segment = 'X' * 27
2230        path = os.path.join(*[segment] * 10)
2231        test_cases = [
2232            # overflow with absolute src
2233            ('\\' + path, segment),
2234            # overflow dest with relative src
2235            (segment, path),
2236            # overflow when joining src
2237            (path[:180], path[:180]),
2238        ]
2239        for src, dest in test_cases:
2240            try:
2241                os.symlink(src, dest)
2242            except FileNotFoundError:
2243                pass
2244            else:
2245                try:
2246                    os.remove(dest)
2247                except OSError:
2248                    pass
2249            # Also test with bytes, since that is a separate code path.
2250            try:
2251                os.symlink(os.fsencode(src), os.fsencode(dest))
2252            except FileNotFoundError:
2253                pass
2254            else:
2255                try:
2256                    os.remove(dest)
2257                except OSError:
2258                    pass
2259
2260@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2261class Win32JunctionTests(unittest.TestCase):
2262    junction = 'junctiontest'
2263    junction_target = os.path.dirname(os.path.abspath(__file__))
2264
2265    def setUp(self):
2266        assert os.path.exists(self.junction_target)
2267        assert not os.path.exists(self.junction)
2268
2269    def tearDown(self):
2270        if os.path.exists(self.junction):
2271            # os.rmdir delegates to Windows' RemoveDirectoryW,
2272            # which removes junction points safely.
2273            os.rmdir(self.junction)
2274
2275    def test_create_junction(self):
2276        _winapi.CreateJunction(self.junction_target, self.junction)
2277        self.assertTrue(os.path.exists(self.junction))
2278        self.assertTrue(os.path.isdir(self.junction))
2279
2280        # Junctions are not recognized as links.
2281        self.assertFalse(os.path.islink(self.junction))
2282
2283    def test_unlink_removes_junction(self):
2284        _winapi.CreateJunction(self.junction_target, self.junction)
2285        self.assertTrue(os.path.exists(self.junction))
2286
2287        os.unlink(self.junction)
2288        self.assertFalse(os.path.exists(self.junction))
2289
2290@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2291class Win32NtTests(unittest.TestCase):
2292    def setUp(self):
2293        from test import support
2294        self.nt = support.import_module('nt')
2295        pass
2296
2297    def tearDown(self):
2298        pass
2299
2300    def test_getfinalpathname_handles(self):
2301        try:
2302            import ctypes, ctypes.wintypes
2303        except ImportError:
2304            raise unittest.SkipTest('ctypes module is required for this test')
2305
2306        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2307        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2308
2309        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2310        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2311                                                 ctypes.wintypes.LPDWORD)
2312
2313        # This is a pseudo-handle that doesn't need to be closed
2314        hproc = kernel.GetCurrentProcess()
2315
2316        handle_count = ctypes.wintypes.DWORD()
2317        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2318        self.assertEqual(1, ok)
2319
2320        before_count = handle_count.value
2321
2322        # The first two test the error path, __file__ tests the success path
2323        filenames = [ r'\\?\C:',
2324                      r'\\?\NUL',
2325                      r'\\?\CONIN',
2326                      __file__ ]
2327
2328        for i in range(10):
2329            for name in filenames:
2330                try:
2331                    tmp = self.nt._getfinalpathname(name)
2332                except:
2333                    # Failure is expected
2334                    pass
2335                try:
2336                    tmp = os.stat(name)
2337                except:
2338                    pass
2339
2340        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2341        self.assertEqual(1, ok)
2342
2343        handle_delta = handle_count.value - before_count
2344
2345        self.assertEqual(0, handle_delta)
2346
2347@support.skip_unless_symlink
2348class NonLocalSymlinkTests(unittest.TestCase):
2349
2350    def setUp(self):
2351        r"""
2352        Create this structure:
2353
2354        base
2355         \___ some_dir
2356        """
2357        os.makedirs('base/some_dir')
2358
2359    def tearDown(self):
2360        shutil.rmtree('base')
2361
2362    def test_directory_link_nonlocal(self):
2363        """
2364        The symlink target should resolve relative to the link, not relative
2365        to the current directory.
2366
2367        Then, link base/some_link -> base/some_dir and ensure that some_link
2368        is resolved as a directory.
2369
2370        In issue13772, it was discovered that directory detection failed if
2371        the symlink target was not specified relative to the current
2372        directory, which was a defect in the implementation.
2373        """
2374        src = os.path.join('base', 'some_link')
2375        os.symlink('some_dir', src)
2376        assert os.path.isdir(src)
2377
2378
2379class FSEncodingTests(unittest.TestCase):
2380    def test_nop(self):
2381        self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2382        self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
2383
2384    def test_identity(self):
2385        # assert fsdecode(fsencode(x)) == x
2386        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2387            try:
2388                bytesfn = os.fsencode(fn)
2389            except UnicodeEncodeError:
2390                continue
2391            self.assertEqual(os.fsdecode(bytesfn), fn)
2392
2393
2394
2395class DeviceEncodingTests(unittest.TestCase):
2396
2397    def test_bad_fd(self):
2398        # Return None when an fd doesn't actually exist.
2399        self.assertIsNone(os.device_encoding(123456))
2400
2401    @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2402            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
2403            'test requires a tty and either Windows or nl_langinfo(CODESET)')
2404    def test_device_encoding(self):
2405        encoding = os.device_encoding(0)
2406        self.assertIsNotNone(encoding)
2407        self.assertTrue(codecs.lookup(encoding))
2408
2409
2410class PidTests(unittest.TestCase):
2411    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2412    def test_getppid(self):
2413        p = subprocess.Popen([sys.executable, '-c',
2414                              'import os; print(os.getppid())'],
2415                             stdout=subprocess.PIPE)
2416        stdout, _ = p.communicate()
2417        # We are the parent of our subprocess
2418        self.assertEqual(int(stdout), os.getpid())
2419
2420    def check_waitpid(self, code, exitcode):
2421        if sys.platform == 'win32':
2422            # On Windows, os.spawnv() simply joins arguments with spaces:
2423            # arguments need to be quoted
2424            args = [f'"{sys.executable}"', '-c', f'"{code}"']
2425        else:
2426            args = [sys.executable, '-c', code]
2427        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
2428
2429        pid2, status = os.waitpid(pid, 0)
2430        if sys.platform == 'win32':
2431            self.assertEqual(status, exitcode << 8)
2432        else:
2433            self.assertTrue(os.WIFEXITED(status), status)
2434            self.assertEqual(os.WEXITSTATUS(status), exitcode)
2435        self.assertEqual(pid2, pid)
2436
2437    def test_waitpid(self):
2438        self.check_waitpid(code='pass', exitcode=0)
2439
2440    def test_waitpid_exitcode(self):
2441        exitcode = 23
2442        code = f'import sys; sys.exit({exitcode})'
2443        self.check_waitpid(code, exitcode=exitcode)
2444
2445    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2446    def test_waitpid_windows(self):
2447        # bpo-40138: test os.waitpid() with exit code larger than INT_MAX.
2448        STATUS_CONTROL_C_EXIT = 0xC000013A
2449        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2450        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2451
2452
2453class SpawnTests(unittest.TestCase):
2454    def create_args(self, *, with_env=False, use_bytes=False):
2455        self.exitcode = 17
2456
2457        filename = support.TESTFN
2458        self.addCleanup(support.unlink, filename)
2459
2460        if not with_env:
2461            code = 'import sys; sys.exit(%s)' % self.exitcode
2462        else:
2463            self.env = dict(os.environ)
2464            # create an unique key
2465            self.key = str(uuid.uuid4())
2466            self.env[self.key] = self.key
2467            # read the variable from os.environ to check that it exists
2468            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2469                    % (self.key, self.exitcode))
2470
2471        with open(filename, "w") as fp:
2472            fp.write(code)
2473
2474        args = [sys.executable, filename]
2475        if use_bytes:
2476            args = [os.fsencode(a) for a in args]
2477            self.env = {os.fsencode(k): os.fsencode(v)
2478                        for k, v in self.env.items()}
2479
2480        return args
2481
2482    @requires_os_func('spawnl')
2483    def test_spawnl(self):
2484        args = self.create_args()
2485        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2486        self.assertEqual(exitcode, self.exitcode)
2487
2488    @requires_os_func('spawnle')
2489    def test_spawnle(self):
2490        args = self.create_args(with_env=True)
2491        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2492        self.assertEqual(exitcode, self.exitcode)
2493
2494    @requires_os_func('spawnlp')
2495    def test_spawnlp(self):
2496        args = self.create_args()
2497        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2498        self.assertEqual(exitcode, self.exitcode)
2499
2500    @requires_os_func('spawnlpe')
2501    def test_spawnlpe(self):
2502        args = self.create_args(with_env=True)
2503        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2504        self.assertEqual(exitcode, self.exitcode)
2505
2506    @requires_os_func('spawnv')
2507    def test_spawnv(self):
2508        args = self.create_args()
2509        exitcode = os.spawnv(os.P_WAIT, args[0], args)
2510        self.assertEqual(exitcode, self.exitcode)
2511
2512    @requires_os_func('spawnve')
2513    def test_spawnve(self):
2514        args = self.create_args(with_env=True)
2515        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2516        self.assertEqual(exitcode, self.exitcode)
2517
2518    @requires_os_func('spawnvp')
2519    def test_spawnvp(self):
2520        args = self.create_args()
2521        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2522        self.assertEqual(exitcode, self.exitcode)
2523
2524    @requires_os_func('spawnvpe')
2525    def test_spawnvpe(self):
2526        args = self.create_args(with_env=True)
2527        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2528        self.assertEqual(exitcode, self.exitcode)
2529
2530    @requires_os_func('spawnv')
2531    def test_nowait(self):
2532        args = self.create_args()
2533        pid = os.spawnv(os.P_NOWAIT, args[0], args)
2534        result = os.waitpid(pid, 0)
2535        self.assertEqual(result[0], pid)
2536        status = result[1]
2537        if hasattr(os, 'WIFEXITED'):
2538            self.assertTrue(os.WIFEXITED(status))
2539            self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2540        else:
2541            self.assertEqual(status, self.exitcode << 8)
2542
2543    @requires_os_func('spawnve')
2544    def test_spawnve_bytes(self):
2545        # Test bytes handling in parse_arglist and parse_envlist (#28114)
2546        args = self.create_args(with_env=True, use_bytes=True)
2547        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2548        self.assertEqual(exitcode, self.exitcode)
2549
2550    @requires_os_func('spawnl')
2551    def test_spawnl_noargs(self):
2552        args = self.create_args()
2553        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
2554        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
2555
2556    @requires_os_func('spawnle')
2557    def test_spawnle_noargs(self):
2558        args = self.create_args()
2559        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
2560        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
2561
2562    @requires_os_func('spawnv')
2563    def test_spawnv_noargs(self):
2564        args = self.create_args()
2565        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2566        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
2567        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2568        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
2569
2570    @requires_os_func('spawnve')
2571    def test_spawnve_noargs(self):
2572        args = self.create_args()
2573        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2574        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
2575        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2576        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
2577
2578    def _test_invalid_env(self, spawn):
2579        args = [sys.executable, '-c', 'pass']
2580
2581        # null character in the environment variable name
2582        newenv = os.environ.copy()
2583        newenv["FRUIT\0VEGETABLE"] = "cabbage"
2584        try:
2585            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
2586        except ValueError:
2587            pass
2588        else:
2589            self.assertEqual(exitcode, 127)
2590
2591        # null character in the environment variable value
2592        newenv = os.environ.copy()
2593        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2594        try:
2595            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
2596        except ValueError:
2597            pass
2598        else:
2599            self.assertEqual(exitcode, 127)
2600
2601        # equal character in the environment variable name
2602        newenv = os.environ.copy()
2603        newenv["FRUIT=ORANGE"] = "lemon"
2604        try:
2605            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
2606        except ValueError:
2607            pass
2608        else:
2609            self.assertEqual(exitcode, 127)
2610
2611        # equal character in the environment variable value
2612        filename = support.TESTFN
2613        self.addCleanup(support.unlink, filename)
2614        with open(filename, "w") as fp:
2615            fp.write('import sys, os\n'
2616                     'if os.getenv("FRUIT") != "orange=lemon":\n'
2617                     '    raise AssertionError')
2618        args = [sys.executable, filename]
2619        newenv = os.environ.copy()
2620        newenv["FRUIT"] = "orange=lemon"
2621        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
2622        self.assertEqual(exitcode, 0)
2623
2624    @requires_os_func('spawnve')
2625    def test_spawnve_invalid_env(self):
2626        self._test_invalid_env(os.spawnve)
2627
2628    @requires_os_func('spawnvpe')
2629    def test_spawnvpe_invalid_env(self):
2630        self._test_invalid_env(os.spawnvpe)
2631
2632
2633# The introduction of this TestCase caused at least two different errors on
2634# *nix buildbots. Temporarily skip this to let the buildbots move along.
2635@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
2636@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2637class LoginTests(unittest.TestCase):
2638    def test_getlogin(self):
2639        user_name = os.getlogin()
2640        self.assertNotEqual(len(user_name), 0)
2641
2642
2643@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2644                     "needs os.getpriority and os.setpriority")
2645class ProgramPriorityTests(unittest.TestCase):
2646    """Tests for os.getpriority() and os.setpriority()."""
2647
2648    def test_set_get_priority(self):
2649
2650        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2651        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2652        try:
2653            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2654            if base >= 19 and new_prio <= 19:
2655                raise unittest.SkipTest("unable to reliably test setpriority "
2656                                        "at current nice level of %s" % base)
2657            else:
2658                self.assertEqual(new_prio, base + 1)
2659        finally:
2660            try:
2661                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2662            except OSError as err:
2663                if err.errno != errno.EACCES:
2664                    raise
2665
2666
2667class SendfileTestServer(asyncore.dispatcher, threading.Thread):
2668
2669    class Handler(asynchat.async_chat):
2670
2671        def __init__(self, conn):
2672            asynchat.async_chat.__init__(self, conn)
2673            self.in_buffer = []
2674            self.accumulate = True
2675            self.closed = False
2676            self.push(b"220 ready\r\n")
2677
2678        def handle_read(self):
2679            data = self.recv(4096)
2680            if self.accumulate:
2681                self.in_buffer.append(data)
2682
2683        def get_data(self):
2684            return b''.join(self.in_buffer)
2685
2686        def handle_close(self):
2687            self.close()
2688            self.closed = True
2689
2690        def handle_error(self):
2691            raise
2692
2693    def __init__(self, address):
2694        threading.Thread.__init__(self)
2695        asyncore.dispatcher.__init__(self)
2696        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2697        self.bind(address)
2698        self.listen(5)
2699        self.host, self.port = self.socket.getsockname()[:2]
2700        self.handler_instance = None
2701        self._active = False
2702        self._active_lock = threading.Lock()
2703
2704    # --- public API
2705
2706    @property
2707    def running(self):
2708        return self._active
2709
2710    def start(self):
2711        assert not self.running
2712        self.__flag = threading.Event()
2713        threading.Thread.start(self)
2714        self.__flag.wait()
2715
2716    def stop(self):
2717        assert self.running
2718        self._active = False
2719        self.join()
2720
2721    def wait(self):
2722        # wait for handler connection to be closed, then stop the server
2723        while not getattr(self.handler_instance, "closed", False):
2724            time.sleep(0.001)
2725        self.stop()
2726
2727    # --- internals
2728
2729    def run(self):
2730        self._active = True
2731        self.__flag.set()
2732        while self._active and asyncore.socket_map:
2733            self._active_lock.acquire()
2734            asyncore.loop(timeout=0.001, count=1)
2735            self._active_lock.release()
2736        asyncore.close_all()
2737
2738    def handle_accept(self):
2739        conn, addr = self.accept()
2740        self.handler_instance = self.Handler(conn)
2741
2742    def handle_connect(self):
2743        self.close()
2744    handle_read = handle_connect
2745
2746    def writable(self):
2747        return 0
2748
2749    def handle_error(self):
2750        raise
2751
2752
2753@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2754class TestSendfile(unittest.TestCase):
2755
2756    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
2757    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
2758                               not sys.platform.startswith("solaris") and \
2759                               not sys.platform.startswith("sunos")
2760    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2761            'requires headers and trailers support')
2762    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2763            'test is only meaningful on 32-bit builds')
2764
2765    @classmethod
2766    def setUpClass(cls):
2767        cls.key = support.threading_setup()
2768        create_file(support.TESTFN, cls.DATA)
2769
2770    @classmethod
2771    def tearDownClass(cls):
2772        support.threading_cleanup(*cls.key)
2773        support.unlink(support.TESTFN)
2774
2775    def setUp(self):
2776        self.server = SendfileTestServer((support.HOST, 0))
2777        self.server.start()
2778        self.client = socket.socket()
2779        self.client.connect((self.server.host, self.server.port))
2780        self.client.settimeout(1)
2781        # synchronize by waiting for "220 ready" response
2782        self.client.recv(1024)
2783        self.sockno = self.client.fileno()
2784        self.file = open(support.TESTFN, 'rb')
2785        self.fileno = self.file.fileno()
2786
2787    def tearDown(self):
2788        self.file.close()
2789        self.client.close()
2790        if self.server.running:
2791            self.server.stop()
2792        self.server = None
2793
2794    def sendfile_wrapper(self, *args, **kwargs):
2795        """A higher level wrapper representing how an application is
2796        supposed to use sendfile().
2797        """
2798        while True:
2799            try:
2800                return os.sendfile(*args, **kwargs)
2801            except OSError as err:
2802                if err.errno == errno.ECONNRESET:
2803                    # disconnected
2804                    raise
2805                elif err.errno in (errno.EAGAIN, errno.EBUSY):
2806                    # we have to retry send data
2807                    continue
2808                else:
2809                    raise
2810
2811    def test_send_whole_file(self):
2812        # normal send
2813        total_sent = 0
2814        offset = 0
2815        nbytes = 4096
2816        while total_sent < len(self.DATA):
2817            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2818            if sent == 0:
2819                break
2820            offset += sent
2821            total_sent += sent
2822            self.assertTrue(sent <= nbytes)
2823            self.assertEqual(offset, total_sent)
2824
2825        self.assertEqual(total_sent, len(self.DATA))
2826        self.client.shutdown(socket.SHUT_RDWR)
2827        self.client.close()
2828        self.server.wait()
2829        data = self.server.handler_instance.get_data()
2830        self.assertEqual(len(data), len(self.DATA))
2831        self.assertEqual(data, self.DATA)
2832
2833    def test_send_at_certain_offset(self):
2834        # start sending a file at a certain offset
2835        total_sent = 0
2836        offset = len(self.DATA) // 2
2837        must_send = len(self.DATA) - offset
2838        nbytes = 4096
2839        while total_sent < must_send:
2840            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2841            if sent == 0:
2842                break
2843            offset += sent
2844            total_sent += sent
2845            self.assertTrue(sent <= nbytes)
2846
2847        self.client.shutdown(socket.SHUT_RDWR)
2848        self.client.close()
2849        self.server.wait()
2850        data = self.server.handler_instance.get_data()
2851        expected = self.DATA[len(self.DATA) // 2:]
2852        self.assertEqual(total_sent, len(expected))
2853        self.assertEqual(len(data), len(expected))
2854        self.assertEqual(data, expected)
2855
2856    def test_offset_overflow(self):
2857        # specify an offset > file size
2858        offset = len(self.DATA) + 4096
2859        try:
2860            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2861        except OSError as e:
2862            # Solaris can raise EINVAL if offset >= file length, ignore.
2863            if e.errno != errno.EINVAL:
2864                raise
2865        else:
2866            self.assertEqual(sent, 0)
2867        self.client.shutdown(socket.SHUT_RDWR)
2868        self.client.close()
2869        self.server.wait()
2870        data = self.server.handler_instance.get_data()
2871        self.assertEqual(data, b'')
2872
2873    def test_invalid_offset(self):
2874        with self.assertRaises(OSError) as cm:
2875            os.sendfile(self.sockno, self.fileno, -1, 4096)
2876        self.assertEqual(cm.exception.errno, errno.EINVAL)
2877
2878    def test_keywords(self):
2879        # Keyword arguments should be supported
2880        os.sendfile(out=self.sockno, offset=0, count=4096,
2881            **{'in': self.fileno})
2882        if self.SUPPORT_HEADERS_TRAILERS:
2883            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
2884                headers=(), trailers=(), flags=0)
2885
2886    # --- headers / trailers tests
2887
2888    @requires_headers_trailers
2889    def test_headers(self):
2890        total_sent = 0
2891        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
2892        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2893                            headers=[b"x" * 512, b"y" * 256])
2894        self.assertLessEqual(sent, 512 + 256 + 4096)
2895        total_sent += sent
2896        offset = 4096
2897        while total_sent < len(expected_data):
2898            nbytes = min(len(expected_data) - total_sent, 4096)
2899            sent = self.sendfile_wrapper(self.sockno, self.fileno,
2900                                                    offset, nbytes)
2901            if sent == 0:
2902                break
2903            self.assertLessEqual(sent, nbytes)
2904            total_sent += sent
2905            offset += sent
2906
2907        self.assertEqual(total_sent, len(expected_data))
2908        self.client.close()
2909        self.server.wait()
2910        data = self.server.handler_instance.get_data()
2911        self.assertEqual(hash(data), hash(expected_data))
2912
2913    @requires_headers_trailers
2914    def test_trailers(self):
2915        TESTFN2 = support.TESTFN + "2"
2916        file_data = b"abcdef"
2917
2918        self.addCleanup(support.unlink, TESTFN2)
2919        create_file(TESTFN2, file_data)
2920
2921        with open(TESTFN2, 'rb') as f:
2922            os.sendfile(self.sockno, f.fileno(), 0, 5,
2923                        trailers=[b"123456", b"789"])
2924            self.client.close()
2925            self.server.wait()
2926            data = self.server.handler_instance.get_data()
2927            self.assertEqual(data, b"abcde123456789")
2928
2929    @requires_headers_trailers
2930    @requires_32b
2931    def test_headers_overflow_32bits(self):
2932        self.server.handler_instance.accumulate = False
2933        with self.assertRaises(OSError) as cm:
2934            os.sendfile(self.sockno, self.fileno, 0, 0,
2935                        headers=[b"x" * 2**16] * 2**15)
2936        self.assertEqual(cm.exception.errno, errno.EINVAL)
2937
2938    @requires_headers_trailers
2939    @requires_32b
2940    def test_trailers_overflow_32bits(self):
2941        self.server.handler_instance.accumulate = False
2942        with self.assertRaises(OSError) as cm:
2943            os.sendfile(self.sockno, self.fileno, 0, 0,
2944                        trailers=[b"x" * 2**16] * 2**15)
2945        self.assertEqual(cm.exception.errno, errno.EINVAL)
2946
2947    @requires_headers_trailers
2948    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2949                         'test needs os.SF_NODISKIO')
2950    def test_flags(self):
2951        try:
2952            os.sendfile(self.sockno, self.fileno, 0, 4096,
2953                        flags=os.SF_NODISKIO)
2954        except OSError as err:
2955            if err.errno not in (errno.EBUSY, errno.EAGAIN):
2956                raise
2957
2958
2959def supports_extended_attributes():
2960    if not hasattr(os, "setxattr"):
2961        return False
2962
2963    try:
2964        with open(support.TESTFN, "xb", 0) as fp:
2965            try:
2966                os.setxattr(fp.fileno(), b"user.test", b"")
2967            except OSError:
2968                return False
2969    finally:
2970        support.unlink(support.TESTFN)
2971
2972    return True
2973
2974
2975@unittest.skipUnless(supports_extended_attributes(),
2976                     "no non-broken extended attribute support")
2977# Kernels < 2.6.39 don't respect setxattr flags.
2978@support.requires_linux_version(2, 6, 39)
2979class ExtendedAttributeTests(unittest.TestCase):
2980
2981    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
2982        fn = support.TESTFN
2983        self.addCleanup(support.unlink, fn)
2984        create_file(fn)
2985
2986        with self.assertRaises(OSError) as cm:
2987            getxattr(fn, s("user.test"), **kwargs)
2988        self.assertEqual(cm.exception.errno, errno.ENODATA)
2989
2990        init_xattr = listxattr(fn)
2991        self.assertIsInstance(init_xattr, list)
2992
2993        setxattr(fn, s("user.test"), b"", **kwargs)
2994        xattr = set(init_xattr)
2995        xattr.add("user.test")
2996        self.assertEqual(set(listxattr(fn)), xattr)
2997        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2998        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2999        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
3000
3001        with self.assertRaises(OSError) as cm:
3002            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
3003        self.assertEqual(cm.exception.errno, errno.EEXIST)
3004
3005        with self.assertRaises(OSError) as cm:
3006            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
3007        self.assertEqual(cm.exception.errno, errno.ENODATA)
3008
3009        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
3010        xattr.add("user.test2")
3011        self.assertEqual(set(listxattr(fn)), xattr)
3012        removexattr(fn, s("user.test"), **kwargs)
3013
3014        with self.assertRaises(OSError) as cm:
3015            getxattr(fn, s("user.test"), **kwargs)
3016        self.assertEqual(cm.exception.errno, errno.ENODATA)
3017
3018        xattr.remove("user.test")
3019        self.assertEqual(set(listxattr(fn)), xattr)
3020        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3021        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3022        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3023        removexattr(fn, s("user.test"), **kwargs)
3024        many = sorted("user.test{}".format(i) for i in range(100))
3025        for thing in many:
3026            setxattr(fn, thing, b"x", **kwargs)
3027        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
3028
3029    def _check_xattrs(self, *args, **kwargs):
3030        self._check_xattrs_str(str, *args, **kwargs)
3031        support.unlink(support.TESTFN)
3032
3033        self._check_xattrs_str(os.fsencode, *args, **kwargs)
3034        support.unlink(support.TESTFN)
3035
3036    def test_simple(self):
3037        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3038                           os.listxattr)
3039
3040    def test_lpath(self):
3041        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3042                           os.listxattr, follow_symlinks=False)
3043
3044    def test_fds(self):
3045        def getxattr(path, *args):
3046            with open(path, "rb") as fp:
3047                return os.getxattr(fp.fileno(), *args)
3048        def setxattr(path, *args):
3049            with open(path, "wb", 0) as fp:
3050                os.setxattr(fp.fileno(), *args)
3051        def removexattr(path, *args):
3052            with open(path, "wb", 0) as fp:
3053                os.removexattr(fp.fileno(), *args)
3054        def listxattr(path, *args):
3055            with open(path, "rb") as fp:
3056                return os.listxattr(fp.fileno(), *args)
3057        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3058
3059
3060@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3061class TermsizeTests(unittest.TestCase):
3062    def test_does_not_crash(self):
3063        """Check if get_terminal_size() returns a meaningful value.
3064
3065        There's no easy portable way to actually check the size of the
3066        terminal, so let's check if it returns something sensible instead.
3067        """
3068        try:
3069            size = os.get_terminal_size()
3070        except OSError as e:
3071            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3072                # Under win32 a generic OSError can be thrown if the
3073                # handle cannot be retrieved
3074                self.skipTest("failed to query terminal size")
3075            raise
3076
3077        self.assertGreaterEqual(size.columns, 0)
3078        self.assertGreaterEqual(size.lines, 0)
3079
3080    def test_stty_match(self):
3081        """Check if stty returns the same results
3082
3083        stty actually tests stdin, so get_terminal_size is invoked on
3084        stdin explicitly. If stty succeeded, then get_terminal_size()
3085        should work too.
3086        """
3087        try:
3088            size = subprocess.check_output(['stty', 'size']).decode().split()
3089        except (FileNotFoundError, subprocess.CalledProcessError,
3090                PermissionError):
3091            self.skipTest("stty invocation failed")
3092        expected = (int(size[1]), int(size[0])) # reversed order
3093
3094        try:
3095            actual = os.get_terminal_size(sys.__stdin__.fileno())
3096        except OSError as e:
3097            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3098                # Under win32 a generic OSError can be thrown if the
3099                # handle cannot be retrieved
3100                self.skipTest("failed to query terminal size")
3101            raise
3102        self.assertEqual(expected, actual)
3103
3104
3105class OSErrorTests(unittest.TestCase):
3106    def setUp(self):
3107        class Str(str):
3108            pass
3109
3110        self.bytes_filenames = []
3111        self.unicode_filenames = []
3112        if support.TESTFN_UNENCODABLE is not None:
3113            decoded = support.TESTFN_UNENCODABLE
3114        else:
3115            decoded = support.TESTFN
3116        self.unicode_filenames.append(decoded)
3117        self.unicode_filenames.append(Str(decoded))
3118        if support.TESTFN_UNDECODABLE is not None:
3119            encoded = support.TESTFN_UNDECODABLE
3120        else:
3121            encoded = os.fsencode(support.TESTFN)
3122        self.bytes_filenames.append(encoded)
3123        self.bytes_filenames.append(bytearray(encoded))
3124        self.bytes_filenames.append(memoryview(encoded))
3125
3126        self.filenames = self.bytes_filenames + self.unicode_filenames
3127
3128    def test_oserror_filename(self):
3129        funcs = [
3130            (self.filenames, os.chdir,),
3131            (self.filenames, os.chmod, 0o777),
3132            (self.filenames, os.lstat,),
3133            (self.filenames, os.open, os.O_RDONLY),
3134            (self.filenames, os.rmdir,),
3135            (self.filenames, os.stat,),
3136            (self.filenames, os.unlink,),
3137        ]
3138        if sys.platform == "win32":
3139            funcs.extend((
3140                (self.bytes_filenames, os.rename, b"dst"),
3141                (self.bytes_filenames, os.replace, b"dst"),
3142                (self.unicode_filenames, os.rename, "dst"),
3143                (self.unicode_filenames, os.replace, "dst"),
3144                (self.unicode_filenames, os.listdir, ),
3145            ))
3146        else:
3147            funcs.extend((
3148                (self.filenames, os.listdir,),
3149                (self.filenames, os.rename, "dst"),
3150                (self.filenames, os.replace, "dst"),
3151            ))
3152        if hasattr(os, "chown"):
3153            funcs.append((self.filenames, os.chown, 0, 0))
3154        if hasattr(os, "lchown"):
3155            funcs.append((self.filenames, os.lchown, 0, 0))
3156        if hasattr(os, "truncate"):
3157            funcs.append((self.filenames, os.truncate, 0))
3158        if hasattr(os, "chflags"):
3159            funcs.append((self.filenames, os.chflags, 0))
3160        if hasattr(os, "lchflags"):
3161            funcs.append((self.filenames, os.lchflags, 0))
3162        if hasattr(os, "chroot"):
3163            funcs.append((self.filenames, os.chroot,))
3164        if hasattr(os, "link"):
3165            if sys.platform == "win32":
3166                funcs.append((self.bytes_filenames, os.link, b"dst"))
3167                funcs.append((self.unicode_filenames, os.link, "dst"))
3168            else:
3169                funcs.append((self.filenames, os.link, "dst"))
3170        if hasattr(os, "listxattr"):
3171            funcs.extend((
3172                (self.filenames, os.listxattr,),
3173                (self.filenames, os.getxattr, "user.test"),
3174                (self.filenames, os.setxattr, "user.test", b'user'),
3175                (self.filenames, os.removexattr, "user.test"),
3176            ))
3177        if hasattr(os, "lchmod"):
3178            funcs.append((self.filenames, os.lchmod, 0o777))
3179        if hasattr(os, "readlink"):
3180            if sys.platform == "win32":
3181                funcs.append((self.unicode_filenames, os.readlink,))
3182            else:
3183                funcs.append((self.filenames, os.readlink,))
3184
3185
3186        for filenames, func, *func_args in funcs:
3187            for name in filenames:
3188                try:
3189                    if isinstance(name, (str, bytes)):
3190                        func(name, *func_args)
3191                    else:
3192                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3193                            func(name, *func_args)
3194                except OSError as err:
3195                    self.assertIs(err.filename, name, str(func))
3196                except UnicodeDecodeError:
3197                    pass
3198                else:
3199                    self.fail("No exception thrown by {}".format(func))
3200
3201class CPUCountTests(unittest.TestCase):
3202    def test_cpu_count(self):
3203        cpus = os.cpu_count()
3204        if cpus is not None:
3205            self.assertIsInstance(cpus, int)
3206            self.assertGreater(cpus, 0)
3207        else:
3208            self.skipTest("Could not determine the number of CPUs")
3209
3210
3211class FDInheritanceTests(unittest.TestCase):
3212    def test_get_set_inheritable(self):
3213        fd = os.open(__file__, os.O_RDONLY)
3214        self.addCleanup(os.close, fd)
3215        self.assertEqual(os.get_inheritable(fd), False)
3216
3217        os.set_inheritable(fd, True)
3218        self.assertEqual(os.get_inheritable(fd), True)
3219
3220    @unittest.skipIf(fcntl is None, "need fcntl")
3221    def test_get_inheritable_cloexec(self):
3222        fd = os.open(__file__, os.O_RDONLY)
3223        self.addCleanup(os.close, fd)
3224        self.assertEqual(os.get_inheritable(fd), False)
3225
3226        # clear FD_CLOEXEC flag
3227        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3228        flags &= ~fcntl.FD_CLOEXEC
3229        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
3230
3231        self.assertEqual(os.get_inheritable(fd), True)
3232
3233    @unittest.skipIf(fcntl is None, "need fcntl")
3234    def test_set_inheritable_cloexec(self):
3235        fd = os.open(__file__, os.O_RDONLY)
3236        self.addCleanup(os.close, fd)
3237        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3238                         fcntl.FD_CLOEXEC)
3239
3240        os.set_inheritable(fd, True)
3241        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3242                         0)
3243
3244    def test_open(self):
3245        fd = os.open(__file__, os.O_RDONLY)
3246        self.addCleanup(os.close, fd)
3247        self.assertEqual(os.get_inheritable(fd), False)
3248
3249    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3250    def test_pipe(self):
3251        rfd, wfd = os.pipe()
3252        self.addCleanup(os.close, rfd)
3253        self.addCleanup(os.close, wfd)
3254        self.assertEqual(os.get_inheritable(rfd), False)
3255        self.assertEqual(os.get_inheritable(wfd), False)
3256
3257    def test_dup(self):
3258        fd1 = os.open(__file__, os.O_RDONLY)
3259        self.addCleanup(os.close, fd1)
3260
3261        fd2 = os.dup(fd1)
3262        self.addCleanup(os.close, fd2)
3263        self.assertEqual(os.get_inheritable(fd2), False)
3264
3265    def test_dup_standard_stream(self):
3266        fd = os.dup(1)
3267        self.addCleanup(os.close, fd)
3268        self.assertGreater(fd, 0)
3269
3270    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3271    def test_dup_nul(self):
3272        # os.dup() was creating inheritable fds for character files.
3273        fd1 = os.open('NUL', os.O_RDONLY)
3274        self.addCleanup(os.close, fd1)
3275        fd2 = os.dup(fd1)
3276        self.addCleanup(os.close, fd2)
3277        self.assertFalse(os.get_inheritable(fd2))
3278
3279    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3280    def test_dup2(self):
3281        fd = os.open(__file__, os.O_RDONLY)
3282        self.addCleanup(os.close, fd)
3283
3284        # inheritable by default
3285        fd2 = os.open(__file__, os.O_RDONLY)
3286        self.addCleanup(os.close, fd2)
3287        self.assertEqual(os.dup2(fd, fd2), fd2)
3288        self.assertTrue(os.get_inheritable(fd2))
3289
3290        # force non-inheritable
3291        fd3 = os.open(__file__, os.O_RDONLY)
3292        self.addCleanup(os.close, fd3)
3293        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3294        self.assertFalse(os.get_inheritable(fd3))
3295
3296    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3297    def test_openpty(self):
3298        master_fd, slave_fd = os.openpty()
3299        self.addCleanup(os.close, master_fd)
3300        self.addCleanup(os.close, slave_fd)
3301        self.assertEqual(os.get_inheritable(master_fd), False)
3302        self.assertEqual(os.get_inheritable(slave_fd), False)
3303
3304
3305class PathTConverterTests(unittest.TestCase):
3306    # tuples of (function name, allows fd arguments, additional arguments to
3307    # function, cleanup function)
3308    functions = [
3309        ('stat', True, (), None),
3310        ('lstat', False, (), None),
3311        ('access', False, (os.F_OK,), None),
3312        ('chflags', False, (0,), None),
3313        ('lchflags', False, (0,), None),
3314        ('open', False, (0,), getattr(os, 'close', None)),
3315    ]
3316
3317    def test_path_t_converter(self):
3318        str_filename = support.TESTFN
3319        if os.name == 'nt':
3320            bytes_fspath = bytes_filename = None
3321        else:
3322            bytes_filename = support.TESTFN.encode('ascii')
3323            bytes_fspath = FakePath(bytes_filename)
3324        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
3325        self.addCleanup(support.unlink, support.TESTFN)
3326        self.addCleanup(os.close, fd)
3327
3328        int_fspath = FakePath(fd)
3329        str_fspath = FakePath(str_filename)
3330
3331        for name, allow_fd, extra_args, cleanup_fn in self.functions:
3332            with self.subTest(name=name):
3333                try:
3334                    fn = getattr(os, name)
3335                except AttributeError:
3336                    continue
3337
3338                for path in (str_filename, bytes_filename, str_fspath,
3339                             bytes_fspath):
3340                    if path is None:
3341                        continue
3342                    with self.subTest(name=name, path=path):
3343                        result = fn(path, *extra_args)
3344                        if cleanup_fn is not None:
3345                            cleanup_fn(result)
3346
3347                with self.assertRaisesRegex(
3348                        TypeError, 'to return str or bytes'):
3349                    fn(int_fspath, *extra_args)
3350
3351                if allow_fd:
3352                    result = fn(fd, *extra_args)  # should not fail
3353                    if cleanup_fn is not None:
3354                        cleanup_fn(result)
3355                else:
3356                    with self.assertRaisesRegex(
3357                            TypeError,
3358                            'os.PathLike'):
3359                        fn(fd, *extra_args)
3360
3361    def test_path_t_converter_and_custom_class(self):
3362        msg = r'__fspath__\(\) to return str or bytes, not %s'
3363        with self.assertRaisesRegex(TypeError, msg % r'int'):
3364            os.stat(FakePath(2))
3365        with self.assertRaisesRegex(TypeError, msg % r'float'):
3366            os.stat(FakePath(2.34))
3367        with self.assertRaisesRegex(TypeError, msg % r'object'):
3368            os.stat(FakePath(object()))
3369
3370
3371@unittest.skipUnless(hasattr(os, 'get_blocking'),
3372                     'needs os.get_blocking() and os.set_blocking()')
3373class BlockingTests(unittest.TestCase):
3374    def test_blocking(self):
3375        fd = os.open(__file__, os.O_RDONLY)
3376        self.addCleanup(os.close, fd)
3377        self.assertEqual(os.get_blocking(fd), True)
3378
3379        os.set_blocking(fd, False)
3380        self.assertEqual(os.get_blocking(fd), False)
3381
3382        os.set_blocking(fd, True)
3383        self.assertEqual(os.get_blocking(fd), True)
3384
3385
3386
3387class ExportsTests(unittest.TestCase):
3388    def test_os_all(self):
3389        self.assertIn('open', os.__all__)
3390        self.assertIn('walk', os.__all__)
3391
3392
3393class TestScandir(unittest.TestCase):
3394    check_no_resource_warning = support.check_no_resource_warning
3395
3396    def setUp(self):
3397        self.path = os.path.realpath(support.TESTFN)
3398        self.bytes_path = os.fsencode(self.path)
3399        self.addCleanup(support.rmtree, self.path)
3400        os.mkdir(self.path)
3401
3402    def create_file(self, name="file.txt"):
3403        path = self.bytes_path if isinstance(name, bytes) else self.path
3404        filename = os.path.join(path, name)
3405        create_file(filename, b'python')
3406        return filename
3407
3408    def get_entries(self, names):
3409        entries = dict((entry.name, entry)
3410                       for entry in os.scandir(self.path))
3411        self.assertEqual(sorted(entries.keys()), names)
3412        return entries
3413
3414    def assert_stat_equal(self, stat1, stat2, skip_fields):
3415        if skip_fields:
3416            for attr in dir(stat1):
3417                if not attr.startswith("st_"):
3418                    continue
3419                if attr in ("st_dev", "st_ino", "st_nlink"):
3420                    continue
3421                self.assertEqual(getattr(stat1, attr),
3422                                 getattr(stat2, attr),
3423                                 (stat1, stat2, attr))
3424        else:
3425            self.assertEqual(stat1, stat2)
3426
3427    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
3428        self.assertIsInstance(entry, os.DirEntry)
3429        self.assertEqual(entry.name, name)
3430        self.assertEqual(entry.path, os.path.join(self.path, name))
3431        self.assertEqual(entry.inode(),
3432                         os.stat(entry.path, follow_symlinks=False).st_ino)
3433
3434        entry_stat = os.stat(entry.path)
3435        self.assertEqual(entry.is_dir(),
3436                         stat.S_ISDIR(entry_stat.st_mode))
3437        self.assertEqual(entry.is_file(),
3438                         stat.S_ISREG(entry_stat.st_mode))
3439        self.assertEqual(entry.is_symlink(),
3440                         os.path.islink(entry.path))
3441
3442        entry_lstat = os.stat(entry.path, follow_symlinks=False)
3443        self.assertEqual(entry.is_dir(follow_symlinks=False),
3444                         stat.S_ISDIR(entry_lstat.st_mode))
3445        self.assertEqual(entry.is_file(follow_symlinks=False),
3446                         stat.S_ISREG(entry_lstat.st_mode))
3447
3448        self.assert_stat_equal(entry.stat(),
3449                               entry_stat,
3450                               os.name == 'nt' and not is_symlink)
3451        self.assert_stat_equal(entry.stat(follow_symlinks=False),
3452                               entry_lstat,
3453                               os.name == 'nt')
3454
3455    def test_attributes(self):
3456        link = hasattr(os, 'link')
3457        symlink = support.can_symlink()
3458
3459        dirname = os.path.join(self.path, "dir")
3460        os.mkdir(dirname)
3461        filename = self.create_file("file.txt")
3462        if link:
3463            try:
3464                os.link(filename, os.path.join(self.path, "link_file.txt"))
3465            except PermissionError as e:
3466                self.skipTest('os.link(): %s' % e)
3467        if symlink:
3468            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3469                       target_is_directory=True)
3470            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3471
3472        names = ['dir', 'file.txt']
3473        if link:
3474            names.append('link_file.txt')
3475        if symlink:
3476            names.extend(('symlink_dir', 'symlink_file.txt'))
3477        entries = self.get_entries(names)
3478
3479        entry = entries['dir']
3480        self.check_entry(entry, 'dir', True, False, False)
3481
3482        entry = entries['file.txt']
3483        self.check_entry(entry, 'file.txt', False, True, False)
3484
3485        if link:
3486            entry = entries['link_file.txt']
3487            self.check_entry(entry, 'link_file.txt', False, True, False)
3488
3489        if symlink:
3490            entry = entries['symlink_dir']
3491            self.check_entry(entry, 'symlink_dir', True, False, True)
3492
3493            entry = entries['symlink_file.txt']
3494            self.check_entry(entry, 'symlink_file.txt', False, True, True)
3495
3496    def get_entry(self, name):
3497        path = self.bytes_path if isinstance(name, bytes) else self.path
3498        entries = list(os.scandir(path))
3499        self.assertEqual(len(entries), 1)
3500
3501        entry = entries[0]
3502        self.assertEqual(entry.name, name)
3503        return entry
3504
3505    def create_file_entry(self, name='file.txt'):
3506        filename = self.create_file(name=name)
3507        return self.get_entry(os.path.basename(filename))
3508
3509    def test_current_directory(self):
3510        filename = self.create_file()
3511        old_dir = os.getcwd()
3512        try:
3513            os.chdir(self.path)
3514
3515            # call scandir() without parameter: it must list the content
3516            # of the current directory
3517            entries = dict((entry.name, entry) for entry in os.scandir())
3518            self.assertEqual(sorted(entries.keys()),
3519                             [os.path.basename(filename)])
3520        finally:
3521            os.chdir(old_dir)
3522
3523    def test_repr(self):
3524        entry = self.create_file_entry()
3525        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3526
3527    def test_fspath_protocol(self):
3528        entry = self.create_file_entry()
3529        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3530
3531    def test_fspath_protocol_bytes(self):
3532        bytes_filename = os.fsencode('bytesfile.txt')
3533        bytes_entry = self.create_file_entry(name=bytes_filename)
3534        fspath = os.fspath(bytes_entry)
3535        self.assertIsInstance(fspath, bytes)
3536        self.assertEqual(fspath,
3537                         os.path.join(os.fsencode(self.path),bytes_filename))
3538
3539    def test_removed_dir(self):
3540        path = os.path.join(self.path, 'dir')
3541
3542        os.mkdir(path)
3543        entry = self.get_entry('dir')
3544        os.rmdir(path)
3545
3546        # On POSIX, is_dir() result depends if scandir() filled d_type or not
3547        if os.name == 'nt':
3548            self.assertTrue(entry.is_dir())
3549        self.assertFalse(entry.is_file())
3550        self.assertFalse(entry.is_symlink())
3551        if os.name == 'nt':
3552            self.assertRaises(FileNotFoundError, entry.inode)
3553            # don't fail
3554            entry.stat()
3555            entry.stat(follow_symlinks=False)
3556        else:
3557            self.assertGreater(entry.inode(), 0)
3558            self.assertRaises(FileNotFoundError, entry.stat)
3559            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3560
3561    def test_removed_file(self):
3562        entry = self.create_file_entry()
3563        os.unlink(entry.path)
3564
3565        self.assertFalse(entry.is_dir())
3566        # On POSIX, is_dir() result depends if scandir() filled d_type or not
3567        if os.name == 'nt':
3568            self.assertTrue(entry.is_file())
3569        self.assertFalse(entry.is_symlink())
3570        if os.name == 'nt':
3571            self.assertRaises(FileNotFoundError, entry.inode)
3572            # don't fail
3573            entry.stat()
3574            entry.stat(follow_symlinks=False)
3575        else:
3576            self.assertGreater(entry.inode(), 0)
3577            self.assertRaises(FileNotFoundError, entry.stat)
3578            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3579
3580    def test_broken_symlink(self):
3581        if not support.can_symlink():
3582            return self.skipTest('cannot create symbolic link')
3583
3584        filename = self.create_file("file.txt")
3585        os.symlink(filename,
3586                   os.path.join(self.path, "symlink.txt"))
3587        entries = self.get_entries(['file.txt', 'symlink.txt'])
3588        entry = entries['symlink.txt']
3589        os.unlink(filename)
3590
3591        self.assertGreater(entry.inode(), 0)
3592        self.assertFalse(entry.is_dir())
3593        self.assertFalse(entry.is_file())  # broken symlink returns False
3594        self.assertFalse(entry.is_dir(follow_symlinks=False))
3595        self.assertFalse(entry.is_file(follow_symlinks=False))
3596        self.assertTrue(entry.is_symlink())
3597        self.assertRaises(FileNotFoundError, entry.stat)
3598        # don't fail
3599        entry.stat(follow_symlinks=False)
3600
3601    def test_bytes(self):
3602        self.create_file("file.txt")
3603
3604        path_bytes = os.fsencode(self.path)
3605        entries = list(os.scandir(path_bytes))
3606        self.assertEqual(len(entries), 1, entries)
3607        entry = entries[0]
3608
3609        self.assertEqual(entry.name, b'file.txt')
3610        self.assertEqual(entry.path,
3611                         os.fsencode(os.path.join(self.path, 'file.txt')))
3612
3613    def test_bytes_like(self):
3614        self.create_file("file.txt")
3615
3616        for cls in bytearray, memoryview:
3617            path_bytes = cls(os.fsencode(self.path))
3618            with self.assertWarns(DeprecationWarning):
3619                entries = list(os.scandir(path_bytes))
3620            self.assertEqual(len(entries), 1, entries)
3621            entry = entries[0]
3622
3623            self.assertEqual(entry.name, b'file.txt')
3624            self.assertEqual(entry.path,
3625                             os.fsencode(os.path.join(self.path, 'file.txt')))
3626            self.assertIs(type(entry.name), bytes)
3627            self.assertIs(type(entry.path), bytes)
3628
3629    @unittest.skipUnless(os.listdir in os.supports_fd,
3630                         'fd support for listdir required for this test.')
3631    def test_fd(self):
3632        self.assertIn(os.scandir, os.supports_fd)
3633        self.create_file('file.txt')
3634        expected_names = ['file.txt']
3635        if support.can_symlink():
3636            os.symlink('file.txt', os.path.join(self.path, 'link'))
3637            expected_names.append('link')
3638
3639        fd = os.open(self.path, os.O_RDONLY)
3640        try:
3641            with os.scandir(fd) as it:
3642                entries = list(it)
3643            names = [entry.name for entry in entries]
3644            self.assertEqual(sorted(names), expected_names)
3645            self.assertEqual(names, os.listdir(fd))
3646            for entry in entries:
3647                self.assertEqual(entry.path, entry.name)
3648                self.assertEqual(os.fspath(entry), entry.name)
3649                self.assertEqual(entry.is_symlink(), entry.name == 'link')
3650                if os.stat in os.supports_dir_fd:
3651                    st = os.stat(entry.name, dir_fd=fd)
3652                    self.assertEqual(entry.stat(), st)
3653                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3654                    self.assertEqual(entry.stat(follow_symlinks=False), st)
3655        finally:
3656            os.close(fd)
3657
3658    def test_empty_path(self):
3659        self.assertRaises(FileNotFoundError, os.scandir, '')
3660
3661    def test_consume_iterator_twice(self):
3662        self.create_file("file.txt")
3663        iterator = os.scandir(self.path)
3664
3665        entries = list(iterator)
3666        self.assertEqual(len(entries), 1, entries)
3667
3668        # check than consuming the iterator twice doesn't raise exception
3669        entries2 = list(iterator)
3670        self.assertEqual(len(entries2), 0, entries2)
3671
3672    def test_bad_path_type(self):
3673        for obj in [1.234, {}, []]:
3674            self.assertRaises(TypeError, os.scandir, obj)
3675
3676    def test_close(self):
3677        self.create_file("file.txt")
3678        self.create_file("file2.txt")
3679        iterator = os.scandir(self.path)
3680        next(iterator)
3681        iterator.close()
3682        # multiple closes
3683        iterator.close()
3684        with self.check_no_resource_warning():
3685            del iterator
3686
3687    def test_context_manager(self):
3688        self.create_file("file.txt")
3689        self.create_file("file2.txt")
3690        with os.scandir(self.path) as iterator:
3691            next(iterator)
3692        with self.check_no_resource_warning():
3693            del iterator
3694
3695    def test_context_manager_close(self):
3696        self.create_file("file.txt")
3697        self.create_file("file2.txt")
3698        with os.scandir(self.path) as iterator:
3699            next(iterator)
3700            iterator.close()
3701
3702    def test_context_manager_exception(self):
3703        self.create_file("file.txt")
3704        self.create_file("file2.txt")
3705        with self.assertRaises(ZeroDivisionError):
3706            with os.scandir(self.path) as iterator:
3707                next(iterator)
3708                1/0
3709        with self.check_no_resource_warning():
3710            del iterator
3711
3712    def test_resource_warning(self):
3713        self.create_file("file.txt")
3714        self.create_file("file2.txt")
3715        iterator = os.scandir(self.path)
3716        next(iterator)
3717        with self.assertWarns(ResourceWarning):
3718            del iterator
3719            support.gc_collect()
3720        # exhausted iterator
3721        iterator = os.scandir(self.path)
3722        list(iterator)
3723        with self.check_no_resource_warning():
3724            del iterator
3725
3726
3727class TestPEP519(unittest.TestCase):
3728
3729    # Abstracted so it can be overridden to test pure Python implementation
3730    # if a C version is provided.
3731    fspath = staticmethod(os.fspath)
3732
3733    def test_return_bytes(self):
3734        for b in b'hello', b'goodbye', b'some/path/and/file':
3735            self.assertEqual(b, self.fspath(b))
3736
3737    def test_return_string(self):
3738        for s in 'hello', 'goodbye', 'some/path/and/file':
3739            self.assertEqual(s, self.fspath(s))
3740
3741    def test_fsencode_fsdecode(self):
3742        for p in "path/like/object", b"path/like/object":
3743            pathlike = FakePath(p)
3744
3745            self.assertEqual(p, self.fspath(pathlike))
3746            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3747            self.assertEqual("path/like/object", os.fsdecode(pathlike))
3748
3749    def test_pathlike(self):
3750        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
3751        self.assertTrue(issubclass(FakePath, os.PathLike))
3752        self.assertTrue(isinstance(FakePath('x'), os.PathLike))
3753
3754    def test_garbage_in_exception_out(self):
3755        vapor = type('blah', (), {})
3756        for o in int, type, os, vapor():
3757            self.assertRaises(TypeError, self.fspath, o)
3758
3759    def test_argument_required(self):
3760        self.assertRaises(TypeError, self.fspath)
3761
3762    def test_bad_pathlike(self):
3763        # __fspath__ returns a value other than str or bytes.
3764        self.assertRaises(TypeError, self.fspath, FakePath(42))
3765        # __fspath__ attribute that is not callable.
3766        c = type('foo', (), {})
3767        c.__fspath__ = 1
3768        self.assertRaises(TypeError, self.fspath, c())
3769        # __fspath__ raises an exception.
3770        self.assertRaises(ZeroDivisionError, self.fspath,
3771                          FakePath(ZeroDivisionError()))
3772
3773    def test_pathlike_subclasshook(self):
3774        # bpo-38878: subclasshook causes subclass checks
3775        # true on abstract implementation.
3776        class A(os.PathLike):
3777            pass
3778        self.assertFalse(issubclass(FakePath, A))
3779        self.assertTrue(issubclass(FakePath, os.PathLike))
3780
3781
3782class TimesTests(unittest.TestCase):
3783    def test_times(self):
3784        times = os.times()
3785        self.assertIsInstance(times, os.times_result)
3786
3787        for field in ('user', 'system', 'children_user', 'children_system',
3788                      'elapsed'):
3789            value = getattr(times, field)
3790            self.assertIsInstance(value, float)
3791
3792        if os.name == 'nt':
3793            self.assertEqual(times.children_user, 0)
3794            self.assertEqual(times.children_system, 0)
3795            self.assertEqual(times.elapsed, 0)
3796
3797
3798# Only test if the C version is provided, otherwise TestPEP519 already tested
3799# the pure Python implementation.
3800if hasattr(os, "_fspath"):
3801    class TestPEP519PurePython(TestPEP519):
3802
3803        """Explicitly test the pure Python implementation of os.fspath()."""
3804
3805        fspath = staticmethod(os._fspath)
3806
3807
3808if __name__ == "__main__":
3809    unittest.main()
3810