import errno
import math
import os
import sys

from .. import _core, _subprocess
from .._sync import CapacityLimiter, Event
from .._threads import to_thread_run_sync

try:
    from os import waitid

    def sync_wait_reapable(pid):
        """Block until the child ``pid`` has exited, without reaping it.

        ``WNOWAIT`` is passed so the exit status is left in place for a
        later wait call.
        """
        flags = os.WEXITED | os.WNOWAIT
        waitid(os.P_PID, pid, flags)


except ImportError:
    # os.waitid is missing on pypy, so fetch the C function directly
    # through cffi instead: https://bitbucket.org/pypy/pypy/issues/2922/
    import cffi

    waitid_ffi = cffi.FFI()

    # Perhaps surprisingly, the leading fields of siginfo_t are laid out
    # identically on Linux and Darwin. `pad` is sized for the Linux
    # structure because it is the larger of the two; a handful of real
    # fields live in that region, but it is mostly genuine padding that
    # the syscall never writes to.
    waitid_ffi.cdef(
        """
typedef struct siginfo_s {
    int si_signo;
    int si_errno;
    int si_code;
    int si_pid;
    int si_uid;
    int si_status;
    int pad[26];
} siginfo_t;
int waitid(int idtype, int id, siginfo_t* result, int options);
"""
    )
    waitid = waitid_ffi.dlopen(None).waitid

    def sync_wait_reapable(pid):
        """Block until the child ``pid`` has exited, without reaping it.

        Calls the C ``waitid`` through cffi, retrying when the call is
        interrupted (``EINTR``).

        Raises:
            OSError: if ``waitid`` fails with any errno other than EINTR.
        """
        P_PID = 1
        WEXITED = 0x00000004
        # Python does not expose waitid() on Darwin, but it works through
        # CFFI. This path is rarely taken there, because Darwin also
        # defines kqueue.
        WNOWAIT = 0x00000020 if sys.platform == "darwin" else 0x01000000
        options = WEXITED | WNOWAIT
        siginfo = waitid_ffi.new("siginfo_t *")
        while True:
            if waitid(P_PID, pid, siginfo, options) >= 0:
                return
            err = waitid_ffi.errno
            if err != errno.EINTR:
                raise OSError(err, os.strerror(err))
            # Interrupted by a signal: simply retry the call.


# adapted from
# https://github.com/python-trio/trio/issues/4#issuecomment-398967572

waitid_limiter = CapacityLimiter(math.inf)


async def _waitid_system_task(pid: int, event: Event) -> None:
    """Wait in a worker thread for ``pid`` to exit, then set ``event``.

    ``event`` is set unconditionally, so every task waiting on it is
    woken whether or not the wait succeeded.
    """
    # cancellable=True means that cancelling this task abandons the
    # thread, which keeps running waitid in the background. Since this
    # is always run as a system task, that can only happen while the
    # whole call to trio.run is shutting down.

    try:
        await to_thread_run_sync(
            sync_wait_reapable,
            pid,
            cancellable=True,
            limiter=waitid_limiter,
        )
    except OSError:
        # A waitid failure implies waitpid will fail as well, so waking
        # the callers of wait_child_exiting() is still the right thing
        # to do. In practice the most likely cause is a child exiting
        # while wait() is impossible because SIGCHLD is being ignored.
        pass
    finally:
        event.set()


async def wait_child_exiting(process: "_subprocess.Process") -> None:
    """Wait until the child behind ``process`` has exited.

    One Event is stored on ``process._wait_for_exit_data`` and shared by
    every call for that process.
    """
    # How this works:
    # - On the first call we create an Event and launch one
    #   _waitid_system_task, which sets the Event once waitid()
    #   completes. If that happens before we are cancelled, we're done.
    # - If we were cancelled first, any later call has to reuse the
    #   Event from the first call; otherwise we could pile up an
    #   arbitrary number of threads all waiting on the same process.

    exit_event = process._wait_for_exit_data
    if exit_event is None:
        exit_event = Event()
        process._wait_for_exit_data = exit_event  # type: ignore
        _core.spawn_system_task(_waitid_system_task, process.pid, exit_event)
    assert isinstance(exit_event, Event)
    await exit_event.wait()