1 extern crate crossbeam_queue;
2 
3 use self::crossbeam_queue::SegQueue;
4 use std::io;
5 use std::process::ExitStatus;
6 
/// Abstraction over something that behaves like a child process whose exit
/// can be polled for.
pub(crate) trait Wait {
    /// Returns the identifier of this process, for diagnostics.
    fn id(&self) -> u32;

    /// Polls for process exit without blocking.
    ///
    /// The result is an `io::Result<Option<ExitStatus>>`; callers in this
    /// file treat `Ok(None)` as "has not exited yet" and `Ok(Some(_))` as
    /// "has exited".
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
14 
15 impl<'a, T: 'a + Wait> Wait for &'a mut T {
id(&self) -> u3216     fn id(&self) -> u32 {
17         (**self).id()
18     }
19 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>20     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
21         (**self).try_wait()
22     }
23 }
24 
/// A holding area for orphaned processes that still need to be reaped.
pub(crate) trait OrphanQueue<T> {
    /// Enqueues `orphan` so that a later reap attempt will consider it.
    fn push_orphan(&self, orphan: T);

    /// Tries to reap every queued process.
    ///
    /// Errors are ignored, and any orphan that has not exited yet is put
    /// back in the queue.
    fn reap_orphans(&self);
}
33 
34 impl<'a, T, O: 'a + OrphanQueue<T>> OrphanQueue<T> for &'a O {
push_orphan(&self, orphan: T)35     fn push_orphan(&self, orphan: T) {
36         (**self).push_orphan(orphan);
37     }
38 
reap_orphans(&self)39     fn reap_orphans(&self) {
40         (**self).reap_orphans()
41     }
42 }
43 
44 /// An atomic implementation of `OrphanQueue`.
45 #[derive(Debug)]
46 pub(crate) struct AtomicOrphanQueue<T> {
47     queue: SegQueue<T>,
48 }
49 
50 impl<T> AtomicOrphanQueue<T> {
new() -> Self51     pub(crate) fn new() -> Self {
52         Self {
53             queue: SegQueue::new(),
54         }
55     }
56 }
57 
58 impl<T: Wait> OrphanQueue<T> for AtomicOrphanQueue<T> {
push_orphan(&self, orphan: T)59     fn push_orphan(&self, orphan: T) {
60         self.queue.push(orphan)
61     }
62 
reap_orphans(&self)63     fn reap_orphans(&self) {
64         let len = self.queue.len();
65 
66         if len == 0 {
67             return;
68         }
69 
70         let mut orphans = Vec::with_capacity(len);
71         while let Ok(mut orphan) = self.queue.pop() {
72             match orphan.try_wait() {
73                 Ok(Some(_)) => {},
74                 Err(e) => error!(
75                     "leaking orphaned process {} due to try_wait() error: {}",
76                     orphan.id(),
77                     e,
78                 ),
79 
80                 // Still not done yet, we need to put it back in the queue
81                 // when were done draining it, so that we don't get stuck
82                 // in an infinite loop here
83                 Ok(None) => orphans.push(orphan),
84             }
85         }
86 
87         for orphan in orphans {
88             self.queue.push(orphan);
89         }
90     }
91 }
92 
#[cfg(test)]
mod test {
    use std::cell::Cell;
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::rc::Rc;
    use super::{AtomicOrphanQueue, OrphanQueue};
    use super::Wait;

    /// Test double for a process whose `try_wait` outcome is scripted.
    struct MockWait {
        /// Number of `try_wait` calls made so far. Shared through an `Rc` so
        /// the test can keep observing it after the mock has been moved into
        /// the queue.
        total_waits: Rc<Cell<usize>>,
        /// How many polls must already have happened before the poll that
        /// produces the final outcome.
        num_wait_until_status: usize,
        /// When `true`, the final outcome is an error instead of an exit
        /// status.
        return_err: bool,
    }

    impl MockWait {
        /// A mock that reports "still running" for `num_wait_until_status`
        /// polls and then reports a successful exit.
        fn new(num_wait_until_status: usize) -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status,
                return_err: false,
            }
        }

        /// A mock whose very first poll fails with an I/O error.
        fn with_err() -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status: 0,
                return_err: true,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            42
        }

        /// Produces the scripted outcome on the poll whose index equals
        /// `num_wait_until_status`, and `Ok(None)` on every other poll. Each
        /// call increments `total_waits`.
        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            let waits = self.total_waits.get();

            let ret = if self.num_wait_until_status == waits {
                // `return_err` selects the failing outcome; otherwise the
                // process is reported as having exited with raw status 0.
                if self.return_err {
                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
                } else {
                    Ok(Some(ExitStatus::from_raw(0)))
                }
            } else {
                Ok(None)
            };

            self.total_waits.set(waits + 1);
            ret
        }
    }

    /// Guards the mock itself: the queue discards an orphan on both an exit
    /// status and an error, so a mix-up of the two would go unnoticed by the
    /// queue test below.
    #[test]
    fn mock_wait_reports_configured_outcome() {
        let mut failing = MockWait::with_err();
        assert!(failing.try_wait().is_err());

        let mut exited = MockWait::new(0);
        assert!(exited.try_wait().unwrap().is_some());

        let mut running = MockWait::new(1);
        assert!(running.try_wait().unwrap().is_none());
        assert!(running.try_wait().unwrap().is_some());
        assert_eq!(running.total_waits.get(), 2);
    }

    #[test]
    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
        let first_orphan = MockWait::new(0);
        let second_orphan = MockWait::new(1);
        let third_orphan = MockWait::new(2);
        let fourth_orphan = MockWait::with_err();

        // Keep handles to the counters; the mocks themselves are moved into
        // the queue below.
        let first_waits = first_orphan.total_waits.clone();
        let second_waits = second_orphan.total_waits.clone();
        let third_waits = third_orphan.total_waits.clone();
        let fourth_waits = fourth_orphan.total_waits.clone();

        let orphanage = AtomicOrphanQueue::new();
        orphanage.push_orphan(first_orphan);
        orphanage.push_orphan(third_orphan);
        orphanage.push_orphan(second_orphan);
        orphanage.push_orphan(fourth_orphan);

        assert_eq!(orphanage.queue.len(), 4);

        // First pass: every orphan is polled exactly once; the first exits
        // and the fourth errors, so only the second and third remain.
        orphanage.reap_orphans();
        assert_eq!(orphanage.queue.len(), 2);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 1);
        assert_eq!(third_waits.get(), 1);
        assert_eq!(fourth_waits.get(), 1);

        // Second pass: only the remaining two are polled; the second exits.
        orphanage.reap_orphans();
        assert_eq!(orphanage.queue.len(), 1);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 2);
        assert_eq!(fourth_waits.get(), 1);

        // Third pass: the last orphan exits and the queue is empty.
        orphanage.reap_orphans();
        assert_eq!(orphanage.queue.len(), 0);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 3);
        assert_eq!(fourth_waits.get(), 1);

        orphanage.reap_orphans(); // Safe to reap when empty
    }
}
194