1"Test posix functions"
2
3from test import support
4from test.support import import_helper
5from test.support import os_helper
6from test.support import warnings_helper
7from test.support.script_helper import assert_python_ok
8
9# Skip these tests if there is no posix module.
10posix = import_helper.import_module('posix')
11
12import errno
13import sys
14import signal
15import time
16import os
17import platform
18import pwd
19import stat
20import tempfile
21import unittest
22import warnings
23import textwrap
24
# Path inside the system temporary directory, named after os_helper.TESTFN
# with a '-dummy-symlink' suffix.
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
                              os_helper.TESTFN + '-dummy-symlink')

# Decorator that skips a test unless sys.maxsize is below 2**32.
requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
        'test is only meaningful on 32-bit builds')
30
31def _supports_sched():
32    if not hasattr(posix, 'sched_getscheduler'):
33        return False
34    try:
35        posix.sched_getscheduler(0)
36    except OSError as e:
37        if e.errno == errno.ENOSYS:
38            return False
39    return True
40
41requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
42
43
44class PosixTester(unittest.TestCase):
45
46    def setUp(self):
47        # create empty file
48        with open(os_helper.TESTFN, "wb"):
49            pass
50        self.teardown_files = [ os_helper.TESTFN ]
51        self._warnings_manager = warnings_helper.check_warnings()
52        self._warnings_manager.__enter__()
53        warnings.filterwarnings('ignore', '.* potential security risk .*',
54                                RuntimeWarning)
55
56    def tearDown(self):
57        for teardown_file in self.teardown_files:
58            os_helper.unlink(teardown_file)
59        self._warnings_manager.__exit__(None, None, None)
60
61    def testNoArgFunctions(self):
62        # test posix functions which take no arguments and have
63        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
64        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
65                             "times", "getloadavg",
66                             "getegid", "geteuid", "getgid", "getgroups",
67                             "getpid", "getpgrp", "getppid", "getuid", "sync",
68                           ]
69
70        for name in NO_ARG_FUNCTIONS:
71            posix_func = getattr(posix, name, None)
72            if posix_func is not None:
73                posix_func()
74                self.assertRaises(TypeError, posix_func, 1)
75
76    @unittest.skipUnless(hasattr(posix, 'getresuid'),
77                         'test needs posix.getresuid()')
78    def test_getresuid(self):
79        user_ids = posix.getresuid()
80        self.assertEqual(len(user_ids), 3)
81        for val in user_ids:
82            self.assertGreaterEqual(val, 0)
83
84    @unittest.skipUnless(hasattr(posix, 'getresgid'),
85                         'test needs posix.getresgid()')
86    def test_getresgid(self):
87        group_ids = posix.getresgid()
88        self.assertEqual(len(group_ids), 3)
89        for val in group_ids:
90            self.assertGreaterEqual(val, 0)
91
92    @unittest.skipUnless(hasattr(posix, 'setresuid'),
93                         'test needs posix.setresuid()')
94    def test_setresuid(self):
95        current_user_ids = posix.getresuid()
96        self.assertIsNone(posix.setresuid(*current_user_ids))
97        # -1 means don't change that value.
98        self.assertIsNone(posix.setresuid(-1, -1, -1))
99
100    @unittest.skipUnless(hasattr(posix, 'setresuid'),
101                         'test needs posix.setresuid()')
102    def test_setresuid_exception(self):
103        # Don't do this test if someone is silly enough to run us as root.
104        current_user_ids = posix.getresuid()
105        if 0 not in current_user_ids:
106            new_user_ids = (current_user_ids[0]+1, -1, -1)
107            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
108
109    @unittest.skipUnless(hasattr(posix, 'setresgid'),
110                         'test needs posix.setresgid()')
111    def test_setresgid(self):
112        current_group_ids = posix.getresgid()
113        self.assertIsNone(posix.setresgid(*current_group_ids))
114        # -1 means don't change that value.
115        self.assertIsNone(posix.setresgid(-1, -1, -1))
116
117    @unittest.skipUnless(hasattr(posix, 'setresgid'),
118                         'test needs posix.setresgid()')
119    def test_setresgid_exception(self):
120        # Don't do this test if someone is silly enough to run us as root.
121        current_group_ids = posix.getresgid()
122        if 0 not in current_group_ids:
123            new_group_ids = (current_group_ids[0]+1, -1, -1)
124            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
125
126    @unittest.skipUnless(hasattr(posix, 'initgroups'),
127                         "test needs os.initgroups()")
128    def test_initgroups(self):
129        # It takes a string and an integer; check that it raises a TypeError
130        # for other argument lists.
131        self.assertRaises(TypeError, posix.initgroups)
132        self.assertRaises(TypeError, posix.initgroups, None)
133        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
134        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
135
136        # If a non-privileged user invokes it, it should fail with OSError
137        # EPERM.
138        if os.getuid() != 0:
139            try:
140                name = pwd.getpwuid(posix.getuid()).pw_name
141            except KeyError:
142                # the current UID may not have a pwd entry
143                raise unittest.SkipTest("need a pwd entry")
144            try:
145                posix.initgroups(name, 13)
146            except OSError as e:
147                self.assertEqual(e.errno, errno.EPERM)
148            else:
149                self.fail("Expected OSError to be raised by initgroups")
150
151    @unittest.skipUnless(hasattr(posix, 'statvfs'),
152                         'test needs posix.statvfs()')
153    def test_statvfs(self):
154        self.assertTrue(posix.statvfs(os.curdir))
155
156    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
157                         'test needs posix.fstatvfs()')
158    def test_fstatvfs(self):
159        fp = open(os_helper.TESTFN)
160        try:
161            self.assertTrue(posix.fstatvfs(fp.fileno()))
162            self.assertTrue(posix.statvfs(fp.fileno()))
163        finally:
164            fp.close()
165
166    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
167                         'test needs posix.ftruncate()')
168    def test_ftruncate(self):
169        fp = open(os_helper.TESTFN, 'w+')
170        try:
171            # we need to have some data to truncate
172            fp.write('test')
173            fp.flush()
174            posix.ftruncate(fp.fileno(), 0)
175        finally:
176            fp.close()
177
178    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
179    def test_truncate(self):
180        with open(os_helper.TESTFN, 'w') as fp:
181            fp.write('test')
182            fp.flush()
183        posix.truncate(os_helper.TESTFN, 0)
184
185    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
186    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
187    def test_fexecve(self):
188        fp = os.open(sys.executable, os.O_RDONLY)
189        try:
190            pid = os.fork()
191            if pid == 0:
192                os.chdir(os.path.split(sys.executable)[0])
193                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
194            else:
195                support.wait_process(pid, exitcode=0)
196        finally:
197            os.close(fp)
198
199
200    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
201    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
202    def test_waitid(self):
203        pid = os.fork()
204        if pid == 0:
205            os.chdir(os.path.split(sys.executable)[0])
206            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
207        else:
208            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
209            self.assertEqual(pid, res.si_pid)
210
211    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    def test_register_at_fork(self):
        # Part 1: argument validation.  Each invalid call below must raise
        # TypeError.
        with self.assertRaises(TypeError, msg="Positional args not allowed"):
            os.register_at_fork(lambda: None)
        with self.assertRaises(TypeError, msg="Args must be callable"):
            os.register_at_fork(before=2)
        with self.assertRaises(TypeError, msg="Args must be callable"):
            os.register_at_fork(after_in_child="three")
        with self.assertRaises(TypeError, msg="Args must be callable"):
            os.register_at_fork(after_in_parent=b"Five")
        with self.assertRaises(TypeError, msg="Args must not be None"):
            os.register_at_fork(before=None)
        with self.assertRaises(TypeError, msg="Args must not be None"):
            os.register_at_fork(after_in_child=None)
        with self.assertRaises(TypeError, msg="Args must not be None"):
            os.register_at_fork(after_in_parent=None)
        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
            # Ensure a combination of valid and invalid is an error.
            os.register_at_fork(before=None, after_in_parent=lambda: 3)
        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
            # Ensure a combination of valid and invalid is an error.
            os.register_at_fork(before=lambda: None, after_in_child='')
        # Part 2: real registrations.
        # We test actual registrations in their own process so as not to
        # pollute this one.  There is no way to unregister for cleanup.
        # Each callback in the script writes one letter to a pipe; after the
        # fork the parent reads the pipe and asserts on the letters' order.
        code = """if 1:
            import os

            r, w = os.pipe()
            fin_r, fin_w = os.pipe()

            os.register_at_fork(before=lambda: os.write(w, b'A'))
            os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
            os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
            os.register_at_fork(before=lambda: os.write(w, b'B'),
                                after_in_parent=lambda: os.write(w, b'D'),
                                after_in_child=lambda: os.write(w, b'F'))

            pid = os.fork()
            if pid == 0:
                # At this point, after-forkers have already been executed
                os.close(w)
                # Wait for parent to tell us to exit
                os.read(fin_r, 1)
                os._exit(0)
            else:
                try:
                    os.close(w)
                    with open(r, "rb") as f:
                        data = f.read()
                        assert len(data) == 6, data
                        # Check before-fork callbacks
                        assert data[:2] == b'BA', data
                        # Check after-fork callbacks
                        assert sorted(data[2:]) == list(b'CDEF'), data
                        assert data.index(b'C') < data.index(b'D'), data
                        assert data.index(b'E') < data.index(b'F'), data
                finally:
                    os.write(fin_w, b'!')
            """
        # Run the script with "-c" through the assert_python_ok() helper.
        assert_python_ok('-c', code)
271
272    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
273    def test_lockf(self):
274        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
275        try:
276            os.write(fd, b'test')
277            os.lseek(fd, 0, os.SEEK_SET)
278            posix.lockf(fd, posix.F_LOCK, 4)
279            # section is locked
280            posix.lockf(fd, posix.F_ULOCK, 4)
281        finally:
282            os.close(fd)
283
284    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
285    def test_pread(self):
286        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
287        try:
288            os.write(fd, b'test')
289            os.lseek(fd, 0, os.SEEK_SET)
290            self.assertEqual(b'es', posix.pread(fd, 2, 1))
291            # the first pread() shouldn't disturb the file offset
292            self.assertEqual(b'te', posix.read(fd, 2))
293        finally:
294            os.close(fd)
295
296    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
297    def test_preadv(self):
298        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
299        try:
300            os.write(fd, b'test1tt2t3t5t6t6t8')
301            buf = [bytearray(i) for i in [5, 3, 2]]
302            self.assertEqual(posix.preadv(fd, buf, 3), 10)
303            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
304        finally:
305            os.close(fd)
306
307    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
308    @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
309    def test_preadv_flags(self):
310        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
311        try:
312            os.write(fd, b'test1tt2t3t5t6t6t8')
313            buf = [bytearray(i) for i in [5, 3, 2]]
314            self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10)
315            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
316        except NotImplementedError:
317            self.skipTest("preadv2 not available")
318        except OSError as inst:
319            # Is possible that the macro RWF_HIPRI was defined at compilation time
320            # but the option is not supported by the kernel or the runtime libc shared
321            # library.
322            if inst.errno in {errno.EINVAL, errno.ENOTSUP}:
323                raise unittest.SkipTest("RWF_HIPRI is not supported by the current system")
324            else:
325                raise
326        finally:
327            os.close(fd)
328
329    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
330    @requires_32b
331    def test_preadv_overflow_32bits(self):
332        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
333        try:
334            buf = [bytearray(2**16)] * 2**15
335            with self.assertRaises(OSError) as cm:
336                os.preadv(fd, buf, 0)
337            self.assertEqual(cm.exception.errno, errno.EINVAL)
338            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
339        finally:
340            os.close(fd)
341
342    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
343    def test_pwrite(self):
344        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
345        try:
346            os.write(fd, b'test')
347            os.lseek(fd, 0, os.SEEK_SET)
348            posix.pwrite(fd, b'xx', 1)
349            self.assertEqual(b'txxt', posix.read(fd, 4))
350        finally:
351            os.close(fd)
352
353    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
354    def test_pwritev(self):
355        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
356        try:
357            os.write(fd, b"xx")
358            os.lseek(fd, 0, os.SEEK_SET)
359            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2)
360            self.assertEqual(n, 10)
361
362            os.lseek(fd, 0, os.SEEK_SET)
363            self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100))
364        finally:
365            os.close(fd)
366
367    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
368    @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC")
369    def test_pwritev_flags(self):
370        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
371        try:
372            os.write(fd,b"xx")
373            os.lseek(fd, 0, os.SEEK_SET)
374            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC)
375            self.assertEqual(n, 10)
376
377            os.lseek(fd, 0, os.SEEK_SET)
378            self.assertEqual(b'xxtest1tt2', posix.read(fd, 100))
379        finally:
380            os.close(fd)
381
382    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
383    @requires_32b
384    def test_pwritev_overflow_32bits(self):
385        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
386        try:
387            with self.assertRaises(OSError) as cm:
388                os.pwritev(fd, [b"x" * 2**16] * 2**15, 0)
389            self.assertEqual(cm.exception.errno, errno.EINVAL)
390        finally:
391            os.close(fd)
392
393    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
394        "test needs posix.posix_fallocate()")
395    def test_posix_fallocate(self):
396        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
397        try:
398            posix.posix_fallocate(fd, 0, 10)
399        except OSError as inst:
400            # issue10812, ZFS doesn't appear to support posix_fallocate,
401            # so skip Solaris-based since they are likely to have ZFS.
402            # issue33655: Also ignore EINVAL on *BSD since ZFS is also
403            # often used there.
404            if inst.errno == errno.EINVAL and sys.platform.startswith(
405                ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')):
406                raise unittest.SkipTest("test may fail on ZFS filesystems")
407            else:
408                raise
409        finally:
410            os.close(fd)
411
412    # issue31106 - posix_fallocate() does not set error in errno.
413    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
414        "test needs posix.posix_fallocate()")
415    def test_posix_fallocate_errno(self):
416        try:
417            posix.posix_fallocate(-42, 0, 10)
418        except OSError as inst:
419            if inst.errno != errno.EBADF:
420                raise
421
422    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
423        "test needs posix.posix_fadvise()")
424    def test_posix_fadvise(self):
425        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
426        try:
427            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
428        finally:
429            os.close(fd)
430
431    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
432        "test needs posix.posix_fadvise()")
433    def test_posix_fadvise_errno(self):
434        try:
435            posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED)
436        except OSError as inst:
437            if inst.errno != errno.EBADF:
438                raise
439
440    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
441    def test_utime_with_fd(self):
442        now = time.time()
443        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
444        try:
445            posix.utime(fd)
446            posix.utime(fd, None)
447            self.assertRaises(TypeError, posix.utime, fd, (None, None))
448            self.assertRaises(TypeError, posix.utime, fd, (now, None))
449            self.assertRaises(TypeError, posix.utime, fd, (None, now))
450            posix.utime(fd, (int(now), int(now)))
451            posix.utime(fd, (now, now))
452            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
453            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
454            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
455            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
456            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
457
458        finally:
459            os.close(fd)
460
461    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
462    def test_utime_nofollow_symlinks(self):
463        now = time.time()
464        posix.utime(os_helper.TESTFN, None, follow_symlinks=False)
465        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
466                          (None, None), follow_symlinks=False)
467        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
468                          (now, None), follow_symlinks=False)
469        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
470                          (None, now), follow_symlinks=False)
471        posix.utime(os_helper.TESTFN, (int(now), int(now)),
472                    follow_symlinks=False)
473        posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False)
474        posix.utime(os_helper.TESTFN, follow_symlinks=False)
475
476    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
477    def test_writev(self):
478        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
479        try:
480            n = os.writev(fd, (b'test1', b'tt2', b't3'))
481            self.assertEqual(n, 10)
482
483            os.lseek(fd, 0, os.SEEK_SET)
484            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
485
486            # Issue #20113: empty list of buffers should not crash
487            try:
488                size = posix.writev(fd, [])
489            except OSError:
490                # writev(fd, []) raises OSError(22, "Invalid argument")
491                # on OpenIndiana
492                pass
493            else:
494                self.assertEqual(size, 0)
495        finally:
496            os.close(fd)
497
498    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
499    @requires_32b
500    def test_writev_overflow_32bits(self):
501        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
502        try:
503            with self.assertRaises(OSError) as cm:
504                os.writev(fd, [b"x" * 2**16] * 2**15)
505            self.assertEqual(cm.exception.errno, errno.EINVAL)
506        finally:
507            os.close(fd)
508
509    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
510    def test_readv(self):
511        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
512        try:
513            os.write(fd, b'test1tt2t3')
514            os.lseek(fd, 0, os.SEEK_SET)
515            buf = [bytearray(i) for i in [5, 3, 2]]
516            self.assertEqual(posix.readv(fd, buf), 10)
517            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
518
519            # Issue #20113: empty list of buffers should not crash
520            try:
521                size = posix.readv(fd, [])
522            except OSError:
523                # readv(fd, []) raises OSError(22, "Invalid argument")
524                # on OpenIndiana
525                pass
526            else:
527                self.assertEqual(size, 0)
528        finally:
529            os.close(fd)
530
531    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
532    @requires_32b
533    def test_readv_overflow_32bits(self):
534        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
535        try:
536            buf = [bytearray(2**16)] * 2**15
537            with self.assertRaises(OSError) as cm:
538                os.readv(fd, buf)
539            self.assertEqual(cm.exception.errno, errno.EINVAL)
540            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
541        finally:
542            os.close(fd)
543
544    @unittest.skipUnless(hasattr(posix, 'dup'),
545                         'test needs posix.dup()')
546    def test_dup(self):
547        fp = open(os_helper.TESTFN)
548        try:
549            fd = posix.dup(fp.fileno())
550            self.assertIsInstance(fd, int)
551            os.close(fd)
552        finally:
553            fp.close()
554
555    @unittest.skipUnless(hasattr(posix, 'confstr'),
556                         'test needs posix.confstr()')
557    def test_confstr(self):
558        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
559        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
560
561    @unittest.skipUnless(hasattr(posix, 'dup2'),
562                         'test needs posix.dup2()')
563    def test_dup2(self):
564        fp1 = open(os_helper.TESTFN)
565        fp2 = open(os_helper.TESTFN)
566        try:
567            posix.dup2(fp1.fileno(), fp2.fileno())
568        finally:
569            fp1.close()
570            fp2.close()
571
572    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
573    @support.requires_linux_version(2, 6, 23)
574    def test_oscloexec(self):
575        fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
576        self.addCleanup(os.close, fd)
577        self.assertFalse(os.get_inheritable(fd))
578
579    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
580                         'test needs posix.O_EXLOCK')
581    def test_osexlock(self):
582        fd = os.open(os_helper.TESTFN,
583                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
584        self.assertRaises(OSError, os.open, os_helper.TESTFN,
585                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
586        os.close(fd)
587
588        if hasattr(posix, "O_SHLOCK"):
589            fd = os.open(os_helper.TESTFN,
590                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
591            self.assertRaises(OSError, os.open, os_helper.TESTFN,
592                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
593            os.close(fd)
594
595    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
596                         'test needs posix.O_SHLOCK')
597    def test_osshlock(self):
598        fd1 = os.open(os_helper.TESTFN,
599                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
600        fd2 = os.open(os_helper.TESTFN,
601                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
602        os.close(fd2)
603        os.close(fd1)
604
605        if hasattr(posix, "O_EXLOCK"):
606            fd = os.open(os_helper.TESTFN,
607                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
608            self.assertRaises(OSError, os.open, os_helper.TESTFN,
609                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
610            os.close(fd)
611
612    @unittest.skipUnless(hasattr(posix, 'fstat'),
613                         'test needs posix.fstat()')
614    def test_fstat(self):
615        fp = open(os_helper.TESTFN)
616        try:
617            self.assertTrue(posix.fstat(fp.fileno()))
618            self.assertTrue(posix.stat(fp.fileno()))
619
620            self.assertRaisesRegex(TypeError,
621                    'should be string, bytes, os.PathLike or integer, not',
622                    posix.stat, float(fp.fileno()))
623        finally:
624            fp.close()
625
626    def test_stat(self):
627        self.assertTrue(posix.stat(os_helper.TESTFN))
628        self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN)))
629
630        self.assertWarnsRegex(DeprecationWarning,
631                'should be string, bytes, os.PathLike or integer, not',
632                posix.stat, bytearray(os.fsencode(os_helper.TESTFN)))
633        self.assertRaisesRegex(TypeError,
634                'should be string, bytes, os.PathLike or integer, not',
635                posix.stat, None)
636        self.assertRaisesRegex(TypeError,
637                'should be string, bytes, os.PathLike or integer, not',
638                posix.stat, list(os_helper.TESTFN))
639        self.assertRaisesRegex(TypeError,
640                'should be string, bytes, os.PathLike or integer, not',
641                posix.stat, list(os.fsencode(os_helper.TESTFN)))
642
643    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
644    def test_mkfifo(self):
645        if sys.platform == "vxworks":
646            fifo_path = os.path.join("/fifos/", os_helper.TESTFN)
647        else:
648            fifo_path = os_helper.TESTFN
649        os_helper.unlink(fifo_path)
650        self.addCleanup(os_helper.unlink, fifo_path)
651        try:
652            posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR)
653        except PermissionError as e:
654            self.skipTest('posix.mkfifo(): %s' % e)
655        self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode))
656
657    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
658                         "don't have mknod()/S_IFIFO")
659    def test_mknod(self):
660        # Test using mknod() to create a FIFO (the only use specified
661        # by POSIX).
662        os_helper.unlink(os_helper.TESTFN)
663        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
664        try:
665            posix.mknod(os_helper.TESTFN, mode, 0)
666        except OSError as e:
667            # Some old systems don't allow unprivileged users to use
668            # mknod(), or only support creating device nodes.
669            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
670        else:
671            self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
672
673        # Keyword arguments are also supported
674        os_helper.unlink(os_helper.TESTFN)
675        try:
676            posix.mknod(path=os_helper.TESTFN, mode=mode, device=0,
677                dir_fd=None)
678        except OSError as e:
679            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
680
681    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
682    def test_makedev(self):
683        st = posix.stat(os_helper.TESTFN)
684        dev = st.st_dev
685        self.assertIsInstance(dev, int)
686        self.assertGreaterEqual(dev, 0)
687
688        major = posix.major(dev)
689        self.assertIsInstance(major, int)
690        self.assertGreaterEqual(major, 0)
691        self.assertEqual(posix.major(dev), major)
692        self.assertRaises(TypeError, posix.major, float(dev))
693        self.assertRaises(TypeError, posix.major)
694        self.assertRaises((ValueError, OverflowError), posix.major, -1)
695
696        minor = posix.minor(dev)
697        self.assertIsInstance(minor, int)
698        self.assertGreaterEqual(minor, 0)
699        self.assertEqual(posix.minor(dev), minor)
700        self.assertRaises(TypeError, posix.minor, float(dev))
701        self.assertRaises(TypeError, posix.minor)
702        self.assertRaises((ValueError, OverflowError), posix.minor, -1)
703
704        self.assertEqual(posix.makedev(major, minor), dev)
705        self.assertRaises(TypeError, posix.makedev, float(major), minor)
706        self.assertRaises(TypeError, posix.makedev, major, float(minor))
707        self.assertRaises(TypeError, posix.makedev, major)
708        self.assertRaises(TypeError, posix.makedev)
709
710    def _test_all_chown_common(self, chown_func, first_param, stat_func):
711        """Common code for chown, fchown and lchown tests."""
712        def check_stat(uid, gid):
713            if stat_func is not None:
714                stat = stat_func(first_param)
715                self.assertEqual(stat.st_uid, uid)
716                self.assertEqual(stat.st_gid, gid)
717        uid = os.getuid()
718        gid = os.getgid()
719        # test a successful chown call
720        chown_func(first_param, uid, gid)
721        check_stat(uid, gid)
722        chown_func(first_param, -1, gid)
723        check_stat(uid, gid)
724        chown_func(first_param, uid, -1)
725        check_stat(uid, gid)
726
727        if sys.platform == "vxworks":
728            # On VxWorks, root user id is 1 and 0 means no login user:
729            # both are super users.
730            is_root = (uid in (0, 1))
731        else:
732            is_root = (uid == 0)
733        if is_root:
734            # Try an amusingly large uid/gid to make sure we handle
735            # large unsigned values.  (chown lets you use any
736            # uid/gid you like, even if they aren't defined.)
737            #
738            # On VxWorks uid_t is defined as unsigned short. A big
739            # value greater than 65535 will result in underflow error.
740            #
741            # This problem keeps coming up:
742            #   http://bugs.python.org/issue1747858
743            #   http://bugs.python.org/issue4591
744            #   http://bugs.python.org/issue15301
745            # Hopefully the fix in 4591 fixes it for good!
746            #
747            # This part of the test only runs when run as root.
748            # Only scary people run their tests as root.
749
750            big_value = (2**31 if sys.platform != "vxworks" else 2**15)
751            chown_func(first_param, big_value, big_value)
752            check_stat(big_value, big_value)
753            chown_func(first_param, -1, -1)
754            check_stat(big_value, big_value)
755            chown_func(first_param, uid, gid)
756            check_stat(uid, gid)
757        elif platform.system() in ('HP-UX', 'SunOS'):
758            # HP-UX and Solaris can allow a non-root user to chown() to root
759            # (issue #5113)
760            raise unittest.SkipTest("Skipping because of non-standard chown() "
761                                    "behavior")
762        else:
763            # non-root cannot chown to root, raises OSError
764            self.assertRaises(OSError, chown_func, first_param, 0, 0)
765            check_stat(uid, gid)
766            self.assertRaises(OSError, chown_func, first_param, 0, -1)
767            check_stat(uid, gid)
768            if 0 not in os.getgroups():
769                self.assertRaises(OSError, chown_func, first_param, -1, 0)
770                check_stat(uid, gid)
771        # test illegal types
772        for t in str, float:
773            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
774            check_stat(uid, gid)
775            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
776            check_stat(uid, gid)
777
778    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
779    def test_chown(self):
780        # raise an OSError if the file does not exist
781        os.unlink(os_helper.TESTFN)
782        self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1)
783
784        # re-create the file
785        os_helper.create_empty_file(os_helper.TESTFN)
786        self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat)
787
788    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
789    def test_fchown(self):
790        os.unlink(os_helper.TESTFN)
791
792        # re-create the file
793        test_file = open(os_helper.TESTFN, 'w')
794        try:
795            fd = test_file.fileno()
796            self._test_all_chown_common(posix.fchown, fd,
797                                        getattr(posix, 'fstat', None))
798        finally:
799            test_file.close()
800
801    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
802    def test_lchown(self):
803        os.unlink(os_helper.TESTFN)
804        # create a symlink
805        os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN)
806        self._test_all_chown_common(posix.lchown, os_helper.TESTFN,
807                                    getattr(posix, 'lstat', None))
808
809    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
810    def test_chdir(self):
811        posix.chdir(os.curdir)
812        self.assertRaises(OSError, posix.chdir, os_helper.TESTFN)
813
814    def test_listdir(self):
815        self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir))
816
817    def test_listdir_default(self):
818        # When listdir is called without argument,
819        # it's the same as listdir(os.curdir).
820        self.assertIn(os_helper.TESTFN, posix.listdir())
821
822    def test_listdir_bytes(self):
823        # When listdir is called with a bytes object,
824        # the returned strings are of type bytes.
825        self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.'))
826
827    def test_listdir_bytes_like(self):
828        for cls in bytearray, memoryview:
829            with self.assertWarns(DeprecationWarning):
830                names = posix.listdir(cls(b'.'))
831            self.assertIn(os.fsencode(os_helper.TESTFN), names)
832            for name in names:
833                self.assertIs(type(name), bytes)
834
835    @unittest.skipUnless(posix.listdir in os.supports_fd,
836                         "test needs fd support for posix.listdir()")
837    def test_listdir_fd(self):
838        f = posix.open(posix.getcwd(), posix.O_RDONLY)
839        self.addCleanup(posix.close, f)
840        self.assertEqual(
841            sorted(posix.listdir('.')),
842            sorted(posix.listdir(f))
843            )
844        # Check that the fd offset was reset (issue #13739)
845        self.assertEqual(
846            sorted(posix.listdir('.')),
847            sorted(posix.listdir(f))
848            )
849
850    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
851    def test_access(self):
852        self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK))
853
854    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
855    def test_umask(self):
856        old_mask = posix.umask(0)
857        self.assertIsInstance(old_mask, int)
858        posix.umask(old_mask)
859
860    @unittest.skipUnless(hasattr(posix, 'strerror'),
861                         'test needs posix.strerror()')
862    def test_strerror(self):
863        self.assertTrue(posix.strerror(0))
864
865    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
866    def test_pipe(self):
867        reader, writer = posix.pipe()
868        os.close(reader)
869        os.close(writer)
870
871    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
872    @support.requires_linux_version(2, 6, 27)
873    def test_pipe2(self):
874        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
875        self.assertRaises(TypeError, os.pipe2, 0, 0)
876
877        # try calling with flags = 0, like os.pipe()
878        r, w = os.pipe2(0)
879        os.close(r)
880        os.close(w)
881
882        # test flags
883        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
884        self.addCleanup(os.close, r)
885        self.addCleanup(os.close, w)
886        self.assertFalse(os.get_inheritable(r))
887        self.assertFalse(os.get_inheritable(w))
888        self.assertFalse(os.get_blocking(r))
889        self.assertFalse(os.get_blocking(w))
890        # try reading from an empty pipe: this should fail, not block
891        self.assertRaises(OSError, os.read, r, 1)
892        # try a write big enough to fill-up the pipe: this should either
893        # fail or perform a partial write, not block
894        try:
895            os.write(w, b'x' * support.PIPE_MAX_SIZE)
896        except OSError:
897            pass
898
899    @support.cpython_only
900    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
901    @support.requires_linux_version(2, 6, 27)
902    def test_pipe2_c_limits(self):
903        # Issue 15989
904        import _testcapi
905        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
906        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
907
908    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
909    def test_utime(self):
910        now = time.time()
911        posix.utime(os_helper.TESTFN, None)
912        self.assertRaises(TypeError, posix.utime,
913                          os_helper.TESTFN, (None, None))
914        self.assertRaises(TypeError, posix.utime,
915                          os_helper.TESTFN, (now, None))
916        self.assertRaises(TypeError, posix.utime,
917                          os_helper.TESTFN, (None, now))
918        posix.utime(os_helper.TESTFN, (int(now), int(now)))
919        posix.utime(os_helper.TESTFN, (now, now))
920
921    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
922        st = os.stat(target_file)
923        self.assertTrue(hasattr(st, 'st_flags'))
924
925        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
926        flags = st.st_flags | stat.UF_IMMUTABLE
927        try:
928            chflags_func(target_file, flags, **kwargs)
929        except OSError as err:
930            if err.errno != errno.EOPNOTSUPP:
931                raise
932            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
933            self.skipTest(msg)
934
935        try:
936            new_st = os.stat(target_file)
937            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
938            try:
939                fd = open(target_file, 'w+')
940            except OSError as e:
941                self.assertEqual(e.errno, errno.EPERM)
942        finally:
943            posix.chflags(target_file, st.st_flags)
944
945    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
946    def test_chflags(self):
947        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN)
948
949    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
950    def test_lchflags_regular_file(self):
951        self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN)
952        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN,
953                                        follow_symlinks=False)
954
955    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
956    def test_lchflags_symlink(self):
957        testfn_st = os.stat(os_helper.TESTFN)
958
959        self.assertTrue(hasattr(testfn_st, 'st_flags'))
960
961        os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK)
962        self.teardown_files.append(_DUMMY_SYMLINK)
963        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
964
965        def chflags_nofollow(path, flags):
966            return posix.chflags(path, flags, follow_symlinks=False)
967
968        for fn in (posix.lchflags, chflags_nofollow):
969            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
970            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
971            try:
972                fn(_DUMMY_SYMLINK, flags)
973            except OSError as err:
974                if err.errno != errno.EOPNOTSUPP:
975                    raise
976                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
977                self.skipTest(msg)
978            try:
979                new_testfn_st = os.stat(os_helper.TESTFN)
980                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
981
982                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
983                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
984                                 new_dummy_symlink_st.st_flags)
985            finally:
986                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
987
988    def test_environ(self):
989        if os.name == "nt":
990            item_type = str
991        else:
992            item_type = bytes
993        for k, v in posix.environ.items():
994            self.assertEqual(type(k), item_type)
995            self.assertEqual(type(v), item_type)
996
997    def test_putenv(self):
998        with self.assertRaises(ValueError):
999            os.putenv('FRUIT\0VEGETABLE', 'cabbage')
1000        with self.assertRaises(ValueError):
1001            os.putenv(b'FRUIT\0VEGETABLE', b'cabbage')
1002        with self.assertRaises(ValueError):
1003            os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage')
1004        with self.assertRaises(ValueError):
1005            os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage')
1006        with self.assertRaises(ValueError):
1007            os.putenv('FRUIT=ORANGE', 'lemon')
1008        with self.assertRaises(ValueError):
1009            os.putenv(b'FRUIT=ORANGE', b'lemon')
1010
1011    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
1012    def test_getcwd_long_pathnames(self):
1013        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
1014        curdir = os.getcwd()
1015        base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd'
1016
1017        try:
1018            os.mkdir(base_path)
1019            os.chdir(base_path)
1020        except:
1021            #  Just returning nothing instead of the SkipTest exception, because
1022            #  the test results in Error in that case.  Is that ok?
1023            #  raise unittest.SkipTest("cannot create directory for testing")
1024            return
1025
1026            def _create_and_do_getcwd(dirname, current_path_length = 0):
1027                try:
1028                    os.mkdir(dirname)
1029                except:
1030                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
1031
1032                os.chdir(dirname)
1033                try:
1034                    os.getcwd()
1035                    if current_path_length < 1027:
1036                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
1037                finally:
1038                    os.chdir('..')
1039                    os.rmdir(dirname)
1040
1041            _create_and_do_getcwd(dirname)
1042
1043        finally:
1044            os.chdir(curdir)
1045            os_helper.rmtree(base_path)
1046
1047    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
1048    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
1049    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
1050    def test_getgrouplist(self):
1051        user = pwd.getpwuid(os.getuid())[0]
1052        group = pwd.getpwuid(os.getuid())[3]
1053        self.assertIn(group, posix.getgrouplist(user, group))
1054
1055
1056    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
1057    @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()")
1058    def test_getgroups(self):
1059        with os.popen('id -G 2>/dev/null') as idg:
1060            groups = idg.read().strip()
1061            ret = idg.close()
1062
1063        try:
1064            idg_groups = set(int(g) for g in groups.split())
1065        except ValueError:
1066            idg_groups = set()
1067        if ret is not None or not idg_groups:
1068            raise unittest.SkipTest("need working 'id -G'")
1069
1070        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
1071        if sys.platform == 'darwin':
1072            import sysconfig
1073            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3'
1074            if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):
1075                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
1076
1077        # 'id -G' and 'os.getgroups()' should return the same
1078        # groups, ignoring order, duplicates, and the effective gid.
1079        # #10822/#26944 - It is implementation defined whether
1080        # posix.getgroups() includes the effective gid.
1081        symdiff = idg_groups.symmetric_difference(posix.getgroups())
1082        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
1083
1084    # tests for the posix *at functions follow
1085
1086    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
1087    def test_access_dir_fd(self):
1088        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1089        try:
1090            self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK, dir_fd=f))
1091        finally:
1092            posix.close(f)
1093
1094    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
1095    def test_chmod_dir_fd(self):
1096        os.chmod(os_helper.TESTFN, stat.S_IRUSR)
1097
1098        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1099        try:
1100            posix.chmod(os_helper.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1101
1102            s = posix.stat(os_helper.TESTFN)
1103            self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
1104        finally:
1105            posix.close(f)
1106
1107    @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd),
1108                         "test needs dir_fd support in os.chown()")
1109    def test_chown_dir_fd(self):
1110        os_helper.unlink(os_helper.TESTFN)
1111        os_helper.create_empty_file(os_helper.TESTFN)
1112
1113        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1114        try:
1115            posix.chown(os_helper.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
1116        finally:
1117            posix.close(f)
1118
1119    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
1120    def test_stat_dir_fd(self):
1121        os_helper.unlink(os_helper.TESTFN)
1122        with open(os_helper.TESTFN, 'w') as outfile:
1123            outfile.write("testline\n")
1124
1125        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1126        try:
1127            s1 = posix.stat(os_helper.TESTFN)
1128            s2 = posix.stat(os_helper.TESTFN, dir_fd=f)
1129            self.assertEqual(s1, s2)
1130            s2 = posix.stat(os_helper.TESTFN, dir_fd=None)
1131            self.assertEqual(s1, s2)
1132            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1133                    posix.stat, os_helper.TESTFN, dir_fd=posix.getcwd())
1134            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1135                    posix.stat, os_helper.TESTFN, dir_fd=float(f))
1136            self.assertRaises(OverflowError,
1137                    posix.stat, os_helper.TESTFN, dir_fd=10**20)
1138        finally:
1139            posix.close(f)
1140
1141    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
1142    def test_utime_dir_fd(self):
1143        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1144        try:
1145            now = time.time()
1146            posix.utime(os_helper.TESTFN, None, dir_fd=f)
1147            posix.utime(os_helper.TESTFN, dir_fd=f)
1148            self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
1149                              now, dir_fd=f)
1150            self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
1151                              (None, None), dir_fd=f)
1152            self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
1153                              (now, None), dir_fd=f)
1154            self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
1155                              (None, now), dir_fd=f)
1156            self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
1157                              (now, "x"), dir_fd=f)
1158            posix.utime(os_helper.TESTFN, (int(now), int(now)), dir_fd=f)
1159            posix.utime(os_helper.TESTFN, (now, now), dir_fd=f)
1160            posix.utime(os_helper.TESTFN,
1161                    (int(now), int((now - int(now)) * 1e9)), dir_fd=f)
1162            posix.utime(os_helper.TESTFN, dir_fd=f,
1163                            times=(int(now), int((now - int(now)) * 1e9)))
1164
1165            # try dir_fd and follow_symlinks together
1166            if os.utime in os.supports_follow_symlinks:
1167                try:
1168                    posix.utime(os_helper.TESTFN, follow_symlinks=False,
1169                                dir_fd=f)
1170                except ValueError:
1171                    # whoops!  using both together not supported on this platform.
1172                    pass
1173
1174        finally:
1175            posix.close(f)
1176
1177    @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
1178    def test_link_dir_fd(self):
1179        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1180        try:
1181            posix.link(os_helper.TESTFN, os_helper.TESTFN + 'link',
1182                       src_dir_fd=f, dst_dir_fd=f)
1183        except PermissionError as e:
1184            self.skipTest('posix.link(): %s' % e)
1185        else:
1186            # should have same inodes
1187            self.assertEqual(posix.stat(os_helper.TESTFN)[1],
1188                posix.stat(os_helper.TESTFN + 'link')[1])
1189        finally:
1190            posix.close(f)
1191            os_helper.unlink(os_helper.TESTFN + 'link')
1192
1193    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
1194    def test_mkdir_dir_fd(self):
1195        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1196        try:
1197            posix.mkdir(os_helper.TESTFN + 'dir', dir_fd=f)
1198            posix.stat(os_helper.TESTFN + 'dir') # should not raise exception
1199        finally:
1200            posix.close(f)
1201            os_helper.rmtree(os_helper.TESTFN + 'dir')
1202
1203    @unittest.skipUnless(hasattr(os, 'mknod')
1204                         and (os.mknod in os.supports_dir_fd)
1205                         and hasattr(stat, 'S_IFIFO'),
1206                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
1207    def test_mknod_dir_fd(self):
1208        # Test using mknodat() to create a FIFO (the only use specified
1209        # by POSIX).
1210        os_helper.unlink(os_helper.TESTFN)
1211        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
1212        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1213        try:
1214            posix.mknod(os_helper.TESTFN, mode, 0, dir_fd=f)
1215        except OSError as e:
1216            # Some old systems don't allow unprivileged users to use
1217            # mknod(), or only support creating device nodes.
1218            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
1219        else:
1220            self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
1221        finally:
1222            posix.close(f)
1223
1224    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
1225    def test_open_dir_fd(self):
1226        os_helper.unlink(os_helper.TESTFN)
1227        with open(os_helper.TESTFN, 'w') as outfile:
1228            outfile.write("testline\n")
1229        a = posix.open(posix.getcwd(), posix.O_RDONLY)
1230        b = posix.open(os_helper.TESTFN, posix.O_RDONLY, dir_fd=a)
1231        try:
1232            res = posix.read(b, 9).decode(encoding="utf-8")
1233            self.assertEqual("testline\n", res)
1234        finally:
1235            posix.close(a)
1236            posix.close(b)
1237
1238    @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd),
1239                         "test needs dir_fd support in os.readlink()")
1240    def test_readlink_dir_fd(self):
1241        os.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link')
1242        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1243        try:
1244            self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'),
1245                posix.readlink(os_helper.TESTFN + 'link', dir_fd=f))
1246        finally:
1247            os_helper.unlink(os_helper.TESTFN + 'link')
1248            posix.close(f)
1249
1250    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
1251    def test_rename_dir_fd(self):
1252        os_helper.unlink(os_helper.TESTFN)
1253        os_helper.create_empty_file(os_helper.TESTFN + 'ren')
1254        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1255        try:
1256            posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN, src_dir_fd=f, dst_dir_fd=f)
1257        except:
1258            posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN)
1259            raise
1260        else:
1261            posix.stat(os_helper.TESTFN) # should not raise exception
1262        finally:
1263            posix.close(f)
1264
1265    @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal')
1266    @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result")
1267    def test_cld_xxxx_constants(self):
1268        os.CLD_EXITED
1269        os.CLD_KILLED
1270        os.CLD_DUMPED
1271        os.CLD_TRAPPED
1272        os.CLD_STOPPED
1273        os.CLD_CONTINUED
1274
1275    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
1276    def test_symlink_dir_fd(self):
1277        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1278        try:
1279            posix.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link',
1280                          dir_fd=f)
1281            self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'),
1282                             os_helper.TESTFN)
1283        finally:
1284            posix.close(f)
1285            os_helper.unlink(os_helper.TESTFN + 'link')
1286
1287    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1288    def test_unlink_dir_fd(self):
1289        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1290        os_helper.create_empty_file(os_helper.TESTFN + 'del')
1291        posix.stat(os_helper.TESTFN + 'del') # should not raise exception
1292        try:
1293            posix.unlink(os_helper.TESTFN + 'del', dir_fd=f)
1294        except:
1295            os_helper.unlink(os_helper.TESTFN + 'del')
1296            raise
1297        else:
1298            self.assertRaises(OSError, posix.stat, os_helper.TESTFN + 'link')
1299        finally:
1300            posix.close(f)
1301
1302    @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1303    def test_mkfifo_dir_fd(self):
1304        os_helper.unlink(os_helper.TESTFN)
1305        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1306        try:
1307            try:
1308                posix.mkfifo(os_helper.TESTFN,
1309                             stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1310            except PermissionError as e:
1311                self.skipTest('posix.mkfifo(): %s' % e)
1312            self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
1313        finally:
1314            posix.close(f)
1315
1316    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1317                                           "don't have scheduling support")
1318    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1319                                                  "don't have sched affinity support")
1320
1321    @requires_sched_h
    def test_sched_yield(self):
        # Smoke test: only checks that the call completes without raising.
        # This has no error conditions (at least on Linux).
        posix.sched_yield()
1325
1326    @requires_sched_h
1327    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1328                         "requires sched_get_priority_max()")
1329    def test_sched_priority(self):
1330        # Round-robin usually has interesting priorities.
1331        pol = posix.SCHED_RR
1332        lo = posix.sched_get_priority_min(pol)
1333        hi = posix.sched_get_priority_max(pol)
1334        self.assertIsInstance(lo, int)
1335        self.assertIsInstance(hi, int)
1336        self.assertGreaterEqual(hi, lo)
1337        # OSX evidently just returns 15 without checking the argument.
1338        if sys.platform != "darwin":
1339            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1340            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1341
1342    @requires_sched
1343    def test_get_and_set_scheduler_and_param(self):
1344        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1345                               if name.startswith("SCHED_")]
1346        mine = posix.sched_getscheduler(0)
1347        self.assertIn(mine, possible_schedulers)
1348        try:
1349            parent = posix.sched_getscheduler(os.getppid())
1350        except OSError as e:
1351            if e.errno != errno.EPERM:
1352                raise
1353        else:
1354            self.assertIn(parent, possible_schedulers)
1355        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1356        self.assertRaises(OSError, posix.sched_getparam, -1)
1357        param = posix.sched_getparam(0)
1358        self.assertIsInstance(param.sched_priority, int)
1359
1360        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1361        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1362        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1363        if not sys.platform.startswith(('freebsd', 'netbsd')):
1364            try:
1365                posix.sched_setscheduler(0, mine, param)
1366                posix.sched_setparam(0, param)
1367            except OSError as e:
1368                if e.errno != errno.EPERM:
1369                    raise
1370            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1371
1372        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1373        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1374        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1375        param = posix.sched_param(None)
1376        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1377        large = 214748364700
1378        param = posix.sched_param(large)
1379        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1380        param = posix.sched_param(sched_priority=-large)
1381        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1382
1383    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1384    def test_sched_rr_get_interval(self):
1385        try:
1386            interval = posix.sched_rr_get_interval(0)
1387        except OSError as e:
1388            # This likely means that sched_rr_get_interval is only valid for
1389            # processes with the SCHED_RR scheduler in effect.
1390            if e.errno != errno.EINVAL:
1391                raise
1392            self.skipTest("only works on SCHED_RR processes")
1393        self.assertIsInstance(interval, float)
1394        # Reasonable constraints, I think.
1395        self.assertGreaterEqual(interval, 0.)
1396        self.assertLess(interval, 1.)
1397
1398    @requires_sched_affinity
1399    def test_sched_getaffinity(self):
1400        mask = posix.sched_getaffinity(0)
1401        self.assertIsInstance(mask, set)
1402        self.assertGreaterEqual(len(mask), 1)
1403        self.assertRaises(OSError, posix.sched_getaffinity, -1)
1404        for cpu in mask:
1405            self.assertIsInstance(cpu, int)
1406            self.assertGreaterEqual(cpu, 0)
1407            self.assertLess(cpu, 1 << 32)
1408
1409    @requires_sched_affinity
1410    def test_sched_setaffinity(self):
1411        mask = posix.sched_getaffinity(0)
1412        if len(mask) > 1:
1413            # Empty masks are forbidden
1414            mask.pop()
1415        posix.sched_setaffinity(0, mask)
1416        self.assertEqual(posix.sched_getaffinity(0), mask)
1417        self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
1418        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1419        self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X"))
1420        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1421        self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1422
1423    def test_rtld_constants(self):
1424        # check presence of major RTLD_* constants
1425        posix.RTLD_LAZY
1426        posix.RTLD_NOW
1427        posix.RTLD_GLOBAL
1428        posix.RTLD_LOCAL
1429
1430    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1431                         "test needs an OS that reports file holes")
1432    def test_fs_holes(self):
1433        # Even if the filesystem doesn't report holes,
1434        # if the OS supports it the SEEK_* constants
1435        # will be defined and will have a consistent
1436        # behaviour:
1437        # os.SEEK_DATA = current position
1438        # os.SEEK_HOLE = end of file position
1439        with open(os_helper.TESTFN, 'r+b') as fp:
1440            fp.write(b"hello")
1441            fp.flush()
1442            size = fp.tell()
1443            fno = fp.fileno()
1444            try :
1445                for i in range(size):
1446                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1447                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1448                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1449                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1450            except OSError :
1451                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1452                # but it is not true.
1453                # For instance:
1454                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1455                raise unittest.SkipTest("OSError raised!")
1456
1457    def test_path_error2(self):
1458        """
1459        Test functions that call path_error2(), providing two filenames in their exceptions.
1460        """
1461        for name in ("rename", "replace", "link"):
1462            function = getattr(os, name, None)
1463            if function is None:
1464                continue
1465
1466            for dst in ("noodly2", os_helper.TESTFN):
1467                try:
1468                    function('doesnotexistfilename', dst)
1469                except OSError as e:
1470                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1471                    break
1472            else:
1473                self.fail("No valid path_error2() test for os." + name)
1474
1475    def test_path_with_null_character(self):
1476        fn = os_helper.TESTFN
1477        fn_with_NUL = fn + '\0'
1478        self.addCleanup(os_helper.unlink, fn)
1479        os_helper.unlink(fn)
1480        fd = None
1481        try:
1482            with self.assertRaises(ValueError):
1483                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1484        finally:
1485            if fd is not None:
1486                os.close(fd)
1487        self.assertFalse(os.path.exists(fn))
1488        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1489        self.assertFalse(os.path.exists(fn))
1490        open(fn, 'wb').close()
1491        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1492
1493    def test_path_with_null_byte(self):
1494        fn = os.fsencode(os_helper.TESTFN)
1495        fn_with_NUL = fn + b'\0'
1496        self.addCleanup(os_helper.unlink, fn)
1497        os_helper.unlink(fn)
1498        fd = None
1499        try:
1500            with self.assertRaises(ValueError):
1501                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1502        finally:
1503            if fd is not None:
1504                os.close(fd)
1505        self.assertFalse(os.path.exists(fn))
1506        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1507        self.assertFalse(os.path.exists(fn))
1508        open(fn, 'wb').close()
1509        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1510
1511    @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable")
1512    def test_pidfd_open(self):
1513        with self.assertRaises(OSError) as cm:
1514            os.pidfd_open(-1)
1515        if cm.exception.errno == errno.ENOSYS:
1516            self.skipTest("system does not support pidfd_open")
1517        if isinstance(cm.exception, PermissionError):
1518            self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}")
1519        self.assertEqual(cm.exception.errno, errno.EINVAL)
1520        os.close(os.pidfd_open(os.getpid(), 0))
1521
class PosixGroupsTester(unittest.TestCase):
    """Tests for posix.initgroups() and posix.setgroups().

    setUp() skips every test unless the process runs as uid 0, and the
    group list saved in setUp() is put back in tearDown().
    """

    def setUp(self):
        if posix.getuid() != 0:
            self.skipTest("not enough privileges")
        if not hasattr(posix, 'getgroups'):
            self.skipTest("need posix.getgroups")
        if sys.platform == 'darwin':
            self.skipTest("getgroups(2) is broken on OSX")
        # Remembered so that tearDown() can restore it.
        self.saved_groups = posix.getgroups()

    def tearDown(self):
        # Prefer setgroups(), which restores the full saved list; fall back
        # to initgroups() with the first saved group otherwise.
        if hasattr(posix, 'setgroups'):
            posix.setgroups(self.saved_groups)
            return
        if hasattr(posix, 'initgroups'):
            user_name = pwd.getpwuid(posix.getuid()).pw_name
            posix.initgroups(user_name, self.saved_groups[0])

    @unittest.skipUnless(hasattr(posix, 'initgroups'),
                         "test needs posix.initgroups()")
    def test_initgroups(self):
        # Pick a gid that is not in the saved group list: one past the
        # largest saved gid (or 1 when the list is empty).
        known_groups = self.saved_groups or [0]
        new_gid = max(known_groups) + 1
        user_name = pwd.getpwuid(posix.getuid()).pw_name
        posix.initgroups(user_name, new_gid)
        self.assertIn(new_gid, posix.getgroups())

    @unittest.skipUnless(hasattr(posix, 'setgroups'),
                         "test needs posix.setgroups()")
    def test_setgroups(self):
        # Whatever list is set must be read back unchanged.
        for wanted in ([0], list(range(16))):
            posix.setgroups(wanted)
            self.assertListEqual(wanted, posix.getgroups())
1556
1557
1558class _PosixSpawnMixin:
1559    # Program which does nothing and exits with status 0 (success)
1560    NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass')
1561    spawn_func = None
1562
1563    def python_args(self, *args):
1564        # Disable site module to avoid side effects. For example,
1565        # on Fedora 28, if the HOME environment variable is not set,
1566        # site._getuserbase() calls pwd.getpwuid() which opens
1567        # /var/lib/sss/mc/passwd but then leaves the file open which makes
1568        # test_close_file() to fail.
1569        return (sys.executable, '-I', '-S', *args)
1570
1571    def test_returns_pid(self):
1572        pidfile = os_helper.TESTFN
1573        self.addCleanup(os_helper.unlink, pidfile)
1574        script = f"""if 1:
1575            import os
1576            with open({pidfile!r}, "w") as pidfile:
1577                pidfile.write(str(os.getpid()))
1578            """
1579        args = self.python_args('-c', script)
1580        pid = self.spawn_func(args[0], args, os.environ)
1581        support.wait_process(pid, exitcode=0)
1582        with open(pidfile, encoding="utf-8") as f:
1583            self.assertEqual(f.read(), str(pid))
1584
1585    def test_no_such_executable(self):
1586        no_such_executable = 'no_such_executable'
1587        try:
1588            pid = self.spawn_func(no_such_executable,
1589                                  [no_such_executable],
1590                                  os.environ)
1591        # bpo-35794: PermissionError can be raised if there are
1592        # directories in the $PATH that are not accessible.
1593        except (FileNotFoundError, PermissionError) as exc:
1594            self.assertEqual(exc.filename, no_such_executable)
1595        else:
1596            pid2, status = os.waitpid(pid, 0)
1597            self.assertEqual(pid2, pid)
1598            self.assertNotEqual(status, 0)
1599
1600    def test_specify_environment(self):
1601        envfile = os_helper.TESTFN
1602        self.addCleanup(os_helper.unlink, envfile)
1603        script = f"""if 1:
1604            import os
1605            with open({envfile!r}, "w", encoding="utf-8") as envfile:
1606                envfile.write(os.environ['foo'])
1607        """
1608        args = self.python_args('-c', script)
1609        pid = self.spawn_func(args[0], args,
1610                              {**os.environ, 'foo': 'bar'})
1611        support.wait_process(pid, exitcode=0)
1612        with open(envfile, encoding="utf-8") as f:
1613            self.assertEqual(f.read(), 'bar')
1614
1615    def test_none_file_actions(self):
1616        pid = self.spawn_func(
1617            self.NOOP_PROGRAM[0],
1618            self.NOOP_PROGRAM,
1619            os.environ,
1620            file_actions=None
1621        )
1622        support.wait_process(pid, exitcode=0)
1623
1624    def test_empty_file_actions(self):
1625        pid = self.spawn_func(
1626            self.NOOP_PROGRAM[0],
1627            self.NOOP_PROGRAM,
1628            os.environ,
1629            file_actions=[]
1630        )
1631        support.wait_process(pid, exitcode=0)
1632
1633    def test_resetids_explicit_default(self):
1634        pid = self.spawn_func(
1635            sys.executable,
1636            [sys.executable, '-c', 'pass'],
1637            os.environ,
1638            resetids=False
1639        )
1640        support.wait_process(pid, exitcode=0)
1641
1642    def test_resetids(self):
1643        pid = self.spawn_func(
1644            sys.executable,
1645            [sys.executable, '-c', 'pass'],
1646            os.environ,
1647            resetids=True
1648        )
1649        support.wait_process(pid, exitcode=0)
1650
1651    def test_resetids_wrong_type(self):
1652        with self.assertRaises(TypeError):
1653            self.spawn_func(sys.executable,
1654                            [sys.executable, "-c", "pass"],
1655                            os.environ, resetids=None)
1656
1657    def test_setpgroup(self):
1658        pid = self.spawn_func(
1659            sys.executable,
1660            [sys.executable, '-c', 'pass'],
1661            os.environ,
1662            setpgroup=os.getpgrp()
1663        )
1664        support.wait_process(pid, exitcode=0)
1665
1666    def test_setpgroup_wrong_type(self):
1667        with self.assertRaises(TypeError):
1668            self.spawn_func(sys.executable,
1669                            [sys.executable, "-c", "pass"],
1670                            os.environ, setpgroup="023")
1671
1672    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1673                           'need signal.pthread_sigmask()')
1674    def test_setsigmask(self):
1675        code = textwrap.dedent("""\
1676            import signal
1677            signal.raise_signal(signal.SIGUSR1)""")
1678
1679        pid = self.spawn_func(
1680            sys.executable,
1681            [sys.executable, '-c', code],
1682            os.environ,
1683            setsigmask=[signal.SIGUSR1]
1684        )
1685        support.wait_process(pid, exitcode=0)
1686
1687    def test_setsigmask_wrong_type(self):
1688        with self.assertRaises(TypeError):
1689            self.spawn_func(sys.executable,
1690                            [sys.executable, "-c", "pass"],
1691                            os.environ, setsigmask=34)
1692        with self.assertRaises(TypeError):
1693            self.spawn_func(sys.executable,
1694                            [sys.executable, "-c", "pass"],
1695                            os.environ, setsigmask=["j"])
1696        with self.assertRaises(ValueError):
1697            self.spawn_func(sys.executable,
1698                            [sys.executable, "-c", "pass"],
1699                            os.environ, setsigmask=[signal.NSIG,
1700                                                    signal.NSIG+1])
1701
1702    def test_setsid(self):
1703        rfd, wfd = os.pipe()
1704        self.addCleanup(os.close, rfd)
1705        try:
1706            os.set_inheritable(wfd, True)
1707
1708            code = textwrap.dedent(f"""
1709                import os
1710                fd = {wfd}
1711                sid = os.getsid(0)
1712                os.write(fd, str(sid).encode())
1713            """)
1714
1715            try:
1716                pid = self.spawn_func(sys.executable,
1717                                      [sys.executable, "-c", code],
1718                                      os.environ, setsid=True)
1719            except NotImplementedError as exc:
1720                self.skipTest(f"setsid is not supported: {exc!r}")
1721            except PermissionError as exc:
1722                self.skipTest(f"setsid failed with: {exc!r}")
1723        finally:
1724            os.close(wfd)
1725
1726        support.wait_process(pid, exitcode=0)
1727
1728        output = os.read(rfd, 100)
1729        child_sid = int(output)
1730        parent_sid = os.getsid(os.getpid())
1731        self.assertNotEqual(parent_sid, child_sid)
1732
1733    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1734                         'need signal.pthread_sigmask()')
1735    def test_setsigdef(self):
1736        original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
1737        code = textwrap.dedent("""\
1738            import signal
1739            signal.raise_signal(signal.SIGUSR1)""")
1740        try:
1741            pid = self.spawn_func(
1742                sys.executable,
1743                [sys.executable, '-c', code],
1744                os.environ,
1745                setsigdef=[signal.SIGUSR1]
1746            )
1747        finally:
1748            signal.signal(signal.SIGUSR1, original_handler)
1749
1750        support.wait_process(pid, exitcode=-signal.SIGUSR1)
1751
1752    def test_setsigdef_wrong_type(self):
1753        with self.assertRaises(TypeError):
1754            self.spawn_func(sys.executable,
1755                            [sys.executable, "-c", "pass"],
1756                            os.environ, setsigdef=34)
1757        with self.assertRaises(TypeError):
1758            self.spawn_func(sys.executable,
1759                            [sys.executable, "-c", "pass"],
1760                            os.environ, setsigdef=["j"])
1761        with self.assertRaises(ValueError):
1762            self.spawn_func(sys.executable,
1763                            [sys.executable, "-c", "pass"],
1764                            os.environ, setsigdef=[signal.NSIG, signal.NSIG+1])
1765
1766    @requires_sched
1767    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1768                     "bpo-34685: test can fail on BSD")
1769    def test_setscheduler_only_param(self):
1770        policy = os.sched_getscheduler(0)
1771        priority = os.sched_get_priority_min(policy)
1772        code = textwrap.dedent(f"""\
1773            import os, sys
1774            if os.sched_getscheduler(0) != {policy}:
1775                sys.exit(101)
1776            if os.sched_getparam(0).sched_priority != {priority}:
1777                sys.exit(102)""")
1778        pid = self.spawn_func(
1779            sys.executable,
1780            [sys.executable, '-c', code],
1781            os.environ,
1782            scheduler=(None, os.sched_param(priority))
1783        )
1784        support.wait_process(pid, exitcode=0)
1785
1786    @requires_sched
1787    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1788                     "bpo-34685: test can fail on BSD")
1789    def test_setscheduler_with_policy(self):
1790        policy = os.sched_getscheduler(0)
1791        priority = os.sched_get_priority_min(policy)
1792        code = textwrap.dedent(f"""\
1793            import os, sys
1794            if os.sched_getscheduler(0) != {policy}:
1795                sys.exit(101)
1796            if os.sched_getparam(0).sched_priority != {priority}:
1797                sys.exit(102)""")
1798        pid = self.spawn_func(
1799            sys.executable,
1800            [sys.executable, '-c', code],
1801            os.environ,
1802            scheduler=(policy, os.sched_param(priority))
1803        )
1804        support.wait_process(pid, exitcode=0)
1805
1806    def test_multiple_file_actions(self):
1807        file_actions = [
1808            (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0),
1809            (os.POSIX_SPAWN_CLOSE, 0),
1810            (os.POSIX_SPAWN_DUP2, 1, 4),
1811        ]
1812        pid = self.spawn_func(self.NOOP_PROGRAM[0],
1813                              self.NOOP_PROGRAM,
1814                              os.environ,
1815                              file_actions=file_actions)
1816        support.wait_process(pid, exitcode=0)
1817
1818    def test_bad_file_actions(self):
1819        args = self.NOOP_PROGRAM
1820        with self.assertRaises(TypeError):
1821            self.spawn_func(args[0], args, os.environ,
1822                            file_actions=[None])
1823        with self.assertRaises(TypeError):
1824            self.spawn_func(args[0], args, os.environ,
1825                            file_actions=[()])
1826        with self.assertRaises(TypeError):
1827            self.spawn_func(args[0], args, os.environ,
1828                            file_actions=[(None,)])
1829        with self.assertRaises(TypeError):
1830            self.spawn_func(args[0], args, os.environ,
1831                            file_actions=[(12345,)])
1832        with self.assertRaises(TypeError):
1833            self.spawn_func(args[0], args, os.environ,
1834                            file_actions=[(os.POSIX_SPAWN_CLOSE,)])
1835        with self.assertRaises(TypeError):
1836            self.spawn_func(args[0], args, os.environ,
1837                            file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)])
1838        with self.assertRaises(TypeError):
1839            self.spawn_func(args[0], args, os.environ,
1840                            file_actions=[(os.POSIX_SPAWN_CLOSE, None)])
1841        with self.assertRaises(ValueError):
1842            self.spawn_func(args[0], args, os.environ,
1843                            file_actions=[(os.POSIX_SPAWN_OPEN,
1844                                           3, __file__ + '\0',
1845                                           os.O_RDONLY, 0)])
1846
1847    def test_open_file(self):
1848        outfile = os_helper.TESTFN
1849        self.addCleanup(os_helper.unlink, outfile)
1850        script = """if 1:
1851            import sys
1852            sys.stdout.write("hello")
1853            """
1854        file_actions = [
1855            (os.POSIX_SPAWN_OPEN, 1, outfile,
1856                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
1857                stat.S_IRUSR | stat.S_IWUSR),
1858        ]
1859        args = self.python_args('-c', script)
1860        pid = self.spawn_func(args[0], args, os.environ,
1861                              file_actions=file_actions)
1862
1863        support.wait_process(pid, exitcode=0)
1864        with open(outfile, encoding="utf-8") as f:
1865            self.assertEqual(f.read(), 'hello')
1866
1867    def test_close_file(self):
1868        closefile = os_helper.TESTFN
1869        self.addCleanup(os_helper.unlink, closefile)
1870        script = f"""if 1:
1871            import os
1872            try:
1873                os.fstat(0)
1874            except OSError as e:
1875                with open({closefile!r}, 'w', encoding='utf-8') as closefile:
1876                    closefile.write('is closed %d' % e.errno)
1877            """
1878        args = self.python_args('-c', script)
1879        pid = self.spawn_func(args[0], args, os.environ,
1880                              file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])
1881
1882        support.wait_process(pid, exitcode=0)
1883        with open(closefile, encoding="utf-8") as f:
1884            self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)
1885
1886    def test_dup2(self):
1887        dupfile = os_helper.TESTFN
1888        self.addCleanup(os_helper.unlink, dupfile)
1889        script = """if 1:
1890            import sys
1891            sys.stdout.write("hello")
1892            """
1893        with open(dupfile, "wb") as childfile:
1894            file_actions = [
1895                (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1),
1896            ]
1897            args = self.python_args('-c', script)
1898            pid = self.spawn_func(args[0], args, os.environ,
1899                                  file_actions=file_actions)
1900            support.wait_process(pid, exitcode=0)
1901        with open(dupfile, encoding="utf-8") as f:
1902            self.assertEqual(f.read(), 'hello')
1903
1904
@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn")
class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin):
    # Runs the tests inherited from _PosixSpawnMixin against
    # posix.posix_spawn.  getattr() with a None default keeps the class
    # definition importable when the posix module has no posix_spawn; the
    # decorator above skips the class when os lacks the function.
    spawn_func = getattr(posix, 'posix_spawn', None)
1908
1909
@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp")
class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
    # Runs the tests inherited from _PosixSpawnMixin against
    # posix.posix_spawnp, plus one PATH-lookup test of its own.
    spawn_func = getattr(posix, 'posix_spawnp', None)

    @os_helper.skip_unless_symlink
    def test_posix_spawnp(self):
        # Use a symlink to create a program in its own temporary directory
        temp_dir = tempfile.mkdtemp()
        self.addCleanup(os_helper.rmtree, temp_dir)

        program = 'posix_spawnp_test_program.exe'
        os.symlink(sys.executable, os.path.join(temp_dir, program))

        # Put the temporary directory first on the search path.
        inherited_path = os.environ.get('PATH')
        if inherited_path is None:
            search_path = temp_dir   # PATH is not set
        else:
            search_path = os.pathsep.join((temp_dir, inherited_path))

        spawn_args = (program, '-I', '-S', '-c', 'pass')
        code = textwrap.dedent("""
            import os
            from test import support

            args = %a
            pid = os.posix_spawnp(args[0], args, os.environ)

            support.wait_process(pid, exitcode=0)
        """ % (spawn_args,))

        # Use a subprocess to test os.posix_spawnp() with a modified PATH
        # environment variable: posix_spawnp() uses the current environment
        # to locate the program, not its environment argument.
        assert_python_ok('-c', code, PATH=search_path)
1945
1946
1947@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS")
1948class TestPosixWeaklinking(unittest.TestCase):
1949    # These test cases verify that weak linking support on macOS works
1950    # as expected. These cases only test new behaviour introduced by weak linking,
1951    # regular behaviour is tested by the normal test cases.
1952    #
1953    # See the section on Weak Linking in Mac/README.txt for more information.
1954    def setUp(self):
1955        import sysconfig
1956        import platform
1957
1958        config_vars = sysconfig.get_config_vars()
1959        self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] }
1960        self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split("."))
1961
1962    def _verify_available(self, name):
1963        if name not in self.available:
1964            raise unittest.SkipTest(f"{name} not weak-linked")
1965
1966    def test_pwritev(self):
1967        self._verify_available("HAVE_PWRITEV")
1968        if self.mac_ver >= (10, 16):
1969            self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available")
1970            self.assertTrue(hasattr(os, "preadv"), "os.readv is not available")
1971
1972        else:
1973            self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available")
1974            self.assertFalse(hasattr(os, "preadv"), "os.readv is available")
1975
1976    def test_stat(self):
1977        self._verify_available("HAVE_FSTATAT")
1978        if self.mac_ver >= (10, 10):
1979            self.assertIn("HAVE_FSTATAT", posix._have_functions)
1980
1981        else:
1982            self.assertNotIn("HAVE_FSTATAT", posix._have_functions)
1983
1984            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1985                os.stat("file", dir_fd=0)
1986
1987    def test_access(self):
1988        self._verify_available("HAVE_FACCESSAT")
1989        if self.mac_ver >= (10, 10):
1990            self.assertIn("HAVE_FACCESSAT", posix._have_functions)
1991
1992        else:
1993            self.assertNotIn("HAVE_FACCESSAT", posix._have_functions)
1994
1995            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1996                os.access("file", os.R_OK, dir_fd=0)
1997
1998            with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"):
1999                os.access("file", os.R_OK, follow_symlinks=False)
2000
2001            with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"):
2002                os.access("file", os.R_OK, effective_ids=True)
2003
2004    def test_chmod(self):
2005        self._verify_available("HAVE_FCHMODAT")
2006        if self.mac_ver >= (10, 10):
2007            self.assertIn("HAVE_FCHMODAT", posix._have_functions)
2008
2009        else:
2010            self.assertNotIn("HAVE_FCHMODAT", posix._have_functions)
2011            self.assertIn("HAVE_LCHMOD", posix._have_functions)
2012
2013            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2014                os.chmod("file", 0o644, dir_fd=0)
2015
2016    def test_chown(self):
2017        self._verify_available("HAVE_FCHOWNAT")
2018        if self.mac_ver >= (10, 10):
2019            self.assertIn("HAVE_FCHOWNAT", posix._have_functions)
2020
2021        else:
2022            self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions)
2023            self.assertIn("HAVE_LCHOWN", posix._have_functions)
2024
2025            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2026                os.chown("file", 0, 0, dir_fd=0)
2027
2028    def test_link(self):
2029        self._verify_available("HAVE_LINKAT")
2030        if self.mac_ver >= (10, 10):
2031            self.assertIn("HAVE_LINKAT", posix._have_functions)
2032
2033        else:
2034            self.assertNotIn("HAVE_LINKAT", posix._have_functions)
2035
2036            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2037                os.link("source", "target",  src_dir_fd=0)
2038
2039            with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"):
2040                os.link("source", "target",  dst_dir_fd=0)
2041
2042            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2043                os.link("source", "target",  src_dir_fd=0, dst_dir_fd=0)
2044
2045            # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag
2046            with os_helper.temp_dir() as base_path:
2047                link_path = os.path.join(base_path, "link")
2048                target_path = os.path.join(base_path, "target")
2049                source_path = os.path.join(base_path, "source")
2050
2051                with open(source_path, "w") as fp:
2052                    fp.write("data")
2053
2054                os.symlink("target", link_path)
2055
2056                # Calling os.link should fail in the link(2) call, and
2057                # should not reject *follow_symlinks* (to match the
2058                # behaviour you'd get when building on a platform without
2059                # linkat)
2060                with self.assertRaises(FileExistsError):
2061                    os.link(source_path, link_path, follow_symlinks=True)
2062
2063                with self.assertRaises(FileExistsError):
2064                    os.link(source_path, link_path, follow_symlinks=False)
2065
2066
2067    def test_listdir_scandir(self):
2068        self._verify_available("HAVE_FDOPENDIR")
2069        if self.mac_ver >= (10, 10):
2070            self.assertIn("HAVE_FDOPENDIR", posix._have_functions)
2071
2072        else:
2073            self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions)
2074
2075            with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"):
2076                os.listdir(0)
2077
2078            with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"):
2079                os.scandir(0)
2080
2081    def test_mkdir(self):
2082        self._verify_available("HAVE_MKDIRAT")
2083        if self.mac_ver >= (10, 10):
2084            self.assertIn("HAVE_MKDIRAT", posix._have_functions)
2085
2086        else:
2087            self.assertNotIn("HAVE_MKDIRAT", posix._have_functions)
2088
2089            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2090                os.mkdir("dir", dir_fd=0)
2091
2092    def test_rename_replace(self):
2093        self._verify_available("HAVE_RENAMEAT")
2094        if self.mac_ver >= (10, 10):
2095            self.assertIn("HAVE_RENAMEAT", posix._have_functions)
2096
2097        else:
2098            self.assertNotIn("HAVE_RENAMEAT", posix._have_functions)
2099
2100            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2101                os.rename("a", "b", src_dir_fd=0)
2102
2103            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2104                os.rename("a", "b", dst_dir_fd=0)
2105
2106            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2107                os.replace("a", "b", src_dir_fd=0)
2108
2109            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2110                os.replace("a", "b", dst_dir_fd=0)
2111
2112    def test_unlink_rmdir(self):
2113        self._verify_available("HAVE_UNLINKAT")
2114        if self.mac_ver >= (10, 10):
2115            self.assertIn("HAVE_UNLINKAT", posix._have_functions)
2116
2117        else:
2118            self.assertNotIn("HAVE_UNLINKAT", posix._have_functions)
2119
2120            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2121                os.unlink("path", dir_fd=0)
2122
2123            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2124                os.rmdir("path", dir_fd=0)
2125
2126    def test_open(self):
2127        self._verify_available("HAVE_OPENAT")
2128        if self.mac_ver >= (10, 10):
2129            self.assertIn("HAVE_OPENAT", posix._have_functions)
2130
2131        else:
2132            self.assertNotIn("HAVE_OPENAT", posix._have_functions)
2133
2134            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2135                os.open("path", os.O_RDONLY, dir_fd=0)
2136
2137    def test_readlink(self):
2138        self._verify_available("HAVE_READLINKAT")
2139        if self.mac_ver >= (10, 10):
2140            self.assertIn("HAVE_READLINKAT", posix._have_functions)
2141
2142        else:
2143            self.assertNotIn("HAVE_READLINKAT", posix._have_functions)
2144
2145            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2146                os.readlink("path",  dir_fd=0)
2147
2148    def test_symlink(self):
2149        self._verify_available("HAVE_SYMLINKAT")
2150        if self.mac_ver >= (10, 10):
2151            self.assertIn("HAVE_SYMLINKAT", posix._have_functions)
2152
2153        else:
2154            self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions)
2155
2156            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2157                os.symlink("a", "b",  dir_fd=0)
2158
2159    def test_utime(self):
2160        self._verify_available("HAVE_FUTIMENS")
2161        self._verify_available("HAVE_UTIMENSAT")
2162        if self.mac_ver >= (10, 13):
2163            self.assertIn("HAVE_FUTIMENS", posix._have_functions)
2164            self.assertIn("HAVE_UTIMENSAT", posix._have_functions)
2165
2166        else:
2167            self.assertNotIn("HAVE_FUTIMENS", posix._have_functions)
2168            self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions)
2169
2170            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2171                os.utime("path", dir_fd=0)
2172
2173
def tearDownModule():
    """Module-level teardown: call support.reap_children() once the tests are done."""
    support.reap_children()
2176
2177
# Run the tests with unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
2180