1 use {TryRead, TryWrite};
2 use std::mem;
3 use mio::*;
4 use std::io;
5 use mio::deprecated::{EventLoop, Handler};
6 use mio::deprecated::unix::{PipeReader, PipeWriter};
7 use std::process::{Command, Stdio, Child};
8
9
10 struct SubprocessClient {
11 stdin: Option<PipeWriter>,
12 stdout: Option<PipeReader>,
13 stderr: Option<PipeReader>,
14 stdin_token : Token,
15 stdout_token : Token,
16 stderr_token : Token,
17 output : Vec<u8>,
18 output_stderr : Vec<u8>,
19 input : Vec<u8>,
20 input_offset : usize,
21 buf : [u8; 65536],
22 }
23
24
25 // Sends a message and expects to receive the same exact message, one at a time
26 impl SubprocessClient {
new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient27 fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
28 SubprocessClient {
29 stdin: stdin,
30 stdout: stdout,
31 stderr: stderr,
32 stdin_token : Token(0),
33 stdout_token : Token(1),
34 stderr_token : Token(2),
35 output : Vec::<u8>::new(),
36 output_stderr : Vec::<u8>::new(),
37 buf : [0; 65536],
38 input : data.to_vec(),
39 input_offset : 0,
40 }
41 }
42
readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()>43 fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
44 let mut eof = false;
45 match self.stdout {
46 None => unreachable!(),
47 Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
48 Ok(None) => {
49 }
50 Ok(Some(r)) => {
51 if r == 0 {
52 eof = true;
53 } else {
54 self.output.extend(&self.buf[0..r]);
55 }
56 }
57 Err(e) => {
58 return Err(e);
59 }
60 }
61 };
62 if eof {
63 drop(self.stdout.take());
64 match self.stderr {
65 None => event_loop.shutdown(),
66 Some(_) => {},
67 }
68 }
69 return Ok(());
70 }
71
readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()>72 fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
73 let mut eof = false;
74 match self.stderr {
75 None => unreachable!(),
76 Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
77 Ok(None) => {
78 }
79 Ok(Some(r)) => {
80 if r == 0 {
81 eof = true;
82 } else {
83 self.output_stderr.extend(&self.buf[0..r]);
84 }
85 }
86 Err(e) => {
87 return Err(e);
88 }
89 }
90 };
91 if eof {
92 drop(self.stderr.take());
93 match self.stdout {
94 None => event_loop.shutdown(),
95 Some(_) => {},
96 }
97 }
98 return Ok(());
99 }
100
writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()>101 fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
102 let mut ok = true;
103 match self.stdin {
104 None => unreachable!(),
105 Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
106 Ok(None) => {
107 },
108 Ok(Some(r)) => {
109 if r == 0 {
110 ok = false;
111 } else {
112 self.input_offset += r;
113 }
114 },
115 Err(_) => {
116 ok = false;
117 },
118 }
119 }
120 if self.input_offset == self.input.len() || !ok {
121 drop(self.stdin.take());
122 match self.stderr {
123 None => match self.stdout {
124 None => event_loop.shutdown(),
125 Some(_) => {},
126 },
127 Some(_) => {},
128 }
129 }
130 return Ok(());
131 }
132
133 }
134
135 impl Handler for SubprocessClient {
136 type Timeout = usize;
137 type Message = ();
138
ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token, _: Ready)139 fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
140 _: Ready) {
141 if token == self.stderr_token {
142 let _x = self.readable_stderr(event_loop);
143 } else {
144 let _x = self.readable(event_loop);
145 }
146 if token == self.stdin_token {
147 let _y = self.writable(event_loop);
148 }
149 }
150 }
151
152
153
154
/// 4 MiB of the byte `42`, used as the round-trip payload.
///
/// Declared `static` rather than `const`: a `const` is inlined at every use
/// site, which is wasteful for an array of this size, whereas a `static` is a
/// single object with one address.
static TEST_DATA: [u8; 1024 * 4096] = [42; 1024 * 4096];
subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>)156 pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
157 let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
158 let stdin : Option<PipeWriter>;
159 let stdin_exists : bool;
160 match process.stdin {
161 None => stdin_exists = false,
162 Some(_) => stdin_exists = true,
163 }
164 if stdin_exists {
165 match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
166 Err(e) => panic!(e),
167 Ok(pipe) => stdin = Some(pipe),
168 }
169 } else {
170 stdin = None;
171 }
172 let stdout_exists : bool;
173 let stdout : Option<PipeReader>;
174 match process.stdout {
175 None => stdout_exists = false,
176 Some(_) => stdout_exists = true,
177 }
178 if stdout_exists {
179 match PipeReader::from_stdout(process.stdout.take().unwrap()) {
180 Err(e) => panic!(e),
181 Ok(pipe) => stdout = Some(pipe),
182 }
183 } else {
184 stdout = None;
185 }
186 let stderr_exists : bool;
187 let stderr : Option<PipeReader>;
188 match process.stderr {
189 None => stderr_exists = false,
190 Some(_) => stderr_exists = true,
191 }
192 if stderr_exists {
193 match PipeReader::from_stderr(process.stderr.take().unwrap()) {
194 Err(e) => panic!(e),
195 Ok(pipe) => stderr = Some(pipe),
196 }
197 } else {
198 stderr = None
199 }
200
201 let mut subprocess = SubprocessClient::new(stdin,
202 stdout,
203 stderr,
204 input);
205 match subprocess.stdout {
206 Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
207 PollOpt::level()).unwrap(),
208 None => {},
209 }
210
211 match subprocess.stderr {
212 Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
213 PollOpt::level()).unwrap(),
214 None => {},
215 }
216
217 // Connect to the server
218 match subprocess.stdin {
219 Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
220 PollOpt::level()).unwrap(),
221 None => {},
222 }
223
224 // Start the event loop
225 event_loop.run(&mut subprocess).unwrap();
226 let _ = process.wait();
227
228 let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
229 let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
230 return (ret_stdout, ret_stderr);
231 }
232
/// Pipes `TEST_DATA` through `/bin/cat` and checks that it comes back
/// byte-for-byte on stdout with nothing on stderr.
#[test]
fn test_subprocess_pipe() {
    let child = Command::new("/bin/cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .unwrap();

    let (captured_out, captured_err) = subprocess_communicate(child, &TEST_DATA[..]);

    assert_eq!(TEST_DATA.len(), captured_out.len());
    assert_eq!(0usize, captured_err.len());

    // Lengths were just checked to match, so pairing the two sequences
    // visits every byte of both.
    for (expected, actual) in TEST_DATA.iter().zip(captured_out.iter()) {
        assert_eq!(*expected, *actual);
    }
}
250