1"""
2This test suite exercises some system calls subject to interruption with EINTR,
3to check that it is actually handled transparently.
4It is intended to be run by the main test suite within a child process, to
5ensure there is no background thread running (so that signals are delivered to
6the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (unlike alarm(), which only accepts whole seconds).
9"""
10
11import contextlib
12import faulthandler
13import fcntl
14import os
15import platform
16import select
17import signal
18import socket
19import subprocess
20import sys
21import time
22import unittest
23
24from test import support
25
@contextlib.contextmanager
def kill_on_error(proc):
    """Enter *proc* as a context manager and kill it if the body raises.

    *proc* itself is yielded to the ``with`` body.  When any exception
    escapes the body, ``proc.kill()`` is called and the exception is
    re-raised unchanged; leaving the body normally only runs the exit
    handler of *proc*.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Same reach as a bare ``except:`` -- KeyboardInterrupt and
            # SystemExit also trigger the kill before propagating.
            proc.kill()
            raise
35
36
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class EINTRBaseTest(unittest.TestCase):
    """ Base class for EINTR tests. """

    # seconds before the first SIGALRM is delivered
    signal_delay = 0.1
    # seconds between two consecutive SIGALRM deliveries
    signal_period = 0.1
    # default duration (in seconds) of the blocking operations under test;
    # it has to stay greater than signal_period
    sleep_time = 0.2

    def sighandler(self, signum, frame):
        # SIGALRM handler: only count the deliveries
        self.signals = self.signals + 1

    def setUp(self):
        """Install the SIGALRM handler, arm the timer and the watchdog."""
        self.signals = 0
        self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
        signal.setitimer(signal.ITIMER_REAL, self.signal_delay,
                         self.signal_period)

        # Use faulthandler as watchdog to debug when a test hangs
        # (timeout of 10 minutes)
        arm_watchdog = getattr(faulthandler, 'dump_traceback_later', None)
        if arm_watchdog is not None:
            arm_watchdog(10 * 60, exit=True, file=sys.__stderr__)

    @staticmethod
    def stop_alarm():
        """Disarm the ITIMER_REAL timer so that no more SIGALRM is sent."""
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def tearDown(self):
        """Undo setUp(): stop the timer, restore the handler, drop the watchdog."""
        self.stop_alarm()
        signal.signal(signal.SIGALRM, self.orig_handler)
        disarm_watchdog = getattr(faulthandler, 'cancel_dump_traceback_later',
                                  None)
        if disarm_watchdog is not None:
            disarm_watchdog()

    def subprocess(self, *args, **kw):
        """Start ``sys.executable -c <args...>`` and return the Popen object.

        Keyword arguments are forwarded to subprocess.Popen unchanged.
        """
        cmd_args = (sys.executable, '-c', *args)
        return subprocess.Popen(cmd_args, **kw)
77
78
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class OSEINTRTest(EINTRBaseTest):
    """ EINTR tests for the os module. """

    def new_sleep_process(self):
        """Spawn a child interpreter that sleeps for sleep_time seconds."""
        return self.subprocess('import time; time.sleep(%r)' % self.sleep_time)

    def _test_wait_multiple(self, wait_func):
        """Start three sleeping children and call wait_func() once per child."""
        children = [self.new_sleep_process() for _ in range(3)]
        for _ in children:
            wait_func()
        # Call the Popen method to avoid a ResourceWarning
        for child in children:
            child.wait()

    def test_wait(self):
        self._test_wait_multiple(os.wait)

    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
    def test_wait3(self):
        def reap_any_child():
            return os.wait3(0)

        self._test_wait_multiple(reap_any_child)

    def _test_wait_single(self, wait_func):
        """Start one sleeping child and call wait_func(pid) on it."""
        child = self.new_sleep_process()
        wait_func(child.pid)
        # Call the Popen method to avoid a ResourceWarning
        child.wait()

    def test_waitpid(self):
        def reap(pid):
            return os.waitpid(pid, 0)

        self._test_wait_single(reap)

    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
    def test_wait4(self):
        def reap(pid):
            return os.wait4(pid, 0)

        self._test_wait_single(reap)

    def test_read(self):
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        # write_fd is closed by hand below, once the child has inherited it

        # every payload is smaller than PIPE_BUF, so each write is atomic
        payloads = [b"hello", b"world", b"spam"]

        child_lines = (
            'import os, sys, time',
            '',
            'wr = int(sys.argv[1])',
            'datas = %r' % payloads,
            'sleep_time = %r' % self.sleep_time,
            '',
            'for data in datas:',
            '    # let the parent block on read()',
            '    time.sleep(sleep_time)',
            '    os.write(wr, data)',
        )
        child_code = '\n'.join(child_lines)

        child = self.subprocess(child_code, str(write_fd),
                                pass_fds=[write_fd])
        with kill_on_error(child):
            os.close(write_fd)
            for expected in payloads:
                self.assertEqual(expected, os.read(read_fd, len(expected)))
            self.assertEqual(child.wait(), 0)

    def test_write(self):
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, write_fd)
        # read_fd is closed by hand below, once the child has inherited it

        # the payload has to be large enough for write() to block
        payload = b"x" * support.PIPE_MAX_SIZE

        child_lines = (
            'import io, os, sys, time',
            '',
            'rd = int(sys.argv[1])',
            'sleep_time = %r' % self.sleep_time,
            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
            'data_len = len(data)',
            '',
            '# let the parent block on write()',
            'time.sleep(sleep_time)',
            '',
            'read_data = io.BytesIO()',
            'while len(read_data.getvalue()) < data_len:',
            '    chunk = os.read(rd, 2 * data_len)',
            '    read_data.write(chunk)',
            '',
            'value = read_data.getvalue()',
            'if value != data:',
            '    raise Exception("read error: %s vs %s bytes"',
            '                    % (len(value), data_len))',
        )
        child_code = '\n'.join(child_lines)

        child = self.subprocess(child_code, str(read_fd), pass_fds=[read_fd])
        with kill_on_error(child):
            os.close(read_fd)
            offset = 0
            # os.write() may be partial: resume from the first unsent byte
            while offset < len(payload):
                offset += os.write(write_fd, memoryview(payload)[offset:])
            self.assertEqual(child.wait(), 0)
182
183
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SocketEINTRTest(EINTRBaseTest):
    """ EINTR tests for the socket module. """

    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_recv(self, recv_func):
        """Check that recv_func(sock, nbytes) survives interrupting signals.

        A child process sends three one-byte payloads over a socket pair,
        sleeping for sleep_time before each one so that the parent is blocked
        in recv_func() while SIGALRM is being delivered.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(rd.close)
        # wr closed explicitly by parent

        # single-byte payloads guard us against partial recv
        datas = [b"x", b"y", b"z"]

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(wr.family),
            'sock_type = %s' % int(wr.type),
            'datas = %r' % datas,
            'sleep_time = %r' % self.sleep_time,
            '',
            'wr = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with wr:',
            '    for data in datas:',
            '        # let the parent block on recv()',
            '        time.sleep(sleep_time)',
            '        wr.sendall(data)',
        ))

        fd = wr.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            # the child owns its own copy of the write end from now on
            wr.close()
            for data in datas:
                self.assertEqual(data, recv_func(rd, len(data)))
            self.assertEqual(proc.wait(), 0)

    def test_recv(self):
        self._test_recv(socket.socket.recv)

    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
    def test_recvmsg(self):
        self._test_recv(lambda sock, data: sock.recvmsg(data)[0])

    # Same guard as _test_recv(): this helper calls socket.socketpair() too.
    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_send(self, send_func):
        """Check that send_func(sock, data) survives interrupting signals.

        The parent sends a payload of about support.SOCK_MAX_SIZE bytes over
        a socket pair while the child sleeps for sleep_time before reading,
        so that the parent blocks in send_func().  send_func may return the
        number of bytes sent, or None when it sends everything (sendall()).
        The child exits with a non-zero status if the received bytes differ.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(wr.close)
        # rd closed explicitly by parent

        # we must send enough data for the send() to block
        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(rd.family),
            'sock_type = %s' % int(rd.type),
            'sleep_time = %r' % self.sleep_time,
            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
            'data_len = len(data)',
            '',
            'rd = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with rd:',
            '    # let the parent block on send()',
            '    time.sleep(sleep_time)',
            '',
            '    received_data = bytearray(data_len)',
            '    n = 0',
            '    while n < data_len:',
            '        n += rd.recv_into(memoryview(received_data)[n:])',
            '',
            'if received_data != data:',
            '    raise Exception("recv error: %s vs %s bytes"',
            '                    % (len(received_data), data_len))',
        ))

        fd = rd.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            # the child owns its own copy of the read end from now on
            rd.close()
            written = 0
            while written < len(data):
                sent = send_func(wr, memoryview(data)[written:])
                # sendall() returns None
                written += len(data) if sent is None else sent
            self.assertEqual(proc.wait(), 0)

    def test_send(self):
        self._test_send(socket.socket.send)

    def test_sendall(self):
        self._test_send(socket.socket.sendall)

    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
    def test_sendmsg(self):
        self._test_send(lambda sock, data: sock.sendmsg([data]))

    def test_accept(self):
        sock = socket.create_server((support.HOST, 0))
        self.addCleanup(sock.close)
        # port 0 was requested: read back the port actually bound
        port = sock.getsockname()[1]

        code = '\n'.join((
            'import socket, time',
            '',
            'host = %r' % support.HOST,
            'port = %s' % port,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let parent block on accept()',
            'time.sleep(sleep_time)',
            'with socket.create_connection((host, port)):',
            '    time.sleep(sleep_time)',
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            client_sock, _ = sock.accept()
            client_sock.close()
            self.assertEqual(proc.wait(), 0)

    # Issue #25122: There is a race condition in the FreeBSD kernel on
    # handling signals in the FIFO device. Skip the test until the bug is
    # fixed in the kernel.
    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
    @support.requires_freebsd_version(10, 3)
    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
    def _test_open(self, do_open_close_reader, do_open_close_writer):
        """Check that opening a FIFO for writing survives signals.

        do_open_close_reader is Python source appended to the child's code;
        it is expected to open and close the FIFO named ``path`` for reading.
        do_open_close_writer is a callable taking the FIFO path; it is called
        in the parent and is expected to open and close it for writing.
        """
        filename = support.TESTFN

        # Use a fifo: until the child opens it for reading, the parent will
        # block when trying to open it for writing.
        support.unlink(filename)
        try:
            os.mkfifo(filename)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        self.addCleanup(support.unlink, filename)

        code = '\n'.join((
            'import os, time',
            '',
            'path = %a' % filename,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let the parent block',
            'time.sleep(sleep_time)',
            '',
            do_open_close_reader,
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            do_open_close_writer(filename)
            self.assertEqual(proc.wait(), 0)

    def python_open(self, path):
        """Open *path* for writing with the builtin open(), then close it."""
        fp = open(path, 'w')
        fp.close()

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_open(self):
        self._test_open("fp = open(path, 'r')\nfp.close()",
                        self.python_open)

    def os_open(self, path):
        """Open *path* write-only with os.open(), then close it."""
        fd = os.open(path, os.O_WRONLY)
        os.close(fd)

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_os_open(self):
        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
                        self.os_open)
365
366
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class TimeEINTRTest(EINTRBaseTest):
    """ EINTR tests for the time module. """

    def test_sleep(self):
        # time.sleep() has to last at least sleep_time even though SIGALRM
        # keeps being delivered while it runs.
        started = time.monotonic()
        time.sleep(self.sleep_time)
        # the call under test is over: no need for further signals
        self.stop_alarm()
        elapsed = time.monotonic() - started
        self.assertGreaterEqual(elapsed, self.sleep_time)
377
378
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
# is vulnerable to a race condition between the child and the parent processes.
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
class SignalEINTRTest(EINTRBaseTest):
    """ EINTR tests for the signal module. """

    def check_sigwait(self, wait_func):
        """Check that wait_func(signum) survives interrupting signals.

        SIGUSR1 is blocked in the parent, then a child process sleeps for
        sleep_time and sends SIGUSR1 to the parent.  wait_func is called with
        the signal number and is expected to wait for that signal while
        SIGALRM keeps being delivered.  The SIGUSR1 handler and the signal
        mask are restored by cleanups.
        """
        signum = signal.SIGUSR1
        pid = os.getpid()

        # no-op handler, in case the signal gets delivered instead of waited
        old_handler = signal.signal(signum, lambda *args: None)
        self.addCleanup(signal.signal, signum, old_handler)

        code = '\n'.join((
            'import os, time',
            'pid = %s' % pid,
            'signum = %s' % int(signum),
            'sleep_time = %r' % self.sleep_time,
            'time.sleep(sleep_time)',
            'os.kill(pid, signum)',
        ))

        # Block the signal before starting the child (see bpo-30320 above).
        # Restore the exact previous mask afterwards rather than blindly
        # unblocking signum, which may already have been blocked.
        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)

        proc = self.subprocess(code)
        with kill_on_error(proc):
            wait_func(signum)

        self.assertEqual(proc.wait(), 0)

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        def wait_func(signum):
            signal.sigwaitinfo([signum])

        self.check_sigwait(wait_func)

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        def wait_func(signum):
            # generous timeout: the child sends the signal after sleep_time
            signal.sigtimedwait([signum], 120.0)

        self.check_sigwait(wait_func)
429
430
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SelectEINTRTest(EINTRBaseTest):
    """ EINTR tests for the select module. """

    # Every test below waits on nothing with a timeout of sleep_time and
    # checks that the call did not return before that timeout elapsed.

    def test_select(self):
        started = time.monotonic()
        select.select([], [], [], self.sleep_time)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipIf(sys.platform == "darwin",
                     "poll may fail on macOS; see issue #28087")
    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
    def test_poll(self):
        poll_obj = select.poll()

        started = time.monotonic()
        # poll() takes its timeout in milliseconds
        poll_obj.poll(self.sleep_time * 1e3)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
    def test_epoll(self):
        epoll_obj = select.epoll()
        self.addCleanup(epoll_obj.close)

        started = time.monotonic()
        # epoll.poll() takes its timeout in seconds
        epoll_obj.poll(self.sleep_time)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
    def test_kqueue(self):
        kq = select.kqueue()
        self.addCleanup(kq.close)

        started = time.monotonic()
        # no change list, at most one event, timeout in seconds
        kq.control(None, 1, self.sleep_time)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
    def test_devpoll(self):
        devpoll_obj = select.devpoll()
        self.addCleanup(devpoll_obj.close)

        started = time.monotonic()
        # devpoll.poll() takes its timeout in milliseconds
        devpoll_obj.poll(self.sleep_time * 1e3)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)
486
487
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class FNTLEINTRTest(EINTRBaseTest):
    """ EINTR tests for the fcntl module. """

    def _lock(self, lock_func, lock_name):
        """Check that a blocking exclusive lock survives interrupting signals.

        lock_func is the fcntl function called in the parent (fcntl.lockf or
        fcntl.flock) and lock_name is the name of that same function, used to
        build the child's source code.  The child takes an exclusive lock on
        support.TESTFN and holds it for sleep_time; the parent first polls
        until the file is locked, then blocks on the lock.
        """
        self.addCleanup(support.unlink, support.TESTFN)
        code = '\n'.join((
            "import fcntl, time",
            # %a rather than a hand-quoted %s: quotes, backslashes and
            # non-ASCII characters in the file name are escaped properly.
            "with open(%a, 'wb') as f:" % support.TESTFN,
            "   fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
            "   time.sleep(%r)" % self.sleep_time))
        start_time = time.monotonic()
        proc = self.subprocess(code)
        with kill_on_error(proc):
            with open(support.TESTFN, 'wb') as f:
                while True:  # synchronize the subprocess
                    dt = time.monotonic() - start_time
                    if dt > 60.0:
                        raise Exception("failed to sync child in %.1f sec" % dt)
                    try:
                        # succeeds as long as the child has not locked the
                        # file yet: release the lock and try again shortly
                        lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                        lock_func(f, fcntl.LOCK_UN)
                        time.sleep(0.01)
                    except BlockingIOError:
                        break
                # the child locked the file just a moment ago for 'sleep_time' seconds
                # that means that the lock below will block for 'sleep_time' minus some
                # potential context switch delay
                lock_func(f, fcntl.LOCK_EX)
                dt = time.monotonic() - start_time
                self.assertGreaterEqual(dt, self.sleep_time)
                self.stop_alarm()
            # like the other tests of this file, make sure the child succeeded
            self.assertEqual(proc.wait(), 0)

    # Issue 35633: See https://bugs.python.org/issue35633#msg333662
    # skip test rather than accept PermissionError from all platforms
    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf(self):
        self._lock(fcntl.lockf, "lockf")

    def test_flock(self):
        self._lock(fcntl.flock, "flock")
527
528
# Allow running this file directly as a script; the module docstring notes
# that it is normally run by the main test suite within a child process.
if __name__ == "__main__":
    unittest.main()
531