1 extern crate crossbeam_queue; 2 3 use self::crossbeam_queue::SegQueue; 4 use std::io; 5 use std::process::ExitStatus; 6 7 /// An interface for waiting on a process to exit. 8 pub(crate) trait Wait { 9 /// Get the identifier for this process or diagnostics. id(&self) -> u3210 fn id(&self) -> u32; 11 /// Try waiting for a process to exit in a non-blocking manner. try_wait(&mut self) -> io::Result<Option<ExitStatus>>12 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>; 13 } 14 15 impl<'a, T: 'a + Wait> Wait for &'a mut T { id(&self) -> u3216 fn id(&self) -> u32 { 17 (**self).id() 18 } 19 try_wait(&mut self) -> io::Result<Option<ExitStatus>>20 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { 21 (**self).try_wait() 22 } 23 } 24 25 /// An interface for queueing up an orphaned process so that it can be reaped. 26 pub(crate) trait OrphanQueue<T> { 27 /// Add an orphan to the queue. push_orphan(&self, orphan: T)28 fn push_orphan(&self, orphan: T); 29 /// Attempt to reap every process in the queue, ignoring any errors and 30 /// enqueueing any orphans which have not yet exited. reap_orphans(&self)31 fn reap_orphans(&self); 32 } 33 34 impl<'a, T, O: 'a + OrphanQueue<T>> OrphanQueue<T> for &'a O { push_orphan(&self, orphan: T)35 fn push_orphan(&self, orphan: T) { 36 (**self).push_orphan(orphan); 37 } 38 reap_orphans(&self)39 fn reap_orphans(&self) { 40 (**self).reap_orphans() 41 } 42 } 43 44 /// An atomic implementation of `OrphanQueue`. 45 #[derive(Debug)] 46 pub(crate) struct AtomicOrphanQueue<T> { 47 queue: SegQueue<T>, 48 } 49 50 impl<T> AtomicOrphanQueue<T> { new() -> Self51 pub(crate) fn new() -> Self { 52 Self { 53 queue: SegQueue::new(), 54 } 55 } 56 } 57 58 impl<T: Wait> OrphanQueue<T> for AtomicOrphanQueue<T> { push_orphan(&self, orphan: T)59 fn push_orphan(&self, orphan: T) { 60 self.queue.push(orphan) 61 } 62 reap_orphans(&self)63 fn reap_orphans(&self) { 64 let len = self.queue.len(); 65 66 if len == 0 { 67 return; 68 } 69 70 let mut orphans = Vec::with_capacity(len); 71 while let Ok(mut orphan) = self.queue.pop() { 72 match orphan.try_wait() { 73 Ok(Some(_)) => {}, 74 Err(e) => error!( 75 "leaking orphaned process {} due to try_wait() error: {}", 76 orphan.id(), 77 e, 78 ), 79 80 // Still not done yet, we need to put it back in the queue 81 // when were done draining it, so that we don't get stuck 82 // in an infinite loop here 83 Ok(None) => orphans.push(orphan), 84 } 85 } 86 87 for orphan in orphans { 88 self.queue.push(orphan); 89 } 90 } 91 } 92 93 #[cfg(test)] 94 mod test { 95 use std::cell::Cell; 96 use std::io; 97 use std::os::unix::process::ExitStatusExt; 98 use std::process::ExitStatus; 99 use std::rc::Rc; 100 use super::{AtomicOrphanQueue, OrphanQueue}; 101 use super::Wait; 102 103 struct MockWait { 104 total_waits: Rc<Cell<usize>>, 105 num_wait_until_status: usize, 106 return_err: bool, 107 } 108 109 impl MockWait { new(num_wait_until_status: usize) -> Self110 fn new(num_wait_until_status: usize) -> Self { 111 Self { 112 total_waits: Rc::new(Cell::new(0)), 113 num_wait_until_status, 114 return_err: false, 115 } 116 } 117 with_err() -> Self118 fn with_err() -> Self { 119 Self { 120 total_waits: Rc::new(Cell::new(0)), 121 num_wait_until_status: 0, 122 return_err: true, 123 } 124 } 125 } 126 127 impl Wait for MockWait { id(&self) -> u32128 fn id(&self) -> u32 { 129 42 130 } 131 try_wait(&mut self) -> io::Result<Option<ExitStatus>>132 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { 133 let waits = self.total_waits.get(); 134 135 let ret = if self.num_wait_until_status == waits { 136 if self.return_err { 137 Ok(Some(ExitStatus::from_raw(0))) 138 } else { 139 Err(io::Error::new(io::ErrorKind::Other, "mock err")) 140 } 141 } else { 142 Ok(None) 143 }; 144 145 self.total_waits.set(waits + 1); 146 ret 147 } 148 } 149 150 #[test] drain_attempts_a_single_reap_of_all_queued_orphans()151 fn drain_attempts_a_single_reap_of_all_queued_orphans() { 152 let first_orphan = MockWait::new(0); 153 let second_orphan = MockWait::new(1); 154 let third_orphan = MockWait::new(2); 155 let fourth_orphan = MockWait::with_err(); 156 157 let first_waits = first_orphan.total_waits.clone(); 158 let second_waits = second_orphan.total_waits.clone(); 159 let third_waits = third_orphan.total_waits.clone(); 160 let fourth_waits = fourth_orphan.total_waits.clone(); 161 162 let orphanage = AtomicOrphanQueue::new(); 163 orphanage.push_orphan(first_orphan); 164 orphanage.push_orphan(third_orphan); 165 orphanage.push_orphan(second_orphan); 166 orphanage.push_orphan(fourth_orphan); 167 168 assert_eq!(orphanage.queue.len(), 4); 169 170 orphanage.reap_orphans(); 171 assert_eq!(orphanage.queue.len(), 2); 172 assert_eq!(first_waits.get(), 1); 173 assert_eq!(second_waits.get(), 1); 174 assert_eq!(third_waits.get(), 1); 175 assert_eq!(fourth_waits.get(), 1); 176 177 orphanage.reap_orphans(); 178 assert_eq!(orphanage.queue.len(), 1); 179 assert_eq!(first_waits.get(), 1); 180 assert_eq!(second_waits.get(), 2); 181 assert_eq!(third_waits.get(), 2); 182 assert_eq!(fourth_waits.get(), 1); 183 184 orphanage.reap_orphans(); 185 assert_eq!(orphanage.queue.len(), 0); 186 assert_eq!(first_waits.get(), 1); 187 assert_eq!(second_waits.get(), 2); 188 assert_eq!(third_waits.get(), 3); 189 assert_eq!(fourth_waits.get(), 1); 190 191 orphanage.reap_orphans(); // Safe to reap when empty 192 } 193 } 194