1 use {TryRead, TryWrite};
2 use mio::*;
3 use mio::deprecated::{EventLoop, Handler};
4 use mio::deprecated::unix::*;
5 use bytes::{Buf, ByteBuf, SliceBuf};
6 use slab::Slab;
7 use std::path::PathBuf;
8 use std::io::{self, Read};
9 use std::os::unix::io::{AsRawFd, FromRawFd};
10 use tempdir::TempDir;
11
// Token under which the listening server socket is registered with the event
// loop. Accepted connections use their slab key as token instead, so this
// value is presumably chosen large to stay clear of those keys.
const SERVER: Token = Token(10_000_000);
// Token under which the client socket is registered with the event loop.
const CLIENT: Token = Token(10_000_001);
14
/// Server-side state for a single accepted connection.
struct EchoConn {
    // The accepted stream socket.
    sock: UnixStream,
    // Read end of the pipe that is waiting to be sent to the peer. Set by
    // `readable`, taken out again by `writable`.
    pipe_fd: Option<PipeReader>,
    // Event-loop token for this connection; `None` until the server assigns
    // one right after inserting the connection into its slab.
    token: Option<Token>,
    // Readiness set handed to `reregister` after each event.
    interest: Ready,
}
21
22 impl EchoConn {
new(sock: UnixStream) -> EchoConn23 fn new(sock: UnixStream) -> EchoConn {
24 EchoConn {
25 sock: sock,
26 pipe_fd: None,
27 token: None,
28 interest: Ready::hup(),
29 }
30 }
31
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>32 fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
33 let fd = self.pipe_fd.take().unwrap();
34
35 match self.sock.try_write_send_fd(b"x", fd.as_raw_fd()) {
36 Ok(None) => {
37 debug!("client flushing buf; WOULDBLOCK");
38
39 self.pipe_fd = Some(fd);
40 self.interest.insert(Ready::writable());
41 }
42 Ok(Some(r)) => {
43 debug!("CONN : we wrote {} bytes!", r);
44
45 self.interest.insert(Ready::readable());
46 self.interest.remove(Ready::writable());
47 }
48 Err(e) => debug!("not implemented; client err={:?}", e),
49 }
50
51 assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
52 event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
53 }
54
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>55 fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
56 let mut buf = ByteBuf::mut_with_capacity(2048);
57
58 match self.sock.try_read_buf(&mut buf) {
59 Ok(None) => {
60 panic!("We just got readable, but were unable to read from the socket?");
61 }
62 Ok(Some(r)) => {
63 debug!("CONN : we read {} bytes!", r);
64 self.interest.remove(Ready::readable());
65 self.interest.insert(Ready::writable());
66 }
67 Err(e) => {
68 debug!("not implemented; client err={:?}", e);
69 self.interest.remove(Ready::readable());
70 }
71
72 };
73
74 // create fd to pass back. Assume that the write will work
75 // without blocking, for simplicity -- we're only testing that
76 // the FD makes it through somehow
77 let (rd, mut wr) = pipe().unwrap();
78 let mut buf = buf.flip();
79 match wr.try_write_buf(&mut buf) {
80 Ok(None) => {
81 panic!("writing to our own pipe blocked :(");
82 }
83 Ok(Some(r)) => {
84 debug!("CONN: we wrote {} bytes to the FD", r);
85 }
86 Err(e) => {
87 panic!("not implemented; client err={:?}", e);
88 }
89 }
90 self.pipe_fd = Some(rd);
91
92 assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
93 event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
94 }
95 }
96
/// Listening side of the test: the listener plus every accepted connection.
struct EchoServer {
    // Listening Unix socket.
    sock: UnixListener,
    // Accepted connections; the slab key of each entry doubles as the value
    // of its event-loop token.
    conns: Slab<EchoConn>
}
101
102 impl EchoServer {
accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>103 fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
104 debug!("server accepting socket");
105
106 let sock = self.sock.accept().unwrap();
107 let conn = EchoConn::new(sock);
108 let tok = self.conns.insert(conn);
109
110 // Register the connection
111 self.conns[tok].token = Some(Token(tok));
112 event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
113 .expect("could not register socket with event loop");
114
115 Ok(())
116 }
117
conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>118 fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
119 debug!("server conn readable; tok={:?}", tok);
120 self.conn(tok).readable(event_loop)
121 }
122
conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>123 fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
124 debug!("server conn writable; tok={:?}", tok);
125 self.conn(tok).writable(event_loop)
126 }
127
conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn128 fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
129 &mut self.conns[tok.into()]
130 }
131 }
132
/// Client side of the test.
struct EchoClient {
    // Connected stream socket.
    sock: UnixStream,
    // Messages that have not been started yet.
    msgs: Vec<&'static str>,
    // Bytes of the current message that are written to the socket.
    tx: SliceBuf<'static>,
    // Bytes of the current message that the reply is compared against.
    rx: SliceBuf<'static>,
    // Event-loop token the socket is reregistered under.
    token: Token,
    // Readiness set handed to `reregister` after each event.
    interest: Ready,
}
141
142
143 // Sends a message and expects to receive the same exact message, one at a time
144 impl EchoClient {
new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient145 fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient {
146 let curr = msgs.remove(0);
147
148 EchoClient {
149 sock: sock,
150 msgs: msgs,
151 tx: SliceBuf::wrap(curr.as_bytes()),
152 rx: SliceBuf::wrap(curr.as_bytes()),
153 token: tok,
154 interest: Ready::none(),
155 }
156 }
157
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>158 fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
159 debug!("client socket readable");
160
161 let mut pipe: PipeReader;
162 let mut buf = [0; 256];
163
164 match self.sock.read_recv_fd(&mut buf) {
165 Ok((_, None)) => {
166 panic!("Did not receive passed file descriptor");
167 }
168 Ok((r, Some(fd))) => {
169 assert_eq!(r, 1);
170 assert_eq!(b'x', buf[0]);
171 debug!("CLIENT : We read {} bytes!", r);
172 pipe = From::<Io>::from(unsafe { Io::from_raw_fd(fd) });
173 }
174 Err(e) => {
175 panic!("not implemented; client err={:?}", e);
176 }
177 };
178
179 // read the data out of the FD itself
180 let n = match pipe.read(&mut buf) {
181 Ok(r) => {
182 debug!("CLIENT : We read {} bytes from the FD", r);
183 r
184 }
185 Err(e) => {
186 panic!("not implemented, client err={:?}", e);
187 }
188 };
189
190 for &actual in buf[0..n].iter() {
191 let expect = self.rx.read_byte().unwrap();
192 assert!(actual == expect, "actual={}; expect={}", actual, expect);
193 }
194
195 self.interest.remove(Ready::readable());
196
197 if !self.rx.has_remaining() {
198 self.next_msg(event_loop).unwrap();
199 }
200
201 if !self.interest.is_none() {
202 assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
203 event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())?;
204 }
205
206 Ok(())
207 }
208
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>209 fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
210 debug!("client socket writable");
211
212 match self.sock.try_write_buf(&mut self.tx) {
213 Ok(None) => {
214 debug!("client flushing buf; WOULDBLOCK");
215 self.interest.insert(Ready::writable());
216 }
217 Ok(Some(r)) => {
218 debug!("CLIENT : we wrote {} bytes!", r);
219 self.interest.insert(Ready::readable());
220 self.interest.remove(Ready::writable());
221 }
222 Err(e) => debug!("not implemented; client err={:?}", e)
223 }
224
225 assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
226 event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
227 }
228
next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>229 fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
230 if self.msgs.is_empty() {
231 event_loop.shutdown();
232 return Ok(());
233 }
234
235 let curr = self.msgs.remove(0);
236
237 debug!("client prepping next message");
238 self.tx = SliceBuf::wrap(curr.as_bytes());
239 self.rx = SliceBuf::wrap(curr.as_bytes());
240
241 self.interest.insert(Ready::writable());
242 event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
243 }
244 }
245
/// Event-loop handler state: the server and the client under test.
struct Echo {
    // Listener and its accepted connections.
    server: EchoServer,
    // The single client driving the exchange.
    client: EchoClient,
}
250
251 impl Echo {
new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo252 fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
253 Echo {
254 server: EchoServer {
255 sock: srv,
256 conns: Slab::with_capacity(128)
257 },
258 client: EchoClient::new(client, CLIENT, msgs)
259 }
260 }
261 }
262
263 impl Handler for Echo {
264 type Timeout = usize;
265 type Message = ();
266
ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready)267 fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready) {
268 if events.is_readable() {
269 match token {
270 SERVER => self.server.accept(event_loop).unwrap(),
271 CLIENT => self.client.readable(event_loop).unwrap(),
272 i => self.server.conn_readable(event_loop, i).unwrap()
273 };
274 }
275
276 if events.is_writable() {
277 match token {
278 SERVER => panic!("received writable for token 0"),
279 CLIENT => self.client.writable(event_loop).unwrap(),
280 _ => self.server.conn_writable(event_loop, token).unwrap()
281 };
282 }
283 }
284 }
285
/// End-to-end check that a pipe descriptor can be passed over a Unix socket:
/// the client sends "foo" then "bar", and each must come back readable
/// through the descriptor the server hands over.
#[test]
pub fn test_unix_pass_fd() {
    debug!("Starting TEST_UNIX_PASS_FD");
    let mut event_loop = EventLoop::new().unwrap();

    // The socket lives in a throwaway directory.
    let tmp_dir = TempDir::new("mio").unwrap();
    let sock_name = PathBuf::from("sock");
    let addr = tmp_dir.path().join(&sock_name);

    let opts = PollOpt::edge() | PollOpt::oneshot();

    let srv = UnixListener::bind(&addr).unwrap();

    info!("listen for connections");
    event_loop.register(&srv, SERVER, Ready::readable(), opts).unwrap();

    // Connect the client and wait for it to become writable.
    let sock = UnixStream::connect(&addr).unwrap();
    event_loop.register(&sock, CLIENT, Ready::writable(), opts).unwrap();

    // Run until the client has verified every message and shuts the loop down.
    let mut handler = Echo::new(srv, sock, vec!["foo", "bar"]);
    event_loop.run(&mut handler).unwrap();
}
307