1 use crate::process::imp::orphan::{OrphanQueue, Wait};
2 use crate::process::kill::Kill;
3 use crate::signal::unix::Signal;
4 
5 use std::future::Future;
6 use std::io;
7 use std::ops::Deref;
8 use std::pin::Pin;
9 use std::process::ExitStatus;
10 use std::task::Context;
11 use std::task::Poll;
12 
13 /// Orchestrates between registering interest for receiving signals when a
14 /// child process has exited, and attempting to poll for process completion.
15 #[derive(Debug)]
16 pub(crate) struct Reaper<W, Q, S>
17 where
18     W: Wait + Unpin,
19     Q: OrphanQueue<W>,
20 {
21     inner: Option<W>,
22     orphan_queue: Q,
23     signal: S,
24 }
25 
26 // Work around removal of `futures_core` dependency
27 pub(crate) trait Stream: Unpin {
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>28     fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>;
29 }
30 
31 impl Stream for Signal {
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>32     fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
33         Signal::poll_recv(self, cx)
34     }
35 }
36 
37 impl<W, Q, S> Deref for Reaper<W, Q, S>
38 where
39     W: Wait + Unpin,
40     Q: OrphanQueue<W>,
41 {
42     type Target = W;
43 
deref(&self) -> &Self::Target44     fn deref(&self) -> &Self::Target {
45         self.inner()
46     }
47 }
48 
49 impl<W, Q, S> Reaper<W, Q, S>
50 where
51     W: Wait + Unpin,
52     Q: OrphanQueue<W>,
53 {
new(inner: W, orphan_queue: Q, signal: S) -> Self54     pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
55         Self {
56             inner: Some(inner),
57             orphan_queue,
58             signal,
59         }
60     }
61 
inner(&self) -> &W62     fn inner(&self) -> &W {
63         self.inner.as_ref().expect("inner has gone away")
64     }
65 
inner_mut(&mut self) -> &mut W66     fn inner_mut(&mut self) -> &mut W {
67         self.inner.as_mut().expect("inner has gone away")
68     }
69 }
70 
71 impl<W, Q, S> Future for Reaper<W, Q, S>
72 where
73     W: Wait + Unpin,
74     Q: OrphanQueue<W> + Unpin,
75     S: Stream,
76 {
77     type Output = io::Result<ExitStatus>;
78 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>79     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80         loop {
81             // If the child hasn't exited yet, then it's our responsibility to
82             // ensure the current task gets notified when it might be able to
83             // make progress.
84             //
85             // As described in `spawn` above, we just indicate that we can
86             // next make progress once a SIGCHLD is received.
87             //
88             // However, we will register for a notification on the next signal
89             // BEFORE we poll the child. Otherwise it is possible that the child
90             // can exit and the signal can arrive after we last polled the child,
91             // but before we've registered for a notification on the next signal
92             // (this can cause a deadlock if there are no more spawned children
93             // which can generate a different signal for us). A side effect of
94             // pre-registering for signal notifications is that when the child
95             // exits, we will have already registered for an additional
96             // notification we don't need to consume. If another signal arrives,
97             // this future's task will be notified/woken up again. Since the
98             // futures model allows for spurious wake ups this extra wakeup
99             // should not cause significant issues with parent futures.
100             let registered_interest = self.signal.poll_recv(cx).is_pending();
101 
102             self.orphan_queue.reap_orphans();
103             if let Some(status) = self.inner_mut().try_wait()? {
104                 return Poll::Ready(Ok(status));
105             }
106 
107             // If our attempt to poll for the next signal was not ready, then
108             // we've arranged for our task to get notified and we can bail out.
109             if registered_interest {
110                 return Poll::Pending;
111             } else {
112                 // Otherwise, if the signal stream delivered a signal to us, we
113                 // won't get notified at the next signal, so we'll loop and try
114                 // again.
115                 continue;
116             }
117         }
118     }
119 }
120 
121 impl<W, Q, S> Kill for Reaper<W, Q, S>
122 where
123     W: Kill + Wait + Unpin,
124     Q: OrphanQueue<W>,
125 {
kill(&mut self) -> io::Result<()>126     fn kill(&mut self) -> io::Result<()> {
127         self.inner_mut().kill()
128     }
129 }
130 
131 impl<W, Q, S> Drop for Reaper<W, Q, S>
132 where
133     W: Wait + Unpin,
134     Q: OrphanQueue<W>,
135 {
drop(&mut self)136     fn drop(&mut self) {
137         if let Ok(Some(_)) = self.inner_mut().try_wait() {
138             return;
139         }
140 
141         let orphan = self.inner.take().unwrap();
142         self.orphan_queue.push_orphan(orphan);
143     }
144 }
145 
146 #[cfg(all(test, not(loom)))]
147 mod test {
148     use super::*;
149 
150     use futures::future::FutureExt;
151     use std::cell::{Cell, RefCell};
152     use std::os::unix::process::ExitStatusExt;
153     use std::process::ExitStatus;
154     use std::task::Context;
155     use std::task::Poll;
156 
157     #[derive(Debug)]
158     struct MockWait {
159         total_kills: usize,
160         total_waits: usize,
161         num_wait_until_status: usize,
162         status: ExitStatus,
163     }
164 
165     impl MockWait {
new(status: ExitStatus, num_wait_until_status: usize) -> Self166         fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
167             Self {
168                 total_kills: 0,
169                 total_waits: 0,
170                 num_wait_until_status,
171                 status,
172             }
173         }
174     }
175 
176     impl Wait for MockWait {
id(&self) -> u32177         fn id(&self) -> u32 {
178             0
179         }
180 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>181         fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
182             let ret = if self.num_wait_until_status == self.total_waits {
183                 Some(self.status)
184             } else {
185                 None
186             };
187 
188             self.total_waits += 1;
189             Ok(ret)
190         }
191     }
192 
193     impl Kill for MockWait {
kill(&mut self) -> io::Result<()>194         fn kill(&mut self) -> io::Result<()> {
195             self.total_kills += 1;
196             Ok(())
197         }
198     }
199 
200     struct MockStream {
201         total_polls: usize,
202         values: Vec<Option<()>>,
203     }
204 
205     impl MockStream {
new(values: Vec<Option<()>>) -> Self206         fn new(values: Vec<Option<()>>) -> Self {
207             Self {
208                 total_polls: 0,
209                 values,
210             }
211         }
212     }
213 
214     impl Stream for MockStream {
poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>>215         fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
216             self.total_polls += 1;
217             match self.values.remove(0) {
218                 Some(()) => Poll::Ready(Some(())),
219                 None => Poll::Pending,
220             }
221         }
222     }
223 
224     struct MockQueue<W> {
225         all_enqueued: RefCell<Vec<W>>,
226         total_reaps: Cell<usize>,
227     }
228 
229     impl<W> MockQueue<W> {
new() -> Self230         fn new() -> Self {
231             Self {
232                 all_enqueued: RefCell::new(Vec::new()),
233                 total_reaps: Cell::new(0),
234             }
235         }
236     }
237 
238     impl<W: Wait> OrphanQueue<W> for MockQueue<W> {
push_orphan(&self, orphan: W)239         fn push_orphan(&self, orphan: W) {
240             self.all_enqueued.borrow_mut().push(orphan);
241         }
242 
reap_orphans(&self)243         fn reap_orphans(&self) {
244             self.total_reaps.set(self.total_reaps.get() + 1);
245         }
246     }
247 
248     #[test]
reaper()249     fn reaper() {
250         let exit = ExitStatus::from_raw(0);
251         let mock = MockWait::new(exit, 3);
252         let mut grim = Reaper::new(
253             mock,
254             MockQueue::new(),
255             MockStream::new(vec![None, Some(()), None, None, None]),
256         );
257 
258         let waker = futures::task::noop_waker();
259         let mut context = Context::from_waker(&waker);
260 
261         // Not yet exited, interest registered
262         assert!(grim.poll_unpin(&mut context).is_pending());
263         assert_eq!(1, grim.signal.total_polls);
264         assert_eq!(1, grim.total_waits);
265         assert_eq!(1, grim.orphan_queue.total_reaps.get());
266         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
267 
268         // Not yet exited, couldn't register interest the first time
269         // but managed to register interest the second time around
270         assert!(grim.poll_unpin(&mut context).is_pending());
271         assert_eq!(3, grim.signal.total_polls);
272         assert_eq!(3, grim.total_waits);
273         assert_eq!(3, grim.orphan_queue.total_reaps.get());
274         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
275 
276         // Exited
277         if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
278             assert!(r.is_ok());
279             let exit_code = r.unwrap();
280             assert_eq!(exit_code, exit);
281         } else {
282             unreachable!();
283         }
284         assert_eq!(4, grim.signal.total_polls);
285         assert_eq!(4, grim.total_waits);
286         assert_eq!(4, grim.orphan_queue.total_reaps.get());
287         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
288     }
289 
290     #[test]
kill()291     fn kill() {
292         let exit = ExitStatus::from_raw(0);
293         let mut grim = Reaper::new(
294             MockWait::new(exit, 0),
295             MockQueue::new(),
296             MockStream::new(vec![None]),
297         );
298 
299         grim.kill().unwrap();
300         assert_eq!(1, grim.total_kills);
301         assert_eq!(0, grim.orphan_queue.total_reaps.get());
302         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
303     }
304 
305     #[test]
drop_reaps_if_possible()306     fn drop_reaps_if_possible() {
307         let exit = ExitStatus::from_raw(0);
308         let mut mock = MockWait::new(exit, 0);
309 
310         {
311             let queue = MockQueue::new();
312 
313             let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
314 
315             drop(grim);
316 
317             assert_eq!(0, queue.total_reaps.get());
318             assert!(queue.all_enqueued.borrow().is_empty());
319         }
320 
321         assert_eq!(1, mock.total_waits);
322         assert_eq!(0, mock.total_kills);
323     }
324 
325     #[test]
drop_enqueues_orphan_if_wait_fails()326     fn drop_enqueues_orphan_if_wait_fails() {
327         let exit = ExitStatus::from_raw(0);
328         let mut mock = MockWait::new(exit, 2);
329 
330         {
331             let queue = MockQueue::<&mut MockWait>::new();
332             let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
333             drop(grim);
334 
335             assert_eq!(0, queue.total_reaps.get());
336             assert_eq!(1, queue.all_enqueued.borrow().len());
337         }
338 
339         assert_eq!(1, mock.total_waits);
340         assert_eq!(0, mock.total_kills);
341     }
342 }
343