1import enum 2import errno 3import os 4import random 5import signal 6import socket 7import statistics 8import subprocess 9import sys 10import threading 11import time 12import unittest 13from test import support 14from test.support import os_helper 15from test.support.script_helper import assert_python_ok, spawn_python 16try: 17 import _testcapi 18except ImportError: 19 _testcapi = None 20 21 22class GenericTests(unittest.TestCase): 23 24 def test_enums(self): 25 for name in dir(signal): 26 sig = getattr(signal, name) 27 if name in {'SIG_DFL', 'SIG_IGN'}: 28 self.assertIsInstance(sig, signal.Handlers) 29 elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: 30 self.assertIsInstance(sig, signal.Sigmasks) 31 elif name.startswith('SIG') and not name.startswith('SIG_'): 32 self.assertIsInstance(sig, signal.Signals) 33 elif name.startswith('CTRL_'): 34 self.assertIsInstance(sig, signal.Signals) 35 self.assertEqual(sys.platform, "win32") 36 37 CheckedSignals = enum._old_convert_( 38 enum.IntEnum, 'Signals', 'signal', 39 lambda name: 40 name.isupper() 41 and (name.startswith('SIG') and not name.startswith('SIG_')) 42 or name.startswith('CTRL_'), 43 source=signal, 44 ) 45 enum._test_simple_enum(CheckedSignals, signal.Signals) 46 47 CheckedHandlers = enum._old_convert_( 48 enum.IntEnum, 'Handlers', 'signal', 49 lambda name: name in ('SIG_DFL', 'SIG_IGN'), 50 source=signal, 51 ) 52 enum._test_simple_enum(CheckedHandlers, signal.Handlers) 53 54 Sigmasks = getattr(signal, 'Sigmasks', None) 55 if Sigmasks is not None: 56 CheckedSigmasks = enum._old_convert_( 57 enum.IntEnum, 'Sigmasks', 'signal', 58 lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'), 59 source=signal, 60 ) 61 enum._test_simple_enum(CheckedSigmasks, Sigmasks) 62 63 64@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 65class PosixTests(unittest.TestCase): 66 def trivial_signal_handler(self, *args): 67 pass 68 69 def test_out_of_range_signal_number_raises_error(self): 70 self.assertRaises(ValueError, signal.getsignal, 4242) 71 72 self.assertRaises(ValueError, signal.signal, 4242, 73 self.trivial_signal_handler) 74 75 self.assertRaises(ValueError, signal.strsignal, 4242) 76 77 def test_setting_signal_handler_to_none_raises_error(self): 78 self.assertRaises(TypeError, signal.signal, 79 signal.SIGUSR1, None) 80 81 def test_getsignal(self): 82 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 83 self.assertIsInstance(hup, signal.Handlers) 84 self.assertEqual(signal.getsignal(signal.SIGHUP), 85 self.trivial_signal_handler) 86 signal.signal(signal.SIGHUP, hup) 87 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 88 89 def test_strsignal(self): 90 self.assertIn("Interrupt", signal.strsignal(signal.SIGINT)) 91 self.assertIn("Terminated", signal.strsignal(signal.SIGTERM)) 92 self.assertIn("Hangup", signal.strsignal(signal.SIGHUP)) 93 94 # Issue 3864, unknown if this affects earlier versions of freebsd also 95 def test_interprocess_signal(self): 96 dirname = os.path.dirname(__file__) 97 script = os.path.join(dirname, 'signalinterproctester.py') 98 assert_python_ok(script) 99 100 def test_valid_signals(self): 101 s = signal.valid_signals() 102 self.assertIsInstance(s, set) 103 self.assertIn(signal.Signals.SIGINT, s) 104 self.assertIn(signal.Signals.SIGALRM, s) 105 self.assertNotIn(0, s) 106 self.assertNotIn(signal.NSIG, s) 107 self.assertLess(len(s), signal.NSIG) 108 109 @unittest.skipUnless(sys.executable, "sys.executable required.") 110 def test_keyboard_interrupt_exit_code(self): 111 """KeyboardInterrupt triggers exit via SIGINT.""" 112 process = subprocess.run( 113 [sys.executable, "-c", 114 "import os, signal, time\n" 115 "os.kill(os.getpid(), signal.SIGINT)\n" 116 "for _ in range(999): time.sleep(0.01)"], 117 stderr=subprocess.PIPE) 118 self.assertIn(b"KeyboardInterrupt", process.stderr) 119 self.assertEqual(process.returncode, -signal.SIGINT) 120 # Caveat: The exit code is insufficient to guarantee we actually died 121 # via a signal. POSIX shells do more than look at the 8 bit value. 122 # Writing an automation friendly test of an interactive shell 123 # to confirm that our process died via a SIGINT proved too complex. 124 125 126@unittest.skipUnless(sys.platform == "win32", "Windows specific") 127class WindowsSignalTests(unittest.TestCase): 128 129 def test_valid_signals(self): 130 s = signal.valid_signals() 131 self.assertIsInstance(s, set) 132 self.assertGreaterEqual(len(s), 6) 133 self.assertIn(signal.Signals.SIGINT, s) 134 self.assertNotIn(0, s) 135 self.assertNotIn(signal.NSIG, s) 136 self.assertLess(len(s), signal.NSIG) 137 138 def test_issue9324(self): 139 # Updated for issue #10003, adding SIGBREAK 140 handler = lambda x, y: None 141 checked = set() 142 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 143 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 144 signal.SIGTERM): 145 # Set and then reset a handler for signals that work on windows. 146 # Issue #18396, only for signals without a C-level handler. 147 if signal.getsignal(sig) is not None: 148 signal.signal(sig, signal.signal(sig, handler)) 149 checked.add(sig) 150 # Issue #18396: Ensure the above loop at least tested *something* 151 self.assertTrue(checked) 152 153 with self.assertRaises(ValueError): 154 signal.signal(-1, handler) 155 156 with self.assertRaises(ValueError): 157 signal.signal(7, handler) 158 159 @unittest.skipUnless(sys.executable, "sys.executable required.") 160 def test_keyboard_interrupt_exit_code(self): 161 """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.""" 162 # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here 163 # as that requires setting up a console control handler in a child 164 # in its own process group. Doable, but quite complicated. (see 165 # @eryksun on https://github.com/python/cpython/pull/11862) 166 process = subprocess.run( 167 [sys.executable, "-c", "raise KeyboardInterrupt"], 168 stderr=subprocess.PIPE) 169 self.assertIn(b"KeyboardInterrupt", process.stderr) 170 STATUS_CONTROL_C_EXIT = 0xC000013A 171 self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT) 172 173 174class WakeupFDTests(unittest.TestCase): 175 176 def test_invalid_call(self): 177 # First parameter is positional-only 178 with self.assertRaises(TypeError): 179 signal.set_wakeup_fd(signum=signal.SIGINT) 180 181 # warn_on_full_buffer is a keyword-only parameter 182 with self.assertRaises(TypeError): 183 signal.set_wakeup_fd(signal.SIGINT, False) 184 185 def test_invalid_fd(self): 186 fd = os_helper.make_bad_fd() 187 self.assertRaises((ValueError, OSError), 188 signal.set_wakeup_fd, fd) 189 190 def test_invalid_socket(self): 191 sock = socket.socket() 192 fd = sock.fileno() 193 sock.close() 194 self.assertRaises((ValueError, OSError), 195 signal.set_wakeup_fd, fd) 196 197 def test_set_wakeup_fd_result(self): 198 r1, w1 = os.pipe() 199 self.addCleanup(os.close, r1) 200 self.addCleanup(os.close, w1) 201 r2, w2 = os.pipe() 202 self.addCleanup(os.close, r2) 203 self.addCleanup(os.close, w2) 204 205 if hasattr(os, 'set_blocking'): 206 os.set_blocking(w1, False) 207 os.set_blocking(w2, False) 208 209 signal.set_wakeup_fd(w1) 210 self.assertEqual(signal.set_wakeup_fd(w2), w1) 211 self.assertEqual(signal.set_wakeup_fd(-1), w2) 212 self.assertEqual(signal.set_wakeup_fd(-1), -1) 213 214 def test_set_wakeup_fd_socket_result(self): 215 sock1 = socket.socket() 216 self.addCleanup(sock1.close) 217 sock1.setblocking(False) 218 fd1 = sock1.fileno() 219 220 sock2 = socket.socket() 221 self.addCleanup(sock2.close) 222 sock2.setblocking(False) 223 fd2 = sock2.fileno() 224 225 signal.set_wakeup_fd(fd1) 226 self.assertEqual(signal.set_wakeup_fd(fd2), fd1) 227 self.assertEqual(signal.set_wakeup_fd(-1), fd2) 228 self.assertEqual(signal.set_wakeup_fd(-1), -1) 229 230 # On Windows, files are always blocking and Windows does not provide a 231 # function to test if a socket is in non-blocking mode. 232 @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX") 233 def test_set_wakeup_fd_blocking(self): 234 rfd, wfd = os.pipe() 235 self.addCleanup(os.close, rfd) 236 self.addCleanup(os.close, wfd) 237 238 # fd must be non-blocking 239 os.set_blocking(wfd, True) 240 with self.assertRaises(ValueError) as cm: 241 signal.set_wakeup_fd(wfd) 242 self.assertEqual(str(cm.exception), 243 "the fd %s must be in non-blocking mode" % wfd) 244 245 # non-blocking is ok 246 os.set_blocking(wfd, False) 247 signal.set_wakeup_fd(wfd) 248 signal.set_wakeup_fd(-1) 249 250 251@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 252class WakeupSignalTests(unittest.TestCase): 253 @unittest.skipIf(_testcapi is None, 'need _testcapi') 254 def check_wakeup(self, test_body, *signals, ordered=True): 255 # use a subprocess to have only one thread 256 code = """if 1: 257 import _testcapi 258 import os 259 import signal 260 import struct 261 262 signals = {!r} 263 264 def handler(signum, frame): 265 pass 266 267 def check_signum(signals): 268 data = os.read(read, len(signals)+1) 269 raised = struct.unpack('%uB' % len(data), data) 270 if not {!r}: 271 raised = set(raised) 272 signals = set(signals) 273 if raised != signals: 274 raise Exception("%r != %r" % (raised, signals)) 275 276 {} 277 278 signal.signal(signal.SIGALRM, handler) 279 read, write = os.pipe() 280 os.set_blocking(write, False) 281 signal.set_wakeup_fd(write) 282 283 test() 284 check_signum(signals) 285 286 os.close(read) 287 os.close(write) 288 """.format(tuple(map(int, signals)), ordered, test_body) 289 290 assert_python_ok('-c', code) 291 292 @unittest.skipIf(_testcapi is None, 'need _testcapi') 293 def test_wakeup_write_error(self): 294 # Issue #16105: write() errors in the C signal handler should not 295 # pass silently. 296 # Use a subprocess to have only one thread. 297 code = """if 1: 298 import _testcapi 299 import errno 300 import os 301 import signal 302 import sys 303 from test.support import captured_stderr 304 305 def handler(signum, frame): 306 1/0 307 308 signal.signal(signal.SIGALRM, handler) 309 r, w = os.pipe() 310 os.set_blocking(r, False) 311 312 # Set wakeup_fd a read-only file descriptor to trigger the error 313 signal.set_wakeup_fd(r) 314 try: 315 with captured_stderr() as err: 316 signal.raise_signal(signal.SIGALRM) 317 except ZeroDivisionError: 318 # An ignored exception should have been printed out on stderr 319 err = err.getvalue() 320 if ('Exception ignored when trying to write to the signal wakeup fd' 321 not in err): 322 raise AssertionError(err) 323 if ('OSError: [Errno %d]' % errno.EBADF) not in err: 324 raise AssertionError(err) 325 else: 326 raise AssertionError("ZeroDivisionError not raised") 327 328 os.close(r) 329 os.close(w) 330 """ 331 r, w = os.pipe() 332 try: 333 os.write(r, b'x') 334 except OSError: 335 pass 336 else: 337 self.skipTest("OS doesn't report write() error on the read end of a pipe") 338 finally: 339 os.close(r) 340 os.close(w) 341 342 assert_python_ok('-c', code) 343 344 def test_wakeup_fd_early(self): 345 self.check_wakeup("""def test(): 346 import select 347 import time 348 349 TIMEOUT_FULL = 10 350 TIMEOUT_HALF = 5 351 352 class InterruptSelect(Exception): 353 pass 354 355 def handler(signum, frame): 356 raise InterruptSelect 357 signal.signal(signal.SIGALRM, handler) 358 359 signal.alarm(1) 360 361 # We attempt to get a signal during the sleep, 362 # before select is called 363 try: 364 select.select([], [], [], TIMEOUT_FULL) 365 except InterruptSelect: 366 pass 367 else: 368 raise Exception("select() was not interrupted") 369 370 before_time = time.monotonic() 371 select.select([read], [], [], TIMEOUT_FULL) 372 after_time = time.monotonic() 373 dt = after_time - before_time 374 if dt >= TIMEOUT_HALF: 375 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 376 """, signal.SIGALRM) 377 378 def test_wakeup_fd_during(self): 379 self.check_wakeup("""def test(): 380 import select 381 import time 382 383 TIMEOUT_FULL = 10 384 TIMEOUT_HALF = 5 385 386 class InterruptSelect(Exception): 387 pass 388 389 def handler(signum, frame): 390 raise InterruptSelect 391 signal.signal(signal.SIGALRM, handler) 392 393 signal.alarm(1) 394 before_time = time.monotonic() 395 # We attempt to get a signal during the select call 396 try: 397 select.select([read], [], [], TIMEOUT_FULL) 398 except InterruptSelect: 399 pass 400 else: 401 raise Exception("select() was not interrupted") 402 after_time = time.monotonic() 403 dt = after_time - before_time 404 if dt >= TIMEOUT_HALF: 405 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 406 """, signal.SIGALRM) 407 408 def test_signum(self): 409 self.check_wakeup("""def test(): 410 signal.signal(signal.SIGUSR1, handler) 411 signal.raise_signal(signal.SIGUSR1) 412 signal.raise_signal(signal.SIGALRM) 413 """, signal.SIGUSR1, signal.SIGALRM) 414 415 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 416 'need signal.pthread_sigmask()') 417 def test_pending(self): 418 self.check_wakeup("""def test(): 419 signum1 = signal.SIGUSR1 420 signum2 = signal.SIGUSR2 421 422 signal.signal(signum1, handler) 423 signal.signal(signum2, handler) 424 425 signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) 426 signal.raise_signal(signum1) 427 signal.raise_signal(signum2) 428 # Unblocking the 2 signals calls the C signal handler twice 429 signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) 430 """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) 431 432 433@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') 434class WakeupSocketSignalTests(unittest.TestCase): 435 436 @unittest.skipIf(_testcapi is None, 'need _testcapi') 437 def test_socket(self): 438 # use a subprocess to have only one thread 439 code = """if 1: 440 import signal 441 import socket 442 import struct 443 import _testcapi 444 445 signum = signal.SIGINT 446 signals = (signum,) 447 448 def handler(signum, frame): 449 pass 450 451 signal.signal(signum, handler) 452 453 read, write = socket.socketpair() 454 write.setblocking(False) 455 signal.set_wakeup_fd(write.fileno()) 456 457 signal.raise_signal(signum) 458 459 data = read.recv(1) 460 if not data: 461 raise Exception("no signum written") 462 raised = struct.unpack('B', data) 463 if raised != signals: 464 raise Exception("%r != %r" % (raised, signals)) 465 466 read.close() 467 write.close() 468 """ 469 470 assert_python_ok('-c', code) 471 472 @unittest.skipIf(_testcapi is None, 'need _testcapi') 473 def test_send_error(self): 474 # Use a subprocess to have only one thread. 475 if os.name == 'nt': 476 action = 'send' 477 else: 478 action = 'write' 479 code = """if 1: 480 import errno 481 import signal 482 import socket 483 import sys 484 import time 485 import _testcapi 486 from test.support import captured_stderr 487 488 signum = signal.SIGINT 489 490 def handler(signum, frame): 491 pass 492 493 signal.signal(signum, handler) 494 495 read, write = socket.socketpair() 496 read.setblocking(False) 497 write.setblocking(False) 498 499 signal.set_wakeup_fd(write.fileno()) 500 501 # Close sockets: send() will fail 502 read.close() 503 write.close() 504 505 with captured_stderr() as err: 506 signal.raise_signal(signum) 507 508 err = err.getvalue() 509 if ('Exception ignored when trying to {action} to the signal wakeup fd' 510 not in err): 511 raise AssertionError(err) 512 """.format(action=action) 513 assert_python_ok('-c', code) 514 515 @unittest.skipIf(_testcapi is None, 'need _testcapi') 516 def test_warn_on_full_buffer(self): 517 # Use a subprocess to have only one thread. 518 if os.name == 'nt': 519 action = 'send' 520 else: 521 action = 'write' 522 code = """if 1: 523 import errno 524 import signal 525 import socket 526 import sys 527 import time 528 import _testcapi 529 from test.support import captured_stderr 530 531 signum = signal.SIGINT 532 533 # This handler will be called, but we intentionally won't read from 534 # the wakeup fd. 535 def handler(signum, frame): 536 pass 537 538 signal.signal(signum, handler) 539 540 read, write = socket.socketpair() 541 542 # Fill the socketpair buffer 543 if sys.platform == 'win32': 544 # bpo-34130: On Windows, sometimes non-blocking send fails to fill 545 # the full socketpair buffer, so use a timeout of 50 ms instead. 546 write.settimeout(0.050) 547 else: 548 write.setblocking(False) 549 550 written = 0 551 if sys.platform == "vxworks": 552 CHUNK_SIZES = (1,) 553 else: 554 # Start with large chunk size to reduce the 555 # number of send needed to fill the buffer. 556 CHUNK_SIZES = (2 ** 16, 2 ** 8, 1) 557 for chunk_size in CHUNK_SIZES: 558 chunk = b"x" * chunk_size 559 try: 560 while True: 561 write.send(chunk) 562 written += chunk_size 563 except (BlockingIOError, TimeoutError): 564 pass 565 566 print(f"%s bytes written into the socketpair" % written, flush=True) 567 568 write.setblocking(False) 569 try: 570 write.send(b"x") 571 except BlockingIOError: 572 # The socketpair buffer seems full 573 pass 574 else: 575 raise AssertionError("%s bytes failed to fill the socketpair " 576 "buffer" % written) 577 578 # By default, we get a warning when a signal arrives 579 msg = ('Exception ignored when trying to {action} ' 580 'to the signal wakeup fd') 581 signal.set_wakeup_fd(write.fileno()) 582 583 with captured_stderr() as err: 584 signal.raise_signal(signum) 585 586 err = err.getvalue() 587 if msg not in err: 588 raise AssertionError("first set_wakeup_fd() test failed, " 589 "stderr: %r" % err) 590 591 # And also if warn_on_full_buffer=True 592 signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) 593 594 with captured_stderr() as err: 595 signal.raise_signal(signum) 596 597 err = err.getvalue() 598 if msg not in err: 599 raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " 600 "test failed, stderr: %r" % err) 601 602 # But not if warn_on_full_buffer=False 603 signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) 604 605 with captured_stderr() as err: 606 signal.raise_signal(signum) 607 608 err = err.getvalue() 609 if err != "": 610 raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " 611 "test failed, stderr: %r" % err) 612 613 # And then check the default again, to make sure warn_on_full_buffer 614 # settings don't leak across calls. 615 signal.set_wakeup_fd(write.fileno()) 616 617 with captured_stderr() as err: 618 signal.raise_signal(signum) 619 620 err = err.getvalue() 621 if msg not in err: 622 raise AssertionError("second set_wakeup_fd() test failed, " 623 "stderr: %r" % err) 624 625 """.format(action=action) 626 assert_python_ok('-c', code) 627 628 629@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 630@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()") 631class SiginterruptTest(unittest.TestCase): 632 633 def readpipe_interrupted(self, interrupt): 634 """Perform a read during which a signal will arrive. Return True if the 635 read is interrupted by the signal and raises an exception. Return False 636 if it returns normally. 637 """ 638 # use a subprocess to have only one thread, to have a timeout on the 639 # blocking read and to not touch signal handling in this process 640 code = """if 1: 641 import errno 642 import os 643 import signal 644 import sys 645 646 interrupt = %r 647 r, w = os.pipe() 648 649 def handler(signum, frame): 650 1 / 0 651 652 signal.signal(signal.SIGALRM, handler) 653 if interrupt is not None: 654 signal.siginterrupt(signal.SIGALRM, interrupt) 655 656 print("ready") 657 sys.stdout.flush() 658 659 # run the test twice 660 try: 661 for loop in range(2): 662 # send a SIGALRM in a second (during the read) 663 signal.alarm(1) 664 try: 665 # blocking call: read from a pipe without data 666 os.read(r, 1) 667 except ZeroDivisionError: 668 pass 669 else: 670 sys.exit(2) 671 sys.exit(3) 672 finally: 673 os.close(r) 674 os.close(w) 675 """ % (interrupt,) 676 with spawn_python('-c', code) as process: 677 try: 678 # wait until the child process is loaded and has started 679 first_line = process.stdout.readline() 680 681 stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT) 682 except subprocess.TimeoutExpired: 683 process.kill() 684 return False 685 else: 686 stdout = first_line + stdout 687 exitcode = process.wait() 688 if exitcode not in (2, 3): 689 raise Exception("Child error (exit code %s): %r" 690 % (exitcode, stdout)) 691 return (exitcode == 3) 692 693 def test_without_siginterrupt(self): 694 # If a signal handler is installed and siginterrupt is not called 695 # at all, when that signal arrives, it interrupts a syscall that's in 696 # progress. 697 interrupted = self.readpipe_interrupted(None) 698 self.assertTrue(interrupted) 699 700 def test_siginterrupt_on(self): 701 # If a signal handler is installed and siginterrupt is called with 702 # a true value for the second argument, when that signal arrives, it 703 # interrupts a syscall that's in progress. 704 interrupted = self.readpipe_interrupted(True) 705 self.assertTrue(interrupted) 706 707 def test_siginterrupt_off(self): 708 # If a signal handler is installed and siginterrupt is called with 709 # a false value for the second argument, when that signal arrives, it 710 # does not interrupt a syscall that's in progress. 711 interrupted = self.readpipe_interrupted(False) 712 self.assertFalse(interrupted) 713 714 715@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 716@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'), 717 "needs signal.getitimer() and signal.setitimer()") 718class ItimerTest(unittest.TestCase): 719 def setUp(self): 720 self.hndl_called = False 721 self.hndl_count = 0 722 self.itimer = None 723 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 724 725 def tearDown(self): 726 signal.signal(signal.SIGALRM, self.old_alarm) 727 if self.itimer is not None: # test_itimer_exc doesn't change this attr 728 # just ensure that itimer is stopped 729 signal.setitimer(self.itimer, 0) 730 731 def sig_alrm(self, *args): 732 self.hndl_called = True 733 734 def sig_vtalrm(self, *args): 735 self.hndl_called = True 736 737 if self.hndl_count > 3: 738 # it shouldn't be here, because it should have been disabled. 739 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 740 "timer.") 741 elif self.hndl_count == 3: 742 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 743 signal.setitimer(signal.ITIMER_VIRTUAL, 0) 744 745 self.hndl_count += 1 746 747 def sig_prof(self, *args): 748 self.hndl_called = True 749 signal.setitimer(signal.ITIMER_PROF, 0) 750 751 def test_itimer_exc(self): 752 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 753 # defines it ? 754 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 755 # Negative times are treated as zero on some platforms. 756 if 0: 757 self.assertRaises(signal.ItimerError, 758 signal.setitimer, signal.ITIMER_REAL, -1) 759 760 def test_itimer_real(self): 761 self.itimer = signal.ITIMER_REAL 762 signal.setitimer(self.itimer, 1.0) 763 signal.pause() 764 self.assertEqual(self.hndl_called, True) 765 766 # Issue 3864, unknown if this affects earlier versions of freebsd also 767 @unittest.skipIf(sys.platform in ('netbsd5',), 768 'itimer not reliable (does not mix well with threading) on some BSDs.') 769 def test_itimer_virtual(self): 770 self.itimer = signal.ITIMER_VIRTUAL 771 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 772 signal.setitimer(self.itimer, 0.3, 0.2) 773 774 start_time = time.monotonic() 775 while time.monotonic() - start_time < 60.0: 776 # use up some virtual time by doing real work 777 _ = pow(12345, 67890, 10000019) 778 if signal.getitimer(self.itimer) == (0.0, 0.0): 779 break # sig_vtalrm handler stopped this itimer 780 else: # Issue 8424 781 self.skipTest("timeout: likely cause: machine too slow or load too " 782 "high") 783 784 # virtual itimer should be (0.0, 0.0) now 785 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 786 # and the handler should have been called 787 self.assertEqual(self.hndl_called, True) 788 789 def test_itimer_prof(self): 790 self.itimer = signal.ITIMER_PROF 791 signal.signal(signal.SIGPROF, self.sig_prof) 792 signal.setitimer(self.itimer, 0.2, 0.2) 793 794 start_time = time.monotonic() 795 while time.monotonic() - start_time < 60.0: 796 # do some work 797 _ = pow(12345, 67890, 10000019) 798 if signal.getitimer(self.itimer) == (0.0, 0.0): 799 break # sig_prof handler stopped this itimer 800 else: # Issue 8424 801 self.skipTest("timeout: likely cause: machine too slow or load too " 802 "high") 803 804 # profiling itimer should be (0.0, 0.0) now 805 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 806 # and the handler should have been called 807 self.assertEqual(self.hndl_called, True) 808 809 def test_setitimer_tiny(self): 810 # bpo-30807: C setitimer() takes a microsecond-resolution interval. 811 # Check that float -> timeval conversion doesn't round 812 # the interval down to zero, which would disable the timer. 813 self.itimer = signal.ITIMER_REAL 814 signal.setitimer(self.itimer, 1e-6) 815 time.sleep(1) 816 self.assertEqual(self.hndl_called, True) 817 818 819class PendingSignalsTests(unittest.TestCase): 820 """ 821 Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() 822 functions. 823 """ 824 @unittest.skipUnless(hasattr(signal, 'sigpending'), 825 'need signal.sigpending()') 826 def test_sigpending_empty(self): 827 self.assertEqual(signal.sigpending(), set()) 828 829 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 830 'need signal.pthread_sigmask()') 831 @unittest.skipUnless(hasattr(signal, 'sigpending'), 832 'need signal.sigpending()') 833 def test_sigpending(self): 834 code = """if 1: 835 import os 836 import signal 837 838 def handler(signum, frame): 839 1/0 840 841 signum = signal.SIGUSR1 842 signal.signal(signum, handler) 843 844 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 845 os.kill(os.getpid(), signum) 846 pending = signal.sigpending() 847 for sig in pending: 848 assert isinstance(sig, signal.Signals), repr(pending) 849 if pending != {signum}: 850 raise Exception('%s != {%s}' % (pending, signum)) 851 try: 852 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 853 except ZeroDivisionError: 854 pass 855 else: 856 raise Exception("ZeroDivisionError not raised") 857 """ 858 assert_python_ok('-c', code) 859 860 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 861 'need signal.pthread_kill()') 862 def test_pthread_kill(self): 863 code = """if 1: 864 import signal 865 import threading 866 import sys 867 868 signum = signal.SIGUSR1 869 870 def handler(signum, frame): 871 1/0 872 873 signal.signal(signum, handler) 874 875 tid = threading.get_ident() 876 try: 877 signal.pthread_kill(tid, signum) 878 except ZeroDivisionError: 879 pass 880 else: 881 raise Exception("ZeroDivisionError not raised") 882 """ 883 assert_python_ok('-c', code) 884 885 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 886 'need signal.pthread_sigmask()') 887 def wait_helper(self, blocked, test): 888 """ 889 test: body of the "def test(signum):" function. 890 blocked: number of the blocked signal 891 """ 892 code = '''if 1: 893 import signal 894 import sys 895 from signal import Signals 896 897 def handler(signum, frame): 898 1/0 899 900 %s 901 902 blocked = %r 903 signum = signal.SIGALRM 904 905 # child: block and wait the signal 906 try: 907 signal.signal(signum, handler) 908 signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) 909 910 # Do the tests 911 test(signum) 912 913 # The handler must not be called on unblock 914 try: 915 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) 916 except ZeroDivisionError: 917 print("the signal handler has been called", 918 file=sys.stderr) 919 sys.exit(1) 920 except BaseException as err: 921 print("error: {}".format(err), file=sys.stderr) 922 sys.stderr.flush() 923 sys.exit(1) 924 ''' % (test.strip(), blocked) 925 926 # sig*wait* must be called with the signal blocked: since the current 927 # process might have several threads running, use a subprocess to have 928 # a single thread. 929 assert_python_ok('-c', code) 930 931 @unittest.skipUnless(hasattr(signal, 'sigwait'), 932 'need signal.sigwait()') 933 def test_sigwait(self): 934 self.wait_helper(signal.SIGALRM, ''' 935 def test(signum): 936 signal.alarm(1) 937 received = signal.sigwait([signum]) 938 assert isinstance(received, signal.Signals), received 939 if received != signum: 940 raise Exception('received %s, not %s' % (received, signum)) 941 ''') 942 943 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 944 'need signal.sigwaitinfo()') 945 def test_sigwaitinfo(self): 946 self.wait_helper(signal.SIGALRM, ''' 947 def test(signum): 948 signal.alarm(1) 949 info = signal.sigwaitinfo([signum]) 950 if info.si_signo != signum: 951 raise Exception("info.si_signo != %s" % signum) 952 ''') 953 954 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 955 'need signal.sigtimedwait()') 956 def test_sigtimedwait(self): 957 self.wait_helper(signal.SIGALRM, ''' 958 def test(signum): 959 signal.alarm(1) 960 info = signal.sigtimedwait([signum], 10.1000) 961 if info.si_signo != signum: 962 raise Exception('info.si_signo != %s' % signum) 963 ''') 964 965 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 966 'need signal.sigtimedwait()') 967 def test_sigtimedwait_poll(self): 968 # check that polling with sigtimedwait works 969 self.wait_helper(signal.SIGALRM, ''' 970 def test(signum): 971 import os 972 os.kill(os.getpid(), signum) 973 info = signal.sigtimedwait([signum], 0) 974 if info.si_signo != signum: 975 raise Exception('info.si_signo != %s' % signum) 976 ''') 977 978 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 979 'need signal.sigtimedwait()') 980 def test_sigtimedwait_timeout(self): 981 self.wait_helper(signal.SIGALRM, ''' 982 def test(signum): 983 received = signal.sigtimedwait([signum], 1.0) 984 if received is not None: 985 raise Exception("received=%r" % (received,)) 986 ''') 987 988 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 989 'need signal.sigtimedwait()') 990 def test_sigtimedwait_negative_timeout(self): 991 signum = signal.SIGALRM 992 self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) 993 994 @unittest.skipUnless(hasattr(signal, 'sigwait'), 995 'need signal.sigwait()') 996 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 997 'need signal.pthread_sigmask()') 998 def test_sigwait_thread(self): 999 # Check that calling sigwait() from a thread doesn't suspend the whole 1000 # process. A new interpreter is spawned to avoid problems when mixing 1001 # threads and fork(): only async-safe functions are allowed between 1002 # fork() and exec(). 1003 assert_python_ok("-c", """if True: 1004 import os, threading, sys, time, signal 1005 1006 # the default handler terminates the process 1007 signum = signal.SIGUSR1 1008 1009 def kill_later(): 1010 # wait until the main thread is waiting in sigwait() 1011 time.sleep(1) 1012 os.kill(os.getpid(), signum) 1013 1014 # the signal must be blocked by all the threads 1015 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1016 killer = threading.Thread(target=kill_later) 1017 killer.start() 1018 received = signal.sigwait([signum]) 1019 if received != signum: 1020 print("sigwait() received %s, not %s" % (received, signum), 1021 file=sys.stderr) 1022 sys.exit(1) 1023 killer.join() 1024 # unblock the signal, which should have been cleared by sigwait() 1025 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1026 """) 1027 1028 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1029 'need signal.pthread_sigmask()') 1030 def test_pthread_sigmask_arguments(self): 1031 self.assertRaises(TypeError, signal.pthread_sigmask) 1032 self.assertRaises(TypeError, signal.pthread_sigmask, 1) 1033 self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) 1034 self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) 1035 with self.assertRaises(ValueError): 1036 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG]) 1037 with self.assertRaises(ValueError): 1038 signal.pthread_sigmask(signal.SIG_BLOCK, [0]) 1039 with self.assertRaises(ValueError): 1040 signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000]) 1041 1042 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1043 'need signal.pthread_sigmask()') 1044 def test_pthread_sigmask_valid_signals(self): 1045 s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) 1046 self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s) 1047 # Get current blocked set 1048 s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) 1049 self.assertLessEqual(s, signal.valid_signals()) 1050 1051 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1052 'need signal.pthread_sigmask()') 1053 def test_pthread_sigmask(self): 1054 code = """if 1: 1055 import signal 1056 import os; import threading 1057 1058 def handler(signum, frame): 1059 1/0 1060 1061 def kill(signum): 1062 os.kill(os.getpid(), signum) 1063 1064 def check_mask(mask): 1065 for sig in mask: 1066 assert isinstance(sig, signal.Signals), repr(sig) 1067 1068 def read_sigmask(): 1069 sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 1070 check_mask(sigmask) 1071 return sigmask 1072 1073 signum = signal.SIGUSR1 1074 1075 # Install our signal handler 1076 old_handler = signal.signal(signum, handler) 1077 1078 # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 1079 old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1080 check_mask(old_mask) 1081 try: 1082 kill(signum) 1083 except ZeroDivisionError: 1084 pass 1085 else: 1086 raise Exception("ZeroDivisionError not raised") 1087 1088 # Block and then raise SIGUSR1. The signal is blocked: the signal 1089 # handler is not called, and the signal is now pending 1090 mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1091 check_mask(mask) 1092 kill(signum) 1093 1094 # Check the new mask 1095 blocked = read_sigmask() 1096 check_mask(blocked) 1097 if signum not in blocked: 1098 raise Exception("%s not in %s" % (signum, blocked)) 1099 if old_mask ^ blocked != {signum}: 1100 raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 1101 1102 # Unblock SIGUSR1 1103 try: 1104 # unblock the pending signal calls immediately the signal handler 1105 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1106 except ZeroDivisionError: 1107 pass 1108 else: 1109 raise Exception("ZeroDivisionError not raised") 1110 try: 1111 kill(signum) 1112 except ZeroDivisionError: 1113 pass 1114 else: 1115 raise Exception("ZeroDivisionError not raised") 1116 1117 # Check the new mask 1118 unblocked = read_sigmask() 1119 if signum in unblocked: 1120 raise Exception("%s in %s" % (signum, unblocked)) 1121 if blocked ^ unblocked != {signum}: 1122 raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 1123 if old_mask != unblocked: 1124 raise Exception("%s != %s" % (old_mask, unblocked)) 1125 """ 1126 assert_python_ok('-c', code) 1127 1128 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 1129 'need signal.pthread_kill()') 1130 def test_pthread_kill_main_thread(self): 1131 # Test that a signal can be sent to the main thread with pthread_kill() 1132 # before any other thread has been created (see issue #12392). 1133 code = """if True: 1134 import threading 1135 import signal 1136 import sys 1137 1138 def handler(signum, frame): 1139 sys.exit(3) 1140 1141 signal.signal(signal.SIGUSR1, handler) 1142 signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 1143 sys.exit(2) 1144 """ 1145 1146 with spawn_python('-c', code) as process: 1147 stdout, stderr = process.communicate() 1148 exitcode = process.wait() 1149 if exitcode != 3: 1150 raise Exception("Child error (exit code %s): %s" % 1151 (exitcode, stdout)) 1152 1153 1154class StressTest(unittest.TestCase): 1155 """ 1156 Stress signal delivery, especially when a signal arrives in 1157 the middle of recomputing the signal state or executing 1158 previously tripped signal handlers. 1159 """ 1160 1161 def setsig(self, signum, handler): 1162 old_handler = signal.signal(signum, handler) 1163 self.addCleanup(signal.signal, signum, old_handler) 1164 1165 def measure_itimer_resolution(self): 1166 N = 20 1167 times = [] 1168 1169 def handler(signum=None, frame=None): 1170 if len(times) < N: 1171 times.append(time.perf_counter()) 1172 # 1 µs is the smallest possible timer interval, 1173 # we want to measure what the concrete duration 1174 # will be on this platform 1175 signal.setitimer(signal.ITIMER_REAL, 1e-6) 1176 1177 self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0) 1178 self.setsig(signal.SIGALRM, handler) 1179 handler() 1180 while len(times) < N: 1181 time.sleep(1e-3) 1182 1183 durations = [times[i+1] - times[i] for i in range(len(times) - 1)] 1184 med = statistics.median(durations) 1185 if support.verbose: 1186 print("detected median itimer() resolution: %.6f s." % (med,)) 1187 return med 1188 1189 def decide_itimer_count(self): 1190 # Some systems have poor setitimer() resolution (for example 1191 # measured around 20 ms. on FreeBSD 9), so decide on a reasonable 1192 # number of sequential timers based on that. 1193 reso = self.measure_itimer_resolution() 1194 if reso <= 1e-4: 1195 return 10000 1196 elif reso <= 1e-2: 1197 return 100 1198 else: 1199 self.skipTest("detected itimer resolution (%.3f s.) too high " 1200 "(> 10 ms.) on this platform (or system too busy)" 1201 % (reso,)) 1202 1203 @unittest.skipUnless(hasattr(signal, "setitimer"), 1204 "test needs setitimer()") 1205 def test_stress_delivery_dependent(self): 1206 """ 1207 This test uses dependent signal handlers. 1208 """ 1209 N = self.decide_itimer_count() 1210 sigs = [] 1211 1212 def first_handler(signum, frame): 1213 # 1e-6 is the minimum non-zero value for `setitimer()`. 1214 # Choose a random delay so as to improve chances of 1215 # triggering a race condition. Ideally the signal is received 1216 # when inside critical signal-handling routines such as 1217 # Py_MakePendingCalls(). 1218 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1219 1220 def second_handler(signum=None, frame=None): 1221 sigs.append(signum) 1222 1223 # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both 1224 # ascending and descending sequences (SIGUSR1 then SIGALRM, 1225 # SIGPROF then SIGALRM), we maximize chances of hitting a bug. 1226 self.setsig(signal.SIGPROF, first_handler) 1227 self.setsig(signal.SIGUSR1, first_handler) 1228 self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL 1229 1230 expected_sigs = 0 1231 deadline = time.monotonic() + support.SHORT_TIMEOUT 1232 1233 while expected_sigs < N: 1234 os.kill(os.getpid(), signal.SIGPROF) 1235 expected_sigs += 1 1236 # Wait for handlers to run to avoid signal coalescing 1237 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1238 time.sleep(1e-5) 1239 1240 os.kill(os.getpid(), signal.SIGUSR1) 1241 expected_sigs += 1 1242 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1243 time.sleep(1e-5) 1244 1245 # All ITIMER_REAL signals should have been delivered to the 1246 # Python handler 1247 self.assertEqual(len(sigs), N, "Some signals were lost") 1248 1249 @unittest.skipUnless(hasattr(signal, "setitimer"), 1250 "test needs setitimer()") 1251 def test_stress_delivery_simultaneous(self): 1252 """ 1253 This test uses simultaneous signal handlers. 1254 """ 1255 N = self.decide_itimer_count() 1256 sigs = [] 1257 1258 def handler(signum, frame): 1259 sigs.append(signum) 1260 1261 self.setsig(signal.SIGUSR1, handler) 1262 self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL 1263 1264 expected_sigs = 0 1265 deadline = time.monotonic() + support.SHORT_TIMEOUT 1266 1267 while expected_sigs < N: 1268 # Hopefully the SIGALRM will be received somewhere during 1269 # initial processing of SIGUSR1. 1270 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1271 os.kill(os.getpid(), signal.SIGUSR1) 1272 1273 expected_sigs += 2 1274 # Wait for handlers to run to avoid signal coalescing 1275 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1276 time.sleep(1e-5) 1277 1278 # All ITIMER_REAL signals should have been delivered to the 1279 # Python handler 1280 self.assertEqual(len(sigs), N, "Some signals were lost") 1281 1282 @unittest.skipUnless(hasattr(signal, "SIGUSR1"), 1283 "test needs SIGUSR1") 1284 def test_stress_modifying_handlers(self): 1285 # bpo-43406: race condition between trip_signal() and signal.signal 1286 signum = signal.SIGUSR1 1287 num_sent_signals = 0 1288 num_received_signals = 0 1289 do_stop = False 1290 1291 def custom_handler(signum, frame): 1292 nonlocal num_received_signals 1293 num_received_signals += 1 1294 1295 def set_interrupts(): 1296 nonlocal num_sent_signals 1297 while not do_stop: 1298 signal.raise_signal(signum) 1299 num_sent_signals += 1 1300 1301 def cycle_handlers(): 1302 while num_sent_signals < 100: 1303 for i in range(20000): 1304 # Cycle between a Python-defined and a non-Python handler 1305 for handler in [custom_handler, signal.SIG_IGN]: 1306 signal.signal(signum, handler) 1307 1308 old_handler = signal.signal(signum, custom_handler) 1309 self.addCleanup(signal.signal, signum, old_handler) 1310 1311 t = threading.Thread(target=set_interrupts) 1312 try: 1313 ignored = False 1314 with support.catch_unraisable_exception() as cm: 1315 t.start() 1316 cycle_handlers() 1317 do_stop = True 1318 t.join() 1319 1320 if cm.unraisable is not None: 1321 # An unraisable exception may be printed out when 1322 # a signal is ignored due to the aforementioned 1323 # race condition, check it. 1324 self.assertIsInstance(cm.unraisable.exc_value, OSError) 1325 self.assertIn( 1326 f"Signal {signum:d} ignored due to race condition", 1327 str(cm.unraisable.exc_value)) 1328 ignored = True 1329 1330 # bpo-43406: Even if it is unlikely, it's technically possible that 1331 # all signals were ignored because of race conditions. 1332 if not ignored: 1333 # Sanity check that some signals were received, but not all 1334 self.assertGreater(num_received_signals, 0) 1335 self.assertLess(num_received_signals, num_sent_signals) 1336 finally: 1337 do_stop = True 1338 t.join() 1339 1340 1341class RaiseSignalTest(unittest.TestCase): 1342 1343 def test_sigint(self): 1344 with self.assertRaises(KeyboardInterrupt): 1345 signal.raise_signal(signal.SIGINT) 1346 1347 @unittest.skipIf(sys.platform != "win32", "Windows specific test") 1348 def test_invalid_argument(self): 1349 try: 1350 SIGHUP = 1 # not supported on win32 1351 signal.raise_signal(SIGHUP) 1352 self.fail("OSError (Invalid argument) expected") 1353 except OSError as e: 1354 if e.errno == errno.EINVAL: 1355 pass 1356 else: 1357 raise 1358 1359 def test_handler(self): 1360 is_ok = False 1361 def handler(a, b): 1362 nonlocal is_ok 1363 is_ok = True 1364 old_signal = signal.signal(signal.SIGINT, handler) 1365 self.addCleanup(signal.signal, signal.SIGINT, old_signal) 1366 1367 signal.raise_signal(signal.SIGINT) 1368 self.assertTrue(is_ok) 1369 1370 1371class PidfdSignalTest(unittest.TestCase): 1372 1373 @unittest.skipUnless( 1374 hasattr(signal, "pidfd_send_signal"), 1375 "pidfd support not built in", 1376 ) 1377 def test_pidfd_send_signal(self): 1378 with self.assertRaises(OSError) as cm: 1379 signal.pidfd_send_signal(0, signal.SIGINT) 1380 if cm.exception.errno == errno.ENOSYS: 1381 self.skipTest("kernel does not support pidfds") 1382 elif cm.exception.errno == errno.EPERM: 1383 self.skipTest("Not enough privileges to use pidfs") 1384 self.assertEqual(cm.exception.errno, errno.EBADF) 1385 my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY) 1386 self.addCleanup(os.close, my_pidfd) 1387 with self.assertRaisesRegex(TypeError, "^siginfo must be None$"): 1388 signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0) 1389 with self.assertRaises(KeyboardInterrupt): 1390 signal.pidfd_send_signal(my_pidfd, signal.SIGINT) 1391 1392def tearDownModule(): 1393 support.reap_children() 1394 1395if __name__ == "__main__": 1396 unittest.main() 1397