1import errno
2import math
3import os
4import sys
5
6from .. import _core, _subprocess
7from .._sync import CapacityLimiter, Event
8from .._threads import to_thread_run_sync
9
10try:
11    from os import waitid
12
13    def sync_wait_reapable(pid):
14        waitid(os.P_PID, pid, os.WEXITED | os.WNOWAIT)
15
16
17except ImportError:
18    # pypy doesn't define os.waitid so we need to pull it out ourselves
19    # using cffi: https://bitbucket.org/pypy/pypy/issues/2922/
20    import cffi
21
22    waitid_ffi = cffi.FFI()
23
24    # Believe it or not, siginfo_t starts with fields in the
25    # same layout on both Linux and Darwin. The Linux structure
26    # is bigger so that's what we use to size `pad`; while
27    # there are a few extra fields in there, most of it is
28    # true padding which would not be written by the syscall.
29    waitid_ffi.cdef(
30        """
31typedef struct siginfo_s {
32    int si_signo;
33    int si_errno;
34    int si_code;
35    int si_pid;
36    int si_uid;
37    int si_status;
38    int pad[26];
39} siginfo_t;
40int waitid(int idtype, int id, siginfo_t* result, int options);
41"""
42    )
43    waitid = waitid_ffi.dlopen(None).waitid
44
45    def sync_wait_reapable(pid):
46        P_PID = 1
47        WEXITED = 0x00000004
48        if sys.platform == "darwin":  # pragma: no cover
49            # waitid() is not exposed on Python on Darwin but does
50            # work through CFFI; note that we typically won't get
51            # here since Darwin also defines kqueue
52            WNOWAIT = 0x00000020
53        else:
54            WNOWAIT = 0x01000000
55        result = waitid_ffi.new("siginfo_t *")
56        while waitid(P_PID, pid, result, WEXITED | WNOWAIT) < 0:
57            got_errno = waitid_ffi.errno
58            if got_errno == errno.EINTR:
59                continue
60            raise OSError(got_errno, os.strerror(got_errno))
61
62
63# adapted from
64# https://github.com/python-trio/trio/issues/4#issuecomment-398967572
65
66waitid_limiter = CapacityLimiter(math.inf)
67
68
69async def _waitid_system_task(pid: int, event: Event) -> None:
70    """Spawn a thread that waits for ``pid`` to exit, then wake any tasks
71    that were waiting on it.
72    """
73    # cancellable=True: if this task is cancelled, then we abandon the
74    # thread to keep running waitpid in the background. Since this is
75    # always run as a system task, this will only happen if the whole
76    # call to trio.run is shutting down.
77
78    try:
79        await to_thread_run_sync(
80            sync_wait_reapable, pid, cancellable=True, limiter=waitid_limiter
81        )
82    except OSError:
83        # If waitid fails, waitpid will fail too, so it still makes
84        # sense to wake up the callers of wait_process_exiting(). The
85        # most likely reason for this error in practice is a child
86        # exiting when wait() is not possible because SIGCHLD is
87        # ignored.
88        pass
89    finally:
90        event.set()
91
92
93async def wait_child_exiting(process: "_subprocess.Process") -> None:
94    # Logic of this function:
95    # - The first time we get called, we create an Event and start
96    #   an instance of _waitid_system_task that will set the Event
97    #   when waitid() completes. If that Event is set before
98    #   we get cancelled, we're good.
99    # - Otherwise, a following call after the cancellation must
100    #   reuse the Event created during the first call, lest we
101    #   create an arbitrary number of threads waiting on the same
102    #   process.
103
104    if process._wait_for_exit_data is None:
105        process._wait_for_exit_data = event = Event()  # type: ignore
106        _core.spawn_system_task(_waitid_system_task, process.pid, event)
107    assert isinstance(process._wait_for_exit_data, Event)
108    await process._wait_for_exit_data.wait()
109