1 use {TryRead, TryWrite};
2 use std::mem;
3 use mio::*;
4 use std::io;
5 use mio::deprecated::{EventLoop, Handler};
6 use mio::deprecated::unix::{PipeReader, PipeWriter};
7 use std::process::{Command, Stdio, Child};
8 
9 
10 struct SubprocessClient {
11     stdin: Option<PipeWriter>,
12     stdout: Option<PipeReader>,
13     stderr: Option<PipeReader>,
14     stdin_token : Token,
15     stdout_token : Token,
16     stderr_token : Token,
17     output : Vec<u8>,
18     output_stderr : Vec<u8>,
19     input : Vec<u8>,
20     input_offset : usize,
21     buf : [u8; 65536],
22 }
23 
24 
25 // Sends a message and expects to receive the same exact message, one at a time
26 impl SubprocessClient {
new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient27     fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
28         SubprocessClient {
29             stdin: stdin,
30             stdout: stdout,
31             stderr: stderr,
32             stdin_token : Token(0),
33             stdout_token : Token(1),
34             stderr_token : Token(2),
35             output : Vec::<u8>::new(),
36             output_stderr : Vec::<u8>::new(),
37             buf : [0; 65536],
38             input : data.to_vec(),
39             input_offset : 0,
40         }
41     }
42 
readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()>43     fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
44         let mut eof = false;
45         match self.stdout {
46             None => unreachable!(),
47             Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
48                 Ok(None) => {
49                 }
50                 Ok(Some(r)) => {
51                     if r == 0 {
52                         eof = true;
53                     } else {
54                          self.output.extend(&self.buf[0..r]);
55                     }
56                 }
57                 Err(e) => {
58                     return Err(e);
59                 }
60             }
61         };
62         if eof {
63             drop(self.stdout.take());
64             match self.stderr {
65                 None => event_loop.shutdown(),
66                 Some(_) => {},
67             }
68         }
69         return Ok(());
70     }
71 
readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()>72     fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
73         let mut eof = false;
74         match self.stderr {
75             None => unreachable!(),
76             Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
77                 Ok(None) => {
78                 }
79                 Ok(Some(r)) => {
80                     if r == 0 {
81                         eof = true;
82                     } else {
83                         self.output_stderr.extend(&self.buf[0..r]);
84                     }
85                 }
86                 Err(e) => {
87                     return Err(e);
88                 }
89             }
90         };
91         if eof {
92                         drop(self.stderr.take());
93                         match self.stdout {
94                             None => event_loop.shutdown(),
95                             Some(_) => {},
96                         }
97         }
98         return Ok(());
99     }
100 
writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()>101     fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
102         let mut ok = true;
103         match self.stdin {
104             None => unreachable!(),
105             Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
106                 Ok(None) => {
107                 },
108                 Ok(Some(r)) => {
109                     if r == 0 {
110                         ok = false;
111                     } else {
112                         self.input_offset += r;
113                     }
114                 },
115                 Err(_) => {
116                     ok = false;
117                 },
118             }
119         }
120         if self.input_offset == self.input.len() || !ok {
121             drop(self.stdin.take());
122             match self.stderr {
123                 None => match self.stdout {
124                             None => event_loop.shutdown(),
125                             Some(_) => {},
126                 },
127                 Some(_) => {},
128             }
129         }
130         return Ok(());
131     }
132 
133 }
134 
135 impl Handler for SubprocessClient {
136     type Timeout = usize;
137     type Message = ();
138 
ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token, _: Ready)139     fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
140              _: Ready) {
141         if token == self.stderr_token {
142             let _x = self.readable_stderr(event_loop);
143         } else {
144             let _x = self.readable(event_loop);
145         }
146         if token == self.stdin_token {
147             let _y = self.writable(event_loop);
148         }
149     }
150 }
151 
152 
153 
154 
155 const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>)156 pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
157     let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
158     let stdin : Option<PipeWriter>;
159     let stdin_exists : bool;
160     match process.stdin {
161       None => stdin_exists = false,
162       Some(_) => stdin_exists = true,
163     }
164     if stdin_exists {
165         match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
166             Err(e) => panic!(e),
167             Ok(pipe) => stdin = Some(pipe),
168         }
169     } else {
170         stdin = None;
171     }
172     let stdout_exists : bool;
173     let stdout : Option<PipeReader>;
174     match process.stdout {
175       None => stdout_exists = false,
176       Some(_) => stdout_exists = true,
177     }
178     if stdout_exists {
179         match PipeReader::from_stdout(process.stdout.take().unwrap()) {
180             Err(e) => panic!(e),
181             Ok(pipe) => stdout = Some(pipe),
182         }
183     } else {
184         stdout = None;
185     }
186     let stderr_exists : bool;
187     let stderr : Option<PipeReader>;
188     match process.stderr {
189       None => stderr_exists = false,
190       Some(_) => stderr_exists = true,
191     }
192     if stderr_exists {
193         match PipeReader::from_stderr(process.stderr.take().unwrap()) {
194               Err(e) => panic!(e),
195               Ok(pipe) => stderr = Some(pipe),
196         }
197     } else {
198         stderr = None
199     }
200 
201     let mut subprocess = SubprocessClient::new(stdin,
202                                                stdout,
203                                                stderr,
204                                                input);
205     match subprocess.stdout {
206        Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
207                                                    PollOpt::level()).unwrap(),
208        None => {},
209     }
210 
211     match subprocess.stderr {
212         Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
213                         PollOpt::level()).unwrap(),
214         None => {},
215     }
216 
217     // Connect to the server
218     match subprocess.stdin {
219         Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
220                         PollOpt::level()).unwrap(),
221          None => {},
222     }
223 
224     // Start the event loop
225     event_loop.run(&mut subprocess).unwrap();
226     let _ = process.wait();
227 
228     let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
229     let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
230     return (ret_stdout, ret_stderr);
231 }
232 
233 #[test]
test_subprocess_pipe()234 fn test_subprocess_pipe() {
235     let process =
236            Command::new("/bin/cat")
237            .stdin(Stdio::piped())
238            .stdout(Stdio::piped())
239            .stderr(Stdio::piped())
240            .spawn().unwrap();
241      let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]);
242      assert_eq!(TEST_DATA.len(), ret_stdout.len());
243      assert_eq!(0usize, ret_stderr.len());
244      let mut i : usize = 0;
245      for item in TEST_DATA.iter() {
246          assert_eq!(*item, ret_stdout[i]);
247          i += 1;
248      }
249 }
250