1import enum
2import errno
3import os
4import random
5import signal
6import socket
7import statistics
8import subprocess
9import sys
10import threading
11import time
12import unittest
13from test import support
14from test.support import os_helper
15from test.support.script_helper import assert_python_ok, spawn_python
16try:
17    import _testcapi
18except ImportError:
19    _testcapi = None
20
21
22class GenericTests(unittest.TestCase):
23
24    def test_enums(self):
25        for name in dir(signal):
26            sig = getattr(signal, name)
27            if name in {'SIG_DFL', 'SIG_IGN'}:
28                self.assertIsInstance(sig, signal.Handlers)
29            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
30                self.assertIsInstance(sig, signal.Sigmasks)
31            elif name.startswith('SIG') and not name.startswith('SIG_'):
32                self.assertIsInstance(sig, signal.Signals)
33            elif name.startswith('CTRL_'):
34                self.assertIsInstance(sig, signal.Signals)
35                self.assertEqual(sys.platform, "win32")
36
37        CheckedSignals = enum._old_convert_(
38                enum.IntEnum, 'Signals', 'signal',
39                lambda name:
40                    name.isupper()
41                    and (name.startswith('SIG') and not name.startswith('SIG_'))
42                    or name.startswith('CTRL_'),
43                source=signal,
44                )
45        enum._test_simple_enum(CheckedSignals, signal.Signals)
46
47        CheckedHandlers = enum._old_convert_(
48                enum.IntEnum, 'Handlers', 'signal',
49                lambda name: name in ('SIG_DFL', 'SIG_IGN'),
50                source=signal,
51                )
52        enum._test_simple_enum(CheckedHandlers, signal.Handlers)
53
54        Sigmasks = getattr(signal, 'Sigmasks', None)
55        if Sigmasks is not None:
56            CheckedSigmasks = enum._old_convert_(
57                    enum.IntEnum, 'Sigmasks', 'signal',
58                    lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
59                    source=signal,
60                    )
61            enum._test_simple_enum(CheckedSigmasks, Sigmasks)
62
63
64@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
65class PosixTests(unittest.TestCase):
66    def trivial_signal_handler(self, *args):
67        pass
68
69    def test_out_of_range_signal_number_raises_error(self):
70        self.assertRaises(ValueError, signal.getsignal, 4242)
71
72        self.assertRaises(ValueError, signal.signal, 4242,
73                          self.trivial_signal_handler)
74
75        self.assertRaises(ValueError, signal.strsignal, 4242)
76
77    def test_setting_signal_handler_to_none_raises_error(self):
78        self.assertRaises(TypeError, signal.signal,
79                          signal.SIGUSR1, None)
80
81    def test_getsignal(self):
82        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
83        self.assertIsInstance(hup, signal.Handlers)
84        self.assertEqual(signal.getsignal(signal.SIGHUP),
85                         self.trivial_signal_handler)
86        signal.signal(signal.SIGHUP, hup)
87        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
88
89    def test_strsignal(self):
90        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
91        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
92        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
93
94    # Issue 3864, unknown if this affects earlier versions of freebsd also
95    def test_interprocess_signal(self):
96        dirname = os.path.dirname(__file__)
97        script = os.path.join(dirname, 'signalinterproctester.py')
98        assert_python_ok(script)
99
100    def test_valid_signals(self):
101        s = signal.valid_signals()
102        self.assertIsInstance(s, set)
103        self.assertIn(signal.Signals.SIGINT, s)
104        self.assertIn(signal.Signals.SIGALRM, s)
105        self.assertNotIn(0, s)
106        self.assertNotIn(signal.NSIG, s)
107        self.assertLess(len(s), signal.NSIG)
108
109    @unittest.skipUnless(sys.executable, "sys.executable required.")
110    def test_keyboard_interrupt_exit_code(self):
111        """KeyboardInterrupt triggers exit via SIGINT."""
112        process = subprocess.run(
113                [sys.executable, "-c",
114                 "import os, signal, time\n"
115                 "os.kill(os.getpid(), signal.SIGINT)\n"
116                 "for _ in range(999): time.sleep(0.01)"],
117                stderr=subprocess.PIPE)
118        self.assertIn(b"KeyboardInterrupt", process.stderr)
119        self.assertEqual(process.returncode, -signal.SIGINT)
120        # Caveat: The exit code is insufficient to guarantee we actually died
121        # via a signal.  POSIX shells do more than look at the 8 bit value.
122        # Writing an automation friendly test of an interactive shell
123        # to confirm that our process died via a SIGINT proved too complex.
124
125
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):

    def test_valid_signals(self):
        valid = signal.valid_signals()
        self.assertIsInstance(valid, set)
        self.assertGreaterEqual(len(valid), 6)
        self.assertIn(signal.Signals.SIGINT, valid)
        for non_member in (0, signal.NSIG):
            self.assertNotIn(non_member, valid)
        self.assertLess(len(valid), signal.NSIG)

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        def handler(signum, frame):
            return None

        candidates = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                      signal.SIGTERM)
        exercised = set()
        for signum in candidates:
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(signum) is None:
                continue
            # Set and then reset a handler for signals that work on windows.
            previous = signal.signal(signum, handler)
            signal.signal(signum, previous)
            exercised.add(signum)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(exercised)

        # Both of these signal numbers must be rejected.
        for invalid_signum in (-1, 7):
            with self.assertRaises(ValueError):
                signal.signal(invalid_signum, handler)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
        STATUS_CONTROL_C_EXIT = 0xC000013A
        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
        # as that requires setting up a console control handler in a child
        # in its own process group.  Doable, but quite complicated.  (see
        # @eryksun on https://github.com/python/cpython/pull/11862)
        command = [sys.executable, "-c", "raise KeyboardInterrupt"]
        process = subprocess.run(command, stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
172
173
174class WakeupFDTests(unittest.TestCase):
175
176    def test_invalid_call(self):
177        # First parameter is positional-only
178        with self.assertRaises(TypeError):
179            signal.set_wakeup_fd(signum=signal.SIGINT)
180
181        # warn_on_full_buffer is a keyword-only parameter
182        with self.assertRaises(TypeError):
183            signal.set_wakeup_fd(signal.SIGINT, False)
184
185    def test_invalid_fd(self):
186        fd = os_helper.make_bad_fd()
187        self.assertRaises((ValueError, OSError),
188                          signal.set_wakeup_fd, fd)
189
190    def test_invalid_socket(self):
191        sock = socket.socket()
192        fd = sock.fileno()
193        sock.close()
194        self.assertRaises((ValueError, OSError),
195                          signal.set_wakeup_fd, fd)
196
197    def test_set_wakeup_fd_result(self):
198        r1, w1 = os.pipe()
199        self.addCleanup(os.close, r1)
200        self.addCleanup(os.close, w1)
201        r2, w2 = os.pipe()
202        self.addCleanup(os.close, r2)
203        self.addCleanup(os.close, w2)
204
205        if hasattr(os, 'set_blocking'):
206            os.set_blocking(w1, False)
207            os.set_blocking(w2, False)
208
209        signal.set_wakeup_fd(w1)
210        self.assertEqual(signal.set_wakeup_fd(w2), w1)
211        self.assertEqual(signal.set_wakeup_fd(-1), w2)
212        self.assertEqual(signal.set_wakeup_fd(-1), -1)
213
214    def test_set_wakeup_fd_socket_result(self):
215        sock1 = socket.socket()
216        self.addCleanup(sock1.close)
217        sock1.setblocking(False)
218        fd1 = sock1.fileno()
219
220        sock2 = socket.socket()
221        self.addCleanup(sock2.close)
222        sock2.setblocking(False)
223        fd2 = sock2.fileno()
224
225        signal.set_wakeup_fd(fd1)
226        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
227        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
228        self.assertEqual(signal.set_wakeup_fd(-1), -1)
229
230    # On Windows, files are always blocking and Windows does not provide a
231    # function to test if a socket is in non-blocking mode.
232    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
233    def test_set_wakeup_fd_blocking(self):
234        rfd, wfd = os.pipe()
235        self.addCleanup(os.close, rfd)
236        self.addCleanup(os.close, wfd)
237
238        # fd must be non-blocking
239        os.set_blocking(wfd, True)
240        with self.assertRaises(ValueError) as cm:
241            signal.set_wakeup_fd(wfd)
242        self.assertEqual(str(cm.exception),
243                         "the fd %s must be in non-blocking mode" % wfd)
244
245        # non-blocking is ok
246        os.set_blocking(wfd, False)
247        signal.set_wakeup_fd(wfd)
248        signal.set_wakeup_fd(-1)
249
250
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    # Tests for the file descriptor registered with signal.set_wakeup_fd(),
    # using a pipe as the wakeup fd.  Every scenario is executed in a child
    # interpreter started with assert_python_ok().
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        """Run test_body in a child interpreter and check the wakeup fd data.

        test_body is source text that must define a test() function; it is
        inserted into the child script, which installs a no-op SIGALRM
        handler, registers the write end of a non-blocking pipe as wakeup
        fd, calls test() and then reads the pipe.

        signals are the signal numbers expected to be read back from the
        pipe, one byte each.  When ordered is false, the expected and the
        received numbers are compared as sets instead of as tuples.
        """
        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)
        # Placeholders, in order: the expected signal numbers as a tuple of
        # ints, the "ordered" flag, and the source of test().

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe in this process first: the child scenario relies on write()
        # failing on the read end of a pipe.  If the write succeeds here,
        # the test is skipped.  The probe pipe is closed in every case.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        # The alarm interrupts a first select() that watches no fd; a second
        # select() on the wakeup pipe's read end must then return in less
        # than TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        # The alarm must interrupt a select() that is already waiting on the
        # wakeup pipe's read end, in less than TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_signum(self):
        # Two signals raised one after the other must be read back from the
        # wakeup fd in the same order (ordered comparison is the default).
        self.check_wakeup("""def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        # Both signals are raised while blocked and delivered on unblock;
        # ordered=False makes the comparison ignore their relative order.
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
431
432
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    # Tests for signal.set_wakeup_fd() with one end of a socket pair as the
    # wakeup fd.  Every scenario is executed in a child interpreter started
    # with assert_python_ok().

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # The child raises SIGINT and expects to receive exactly that
        # signal number as one byte on the other end of the socket pair.
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # Use a subprocess to have only one thread.
        # The verb expected in the "Exception ignored" message differs
        # between Windows ('send') and other platforms ('write').
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        # The child fills the socket pair buffer, then raises the signal
        # four times: with the default setting, with
        # warn_on_full_buffer=True, with warn_on_full_buffer=False (stderr
        # must stay empty) and with the default setting again.
        # Use a subprocess to have only one thread.
        # The verb expected in the "Exception ignored" message differs
        # between Windows ('send') and other platforms ('write').
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """.format(action=action)
        assert_python_ok('-c', code)
627
628
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
class SiginterruptTest(unittest.TestCase):

    def readpipe_interrupted(self, interrupt):
        """Run a blocking pipe read in a child while SIGALRM is delivered.

        interrupt is passed to signal.siginterrupt() in the child, unless it
        is None, in which case siginterrupt() is not called at all.

        Return True when the child reports that both of its reads were
        interrupted by the signal handler's exception (exit code 3).  Return
        False when a read returned normally (exit code 2) or when the child
        did not finish within support.SHORT_TIMEOUT.  Any other exit code
        raises an Exception.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        template = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """
        code = template % (interrupt,)
        with spawn_python('-c', code) as child:
            try:
                # wait until the child process is loaded and has started
                banner = child.stdout.readline()

                remainder, _ = child.communicate(
                    timeout=support.SHORT_TIMEOUT)
            except subprocess.TimeoutExpired:
                # The child is still blocked: the read was not interrupted
                child.kill()
                return False

            output = banner + remainder
            status = child.wait()
            if status == 3:
                return True
            if status == 2:
                return False
            raise Exception("Child error (exit code %s): %r"
                            % (status, output))

    def test_without_siginterrupt(self):
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    def test_siginterrupt_off(self):
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        self.assertFalse(self.readpipe_interrupted(False))
713
714
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                         "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() and signal.getitimer().

    Each test records the timer it arms in self.itimer so that tearDown()
    can disarm it.  The handlers set self.hndl_called when they run.
    """

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer *before* restoring the previous SIGALRM handler:
        # otherwise a still-armed timer could fire into the restored handler.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        # Install handler for signum and restore the previous handler on
        # cleanup.  Cleanups run after tearDown(), i.e. once the timer has
        # been disarmed.
        previous = signal.signal(signum, handler)
        if previous is not None:
            # None means the previous handler was not installed from Python
            # and cannot be passed back to signal.signal().
            self.addCleanup(signal.signal, signum, previous)

    def _spin_until_timer_stopped(self):
        # Burn CPU time until the handler has disarmed self.itimer.  Skip
        # the test if that has not happened after 60 seconds.
        deadline = time.monotonic() + 60.0
        while time.monotonic() < deadline:
            # do some real work so that virtual/profiling time advances
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                return # the signal handler stopped this itimer
        # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # A negative delay, signal.setitimer(signal.ITIMER_REAL, -1), is
        # deliberately not checked: negative times are treated as zero on
        # some platforms.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        # use up some virtual time until sig_vtalrm stops the timer
        self._spin_until_timer_stopped()

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        # do some work until sig_prof stops the timer
        self._spin_until_timer_stopped()

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertTrue(self.hndl_called)
817
818
819class PendingSignalsTests(unittest.TestCase):
820    """
821    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
822    functions.
823    """
824    @unittest.skipUnless(hasattr(signal, 'sigpending'),
825                         'need signal.sigpending()')
826    def test_sigpending_empty(self):
827        self.assertEqual(signal.sigpending(), set())
828
829    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
830                         'need signal.pthread_sigmask()')
831    @unittest.skipUnless(hasattr(signal, 'sigpending'),
832                         'need signal.sigpending()')
    def test_sigpending(self):
        # Runs in a child interpreter: SIGUSR1 is blocked and then sent to
        # the process itself, so sigpending() must report exactly {SIGUSR1},
        # with every member a signal.Signals instance.  Unblocking the
        # signal must then run the handler, which raises ZeroDivisionError.
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)
859
860    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
861                         'need signal.pthread_kill()')
862    def test_pthread_kill(self):
863        code = """if 1:
864            import signal
865            import threading
866            import sys
867
868            signum = signal.SIGUSR1
869
870            def handler(signum, frame):
871                1/0
872
873            signal.signal(signum, handler)
874
875            tid = threading.get_ident()
876            try:
877                signal.pthread_kill(tid, signum)
878            except ZeroDivisionError:
879                pass
880            else:
881                raise Exception("ZeroDivisionError not raised")
882        """
883        assert_python_ok('-c', code)
884
885    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
886                         'need signal.pthread_sigmask()')
887    def wait_helper(self, blocked, test):
888        """
889        test: body of the "def test(signum):" function.
890        blocked: number of the blocked signal
891        """
892        code = '''if 1:
893        import signal
894        import sys
895        from signal import Signals
896
897        def handler(signum, frame):
898            1/0
899
900        %s
901
902        blocked = %r
903        signum = signal.SIGALRM
904
905        # child: block and wait the signal
906        try:
907            signal.signal(signum, handler)
908            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
909
910            # Do the tests
911            test(signum)
912
913            # The handler must not be called on unblock
914            try:
915                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
916            except ZeroDivisionError:
917                print("the signal handler has been called",
918                      file=sys.stderr)
919                sys.exit(1)
920        except BaseException as err:
921            print("error: {}".format(err), file=sys.stderr)
922            sys.stderr.flush()
923            sys.exit(1)
924        ''' % (test.strip(), blocked)
925
926        # sig*wait* must be called with the signal blocked: since the current
927        # process might have several threads running, use a subprocess to have
928        # a single thread.
929        assert_python_ok('-c', code)
930
931    @unittest.skipUnless(hasattr(signal, 'sigwait'),
932                         'need signal.sigwait()')
933    def test_sigwait(self):
934        self.wait_helper(signal.SIGALRM, '''
935        def test(signum):
936            signal.alarm(1)
937            received = signal.sigwait([signum])
938            assert isinstance(received, signal.Signals), received
939            if received != signum:
940                raise Exception('received %s, not %s' % (received, signum))
941        ''')
942
943    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
944                         'need signal.sigwaitinfo()')
945    def test_sigwaitinfo(self):
946        self.wait_helper(signal.SIGALRM, '''
947        def test(signum):
948            signal.alarm(1)
949            info = signal.sigwaitinfo([signum])
950            if info.si_signo != signum:
951                raise Exception("info.si_signo != %s" % signum)
952        ''')
953
954    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
955                         'need signal.sigtimedwait()')
956    def test_sigtimedwait(self):
957        self.wait_helper(signal.SIGALRM, '''
958        def test(signum):
959            signal.alarm(1)
960            info = signal.sigtimedwait([signum], 10.1000)
961            if info.si_signo != signum:
962                raise Exception('info.si_signo != %s' % signum)
963        ''')
964
965    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
966                         'need signal.sigtimedwait()')
967    def test_sigtimedwait_poll(self):
968        # check that polling with sigtimedwait works
969        self.wait_helper(signal.SIGALRM, '''
970        def test(signum):
971            import os
972            os.kill(os.getpid(), signum)
973            info = signal.sigtimedwait([signum], 0)
974            if info.si_signo != signum:
975                raise Exception('info.si_signo != %s' % signum)
976        ''')
977
978    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
979                         'need signal.sigtimedwait()')
980    def test_sigtimedwait_timeout(self):
981        self.wait_helper(signal.SIGALRM, '''
982        def test(signum):
983            received = signal.sigtimedwait([signum], 1.0)
984            if received is not None:
985                raise Exception("received=%r" % (received,))
986        ''')
987
988    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
989                         'need signal.sigtimedwait()')
990    def test_sigtimedwait_negative_timeout(self):
991        signum = signal.SIGALRM
992        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
993
994    @unittest.skipUnless(hasattr(signal, 'sigwait'),
995                         'need signal.sigwait()')
996    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
997                         'need signal.pthread_sigmask()')
998    def test_sigwait_thread(self):
999        # Check that calling sigwait() from a thread doesn't suspend the whole
1000        # process. A new interpreter is spawned to avoid problems when mixing
1001        # threads and fork(): only async-safe functions are allowed between
1002        # fork() and exec().
1003        assert_python_ok("-c", """if True:
1004            import os, threading, sys, time, signal
1005
1006            # the default handler terminates the process
1007            signum = signal.SIGUSR1
1008
1009            def kill_later():
1010                # wait until the main thread is waiting in sigwait()
1011                time.sleep(1)
1012                os.kill(os.getpid(), signum)
1013
1014            # the signal must be blocked by all the threads
1015            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
1016            killer = threading.Thread(target=kill_later)
1017            killer.start()
1018            received = signal.sigwait([signum])
1019            if received != signum:
1020                print("sigwait() received %s, not %s" % (received, signum),
1021                      file=sys.stderr)
1022                sys.exit(1)
1023            killer.join()
1024            # unblock the signal, which should have been cleared by sigwait()
1025            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1026        """)
1027
1028    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1029                         'need signal.pthread_sigmask()')
1030    def test_pthread_sigmask_arguments(self):
1031        self.assertRaises(TypeError, signal.pthread_sigmask)
1032        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
1033        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
1034        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
1035        with self.assertRaises(ValueError):
1036            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
1037        with self.assertRaises(ValueError):
1038            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
1039        with self.assertRaises(ValueError):
1040            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
1041
1042    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1043                         'need signal.pthread_sigmask()')
1044    def test_pthread_sigmask_valid_signals(self):
1045        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
1046        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
1047        # Get current blocked set
1048        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
1049        self.assertLessEqual(s, signal.valid_signals())
1050
1051    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1052                         'need signal.pthread_sigmask()')
1053    def test_pthread_sigmask(self):
1054        code = """if 1:
1055        import signal
1056        import os; import threading
1057
1058        def handler(signum, frame):
1059            1/0
1060
1061        def kill(signum):
1062            os.kill(os.getpid(), signum)
1063
1064        def check_mask(mask):
1065            for sig in mask:
1066                assert isinstance(sig, signal.Signals), repr(sig)
1067
1068        def read_sigmask():
1069            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
1070            check_mask(sigmask)
1071            return sigmask
1072
1073        signum = signal.SIGUSR1
1074
1075        # Install our signal handler
1076        old_handler = signal.signal(signum, handler)
1077
1078        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
1079        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1080        check_mask(old_mask)
1081        try:
1082            kill(signum)
1083        except ZeroDivisionError:
1084            pass
1085        else:
1086            raise Exception("ZeroDivisionError not raised")
1087
1088        # Block and then raise SIGUSR1. The signal is blocked: the signal
1089        # handler is not called, and the signal is now pending
1090        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
1091        check_mask(mask)
1092        kill(signum)
1093
1094        # Check the new mask
1095        blocked = read_sigmask()
1096        check_mask(blocked)
1097        if signum not in blocked:
1098            raise Exception("%s not in %s" % (signum, blocked))
1099        if old_mask ^ blocked != {signum}:
1100            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
1101
1102        # Unblock SIGUSR1
1103        try:
1104            # unblock the pending signal calls immediately the signal handler
1105            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1106        except ZeroDivisionError:
1107            pass
1108        else:
1109            raise Exception("ZeroDivisionError not raised")
1110        try:
1111            kill(signum)
1112        except ZeroDivisionError:
1113            pass
1114        else:
1115            raise Exception("ZeroDivisionError not raised")
1116
1117        # Check the new mask
1118        unblocked = read_sigmask()
1119        if signum in unblocked:
1120            raise Exception("%s in %s" % (signum, unblocked))
1121        if blocked ^ unblocked != {signum}:
1122            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
1123        if old_mask != unblocked:
1124            raise Exception("%s != %s" % (old_mask, unblocked))
1125        """
1126        assert_python_ok('-c', code)
1127
1128    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
1129                         'need signal.pthread_kill()')
1130    def test_pthread_kill_main_thread(self):
1131        # Test that a signal can be sent to the main thread with pthread_kill()
1132        # before any other thread has been created (see issue #12392).
1133        code = """if True:
1134            import threading
1135            import signal
1136            import sys
1137
1138            def handler(signum, frame):
1139                sys.exit(3)
1140
1141            signal.signal(signal.SIGUSR1, handler)
1142            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
1143            sys.exit(2)
1144        """
1145
1146        with spawn_python('-c', code) as process:
1147            stdout, stderr = process.communicate()
1148            exitcode = process.wait()
1149            if exitcode != 3:
1150                raise Exception("Child error (exit code %s): %s" %
1151                                (exitcode, stdout))
1152
1153
1154class StressTest(unittest.TestCase):
1155    """
1156    Stress signal delivery, especially when a signal arrives in
1157    the middle of recomputing the signal state or executing
1158    previously tripped signal handlers.
1159    """
1160
1161    def setsig(self, signum, handler):
1162        old_handler = signal.signal(signum, handler)
1163        self.addCleanup(signal.signal, signum, old_handler)
1164
1165    def measure_itimer_resolution(self):
1166        N = 20
1167        times = []
1168
1169        def handler(signum=None, frame=None):
1170            if len(times) < N:
1171                times.append(time.perf_counter())
1172                # 1 µs is the smallest possible timer interval,
1173                # we want to measure what the concrete duration
1174                # will be on this platform
1175                signal.setitimer(signal.ITIMER_REAL, 1e-6)
1176
1177        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
1178        self.setsig(signal.SIGALRM, handler)
1179        handler()
1180        while len(times) < N:
1181            time.sleep(1e-3)
1182
1183        durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
1184        med = statistics.median(durations)
1185        if support.verbose:
1186            print("detected median itimer() resolution: %.6f s." % (med,))
1187        return med
1188
1189    def decide_itimer_count(self):
1190        # Some systems have poor setitimer() resolution (for example
1191        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
1192        # number of sequential timers based on that.
1193        reso = self.measure_itimer_resolution()
1194        if reso <= 1e-4:
1195            return 10000
1196        elif reso <= 1e-2:
1197            return 100
1198        else:
1199            self.skipTest("detected itimer resolution (%.3f s.) too high "
1200                          "(> 10 ms.) on this platform (or system too busy)"
1201                          % (reso,))
1202
1203    @unittest.skipUnless(hasattr(signal, "setitimer"),
1204                         "test needs setitimer()")
    def test_stress_delivery_dependent(self):
        """
        This test uses dependent signal handlers.

        SIGPROF and SIGUSR1 are handled by first_handler, which arms a
        short one-shot ITIMER_REAL; the resulting SIGALRM is handled by
        second_handler, which records it in `sigs`.  Every signal sent by
        the loop is therefore expected to add exactly one entry to `sigs`.
        """
        # Number of signals to send, scaled on the measured timer resolution
        # (the test is skipped by decide_itimer_count() if it is too coarse).
        N = self.decide_itimer_count()
        sigs = []

        def first_handler(signum, frame):
            # 1e-6 is the minimum non-zero value for `setitimer()`.
            # Choose a random delay so as to improve chances of
            # triggering a race condition.  Ideally the signal is received
            # when inside critical signal-handling routines such as
            # Py_MakePendingCalls().
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

        def second_handler(signum=None, frame=None):
            sigs.append(signum)

        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
        # ascending and descending sequences (SIGUSR1 then SIGALRM,
        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
        self.setsig(signal.SIGPROF, first_handler)
        self.setsig(signal.SIGUSR1, first_handler)
        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

        expected_sigs = 0
        # The deadline only bounds the inner waits: once it has passed, the
        # remaining signals are sent without waiting and any lost SIGALRM
        # is reported by the final assertion.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        while expected_sigs < N:
            os.kill(os.getpid(), signal.SIGPROF)
            expected_sigs += 1
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

            os.kill(os.getpid(), signal.SIGUSR1)
            expected_sigs += 1
            # Same wait as above, for the SIGALRM triggered by SIGUSR1
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")
1248
1249    @unittest.skipUnless(hasattr(signal, "setitimer"),
1250                         "test needs setitimer()")
    def test_stress_delivery_simultaneous(self):
        """
        This test uses simultaneous signal handlers.

        The same handler is installed for SIGUSR1 and SIGALRM.  Each
        iteration arms a short one-shot ITIMER_REAL and immediately sends
        SIGUSR1, so two entries are expected in `sigs` per iteration.
        """
        # Number of signals to expect, scaled on the measured timer
        # resolution (the test is skipped if it is too coarse).
        N = self.decide_itimer_count()
        sigs = []

        def handler(signum, frame):
            sigs.append(signum)

        self.setsig(signal.SIGUSR1, handler)
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

        expected_sigs = 0
        # The deadline only bounds the inner wait: once it has passed, the
        # loop keeps sending without waiting and lost signals are reported
        # by the final assertion.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        while expected_sigs < N:
            # Hopefully the SIGALRM will be received somewhere during
            # initial processing of SIGUSR1.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
            os.kill(os.getpid(), signal.SIGUSR1)

            # One SIGUSR1 plus one SIGALRM per iteration
            expected_sigs += 2
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")
1281
1282    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
1283                         "test needs SIGUSR1")
1284    def test_stress_modifying_handlers(self):
1285        # bpo-43406: race condition between trip_signal() and signal.signal
1286        signum = signal.SIGUSR1
1287        num_sent_signals = 0
1288        num_received_signals = 0
1289        do_stop = False
1290
1291        def custom_handler(signum, frame):
1292            nonlocal num_received_signals
1293            num_received_signals += 1
1294
1295        def set_interrupts():
1296            nonlocal num_sent_signals
1297            while not do_stop:
1298                signal.raise_signal(signum)
1299                num_sent_signals += 1
1300
1301        def cycle_handlers():
1302            while num_sent_signals < 100:
1303                for i in range(20000):
1304                    # Cycle between a Python-defined and a non-Python handler
1305                    for handler in [custom_handler, signal.SIG_IGN]:
1306                        signal.signal(signum, handler)
1307
1308        old_handler = signal.signal(signum, custom_handler)
1309        self.addCleanup(signal.signal, signum, old_handler)
1310
1311        t = threading.Thread(target=set_interrupts)
1312        try:
1313            ignored = False
1314            with support.catch_unraisable_exception() as cm:
1315                t.start()
1316                cycle_handlers()
1317                do_stop = True
1318                t.join()
1319
1320                if cm.unraisable is not None:
1321                    # An unraisable exception may be printed out when
1322                    # a signal is ignored due to the aforementioned
1323                    # race condition, check it.
1324                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
1325                    self.assertIn(
1326                        f"Signal {signum:d} ignored due to race condition",
1327                        str(cm.unraisable.exc_value))
1328                    ignored = True
1329
1330            # bpo-43406: Even if it is unlikely, it's technically possible that
1331            # all signals were ignored because of race conditions.
1332            if not ignored:
1333                # Sanity check that some signals were received, but not all
1334                self.assertGreater(num_received_signals, 0)
1335            self.assertLess(num_received_signals, num_sent_signals)
1336        finally:
1337            do_stop = True
1338            t.join()
1339
1340
class RaiseSignalTest(unittest.TestCase):

    def test_sigint(self):
        # Raising SIGINT is expected to surface as KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.raise_signal(signal.SIGINT)

    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    def test_invalid_argument(self):
        SIGHUP = 1 # not supported on win32
        try:
            signal.raise_signal(SIGHUP)
        except OSError as e:
            # Only EINVAL is the expected outcome; anything else is a
            # genuine error and must propagate.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.fail("OSError (Invalid argument) expected")

    def test_handler(self):
        # A Python handler installed for SIGINT must be called when the
        # signal is raised with raise_signal().
        handled = False

        def on_sigint(signum, frame):
            nonlocal handled
            handled = True

        previous = signal.signal(signal.SIGINT, on_sigint)
        self.addCleanup(signal.signal, signal.SIGINT, previous)

        signal.raise_signal(signal.SIGINT)
        self.assertTrue(handled)
1369
1370
class PidfdSignalTest(unittest.TestCase):

    @unittest.skipUnless(
        hasattr(signal, "pidfd_send_signal"),
        "pidfd support not built in",
    )
    def test_pidfd_send_signal(self):
        # Probe with file descriptor 0 (presumably not a pidfd): the call
        # has to fail, and the errno tells whether pidfds are usable at all
        # in this environment.
        with self.assertRaises(OSError) as cm:
            signal.pidfd_send_signal(0, signal.SIGINT)
        if cm.exception.errno == errno.ENOSYS:
            self.skipTest("kernel does not support pidfds")
        elif cm.exception.errno == errno.EPERM:
            self.skipTest("Not enough privileges to use pidfds")
        self.assertEqual(cm.exception.errno, errno.EBADF)
        # A directory file descriptor for /proc/<pid> is used as the pidfd
        # of the current process.
        my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
        self.addCleanup(os.close, my_pidfd)
        # Only None is accepted for the siginfo argument.
        with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
        # Sending SIGINT to ourselves is expected to surface as
        # KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
1391
def tearDownModule():
    # Module-level unittest teardown hook.  It only delegates to
    # support.reap_children() -- presumably to collect child processes
    # left behind by the tests; confirm against test.support.
    support.reap_children()
1394
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1397