1 use crate::process::imp::orphan::{OrphanQueue, Wait}; 2 use crate::process::kill::Kill; 3 use crate::signal::unix::Signal; 4 5 use std::future::Future; 6 use std::io; 7 use std::ops::Deref; 8 use std::pin::Pin; 9 use std::process::ExitStatus; 10 use std::task::Context; 11 use std::task::Poll; 12 13 /// Orchestrates between registering interest for receiving signals when a 14 /// child process has exited, and attempting to poll for process completion. 15 #[derive(Debug)] 16 pub(crate) struct Reaper<W, Q, S> 17 where 18 W: Wait + Unpin, 19 Q: OrphanQueue<W>, 20 { 21 inner: Option<W>, 22 orphan_queue: Q, 23 signal: S, 24 } 25 26 // Work around removal of `futures_core` dependency 27 pub(crate) trait Stream: Unpin { poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>28 fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; 29 } 30 31 impl Stream for Signal { poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>32 fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { 33 Signal::poll_recv(self, cx) 34 } 35 } 36 37 impl<W, Q, S> Deref for Reaper<W, Q, S> 38 where 39 W: Wait + Unpin, 40 Q: OrphanQueue<W>, 41 { 42 type Target = W; 43 deref(&self) -> &Self::Target44 fn deref(&self) -> &Self::Target { 45 self.inner() 46 } 47 } 48 49 impl<W, Q, S> Reaper<W, Q, S> 50 where 51 W: Wait + Unpin, 52 Q: OrphanQueue<W>, 53 { new(inner: W, orphan_queue: Q, signal: S) -> Self54 pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { 55 Self { 56 inner: Some(inner), 57 orphan_queue, 58 signal, 59 } 60 } 61 inner(&self) -> &W62 fn inner(&self) -> &W { 63 self.inner.as_ref().expect("inner has gone away") 64 } 65 inner_mut(&mut self) -> &mut W66 fn inner_mut(&mut self) -> &mut W { 67 self.inner.as_mut().expect("inner has gone away") 68 } 69 } 70 71 impl<W, Q, S> Future for Reaper<W, Q, S> 72 where 73 W: Wait + Unpin, 74 Q: OrphanQueue<W> + Unpin, 75 S: Stream, 76 { 77 type Output = io::Result<ExitStatus>; 78 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>79 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 80 loop { 81 // If the child hasn't exited yet, then it's our responsibility to 82 // ensure the current task gets notified when it might be able to 83 // make progress. 84 // 85 // As described in `spawn` above, we just indicate that we can 86 // next make progress once a SIGCHLD is received. 87 // 88 // However, we will register for a notification on the next signal 89 // BEFORE we poll the child. Otherwise it is possible that the child 90 // can exit and the signal can arrive after we last polled the child, 91 // but before we've registered for a notification on the next signal 92 // (this can cause a deadlock if there are no more spawned children 93 // which can generate a different signal for us). A side effect of 94 // pre-registering for signal notifications is that when the child 95 // exits, we will have already registered for an additional 96 // notification we don't need to consume. If another signal arrives, 97 // this future's task will be notified/woken up again. Since the 98 // futures model allows for spurious wake ups this extra wakeup 99 // should not cause significant issues with parent futures. 100 let registered_interest = self.signal.poll_recv(cx).is_pending(); 101 102 self.orphan_queue.reap_orphans(); 103 if let Some(status) = self.inner_mut().try_wait()? { 104 return Poll::Ready(Ok(status)); 105 } 106 107 // If our attempt to poll for the next signal was not ready, then 108 // we've arranged for our task to get notified and we can bail out. 109 if registered_interest { 110 return Poll::Pending; 111 } else { 112 // Otherwise, if the signal stream delivered a signal to us, we 113 // won't get notified at the next signal, so we'll loop and try 114 // again. 115 continue; 116 } 117 } 118 } 119 } 120 121 impl<W, Q, S> Kill for Reaper<W, Q, S> 122 where 123 W: Kill + Wait + Unpin, 124 Q: OrphanQueue<W>, 125 { kill(&mut self) -> io::Result<()>126 fn kill(&mut self) -> io::Result<()> { 127 self.inner_mut().kill() 128 } 129 } 130 131 impl<W, Q, S> Drop for Reaper<W, Q, S> 132 where 133 W: Wait + Unpin, 134 Q: OrphanQueue<W>, 135 { drop(&mut self)136 fn drop(&mut self) { 137 if let Ok(Some(_)) = self.inner_mut().try_wait() { 138 return; 139 } 140 141 let orphan = self.inner.take().unwrap(); 142 self.orphan_queue.push_orphan(orphan); 143 } 144 } 145 146 #[cfg(all(test, not(loom)))] 147 mod test { 148 use super::*; 149 150 use futures::future::FutureExt; 151 use std::cell::{Cell, RefCell}; 152 use std::os::unix::process::ExitStatusExt; 153 use std::process::ExitStatus; 154 use std::task::Context; 155 use std::task::Poll; 156 157 #[derive(Debug)] 158 struct MockWait { 159 total_kills: usize, 160 total_waits: usize, 161 num_wait_until_status: usize, 162 status: ExitStatus, 163 } 164 165 impl MockWait { new(status: ExitStatus, num_wait_until_status: usize) -> Self166 fn new(status: ExitStatus, num_wait_until_status: usize) -> Self { 167 Self { 168 total_kills: 0, 169 total_waits: 0, 170 num_wait_until_status, 171 status, 172 } 173 } 174 } 175 176 impl Wait for MockWait { id(&self) -> u32177 fn id(&self) -> u32 { 178 0 179 } 180 try_wait(&mut self) -> io::Result<Option<ExitStatus>>181 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { 182 let ret = if self.num_wait_until_status == self.total_waits { 183 Some(self.status) 184 } else { 185 None 186 }; 187 188 self.total_waits += 1; 189 Ok(ret) 190 } 191 } 192 193 impl Kill for MockWait { kill(&mut self) -> io::Result<()>194 fn kill(&mut self) -> io::Result<()> { 195 self.total_kills += 1; 196 Ok(()) 197 } 198 } 199 200 struct MockStream { 201 total_polls: usize, 202 values: Vec<Option<()>>, 203 } 204 205 impl MockStream { new(values: Vec<Option<()>>) -> Self206 fn new(values: Vec<Option<()>>) -> Self { 207 Self { 208 total_polls: 0, 209 values, 210 } 211 } 212 } 213 214 impl Stream for MockStream { poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>>215 fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { 216 self.total_polls += 1; 217 match self.values.remove(0) { 218 Some(()) => Poll::Ready(Some(())), 219 None => Poll::Pending, 220 } 221 } 222 } 223 224 struct MockQueue<W> { 225 all_enqueued: RefCell<Vec<W>>, 226 total_reaps: Cell<usize>, 227 } 228 229 impl<W> MockQueue<W> { new() -> Self230 fn new() -> Self { 231 Self { 232 all_enqueued: RefCell::new(Vec::new()), 233 total_reaps: Cell::new(0), 234 } 235 } 236 } 237 238 impl<W: Wait> OrphanQueue<W> for MockQueue<W> { push_orphan(&self, orphan: W)239 fn push_orphan(&self, orphan: W) { 240 self.all_enqueued.borrow_mut().push(orphan); 241 } 242 reap_orphans(&self)243 fn reap_orphans(&self) { 244 self.total_reaps.set(self.total_reaps.get() + 1); 245 } 246 } 247 248 #[test] reaper()249 fn reaper() { 250 let exit = ExitStatus::from_raw(0); 251 let mock = MockWait::new(exit, 3); 252 let mut grim = Reaper::new( 253 mock, 254 MockQueue::new(), 255 MockStream::new(vec![None, Some(()), None, None, None]), 256 ); 257 258 let waker = futures::task::noop_waker(); 259 let mut context = Context::from_waker(&waker); 260 261 // Not yet exited, interest registered 262 assert!(grim.poll_unpin(&mut context).is_pending()); 263 assert_eq!(1, grim.signal.total_polls); 264 assert_eq!(1, grim.total_waits); 265 assert_eq!(1, grim.orphan_queue.total_reaps.get()); 266 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 267 268 // Not yet exited, couldn't register interest the first time 269 // but managed to register interest the second time around 270 assert!(grim.poll_unpin(&mut context).is_pending()); 271 assert_eq!(3, grim.signal.total_polls); 272 assert_eq!(3, grim.total_waits); 273 assert_eq!(3, grim.orphan_queue.total_reaps.get()); 274 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 275 276 // Exited 277 if let Poll::Ready(r) = grim.poll_unpin(&mut context) { 278 assert!(r.is_ok()); 279 let exit_code = r.unwrap(); 280 assert_eq!(exit_code, exit); 281 } else { 282 unreachable!(); 283 } 284 assert_eq!(4, grim.signal.total_polls); 285 assert_eq!(4, grim.total_waits); 286 assert_eq!(4, grim.orphan_queue.total_reaps.get()); 287 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 288 } 289 290 #[test] kill()291 fn kill() { 292 let exit = ExitStatus::from_raw(0); 293 let mut grim = Reaper::new( 294 MockWait::new(exit, 0), 295 MockQueue::new(), 296 MockStream::new(vec![None]), 297 ); 298 299 grim.kill().unwrap(); 300 assert_eq!(1, grim.total_kills); 301 assert_eq!(0, grim.orphan_queue.total_reaps.get()); 302 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 303 } 304 305 #[test] drop_reaps_if_possible()306 fn drop_reaps_if_possible() { 307 let exit = ExitStatus::from_raw(0); 308 let mut mock = MockWait::new(exit, 0); 309 310 { 311 let queue = MockQueue::new(); 312 313 let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); 314 315 drop(grim); 316 317 assert_eq!(0, queue.total_reaps.get()); 318 assert!(queue.all_enqueued.borrow().is_empty()); 319 } 320 321 assert_eq!(1, mock.total_waits); 322 assert_eq!(0, mock.total_kills); 323 } 324 325 #[test] drop_enqueues_orphan_if_wait_fails()326 fn drop_enqueues_orphan_if_wait_fails() { 327 let exit = ExitStatus::from_raw(0); 328 let mut mock = MockWait::new(exit, 2); 329 330 { 331 let queue = MockQueue::<&mut MockWait>::new(); 332 let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); 333 drop(grim); 334 335 assert_eq!(0, queue.total_reaps.get()); 336 assert_eq!(1, queue.all_enqueued.borrow().len()); 337 } 338 339 assert_eq!(1, mock.total_waits); 340 assert_eq!(0, mock.total_kills); 341 } 342 } 343