1 use {TryRead, TryWrite};
2 use mio::*;
3 use mio::deprecated::{EventLoop, Handler};
4 use mio::deprecated::unix::*;
5 use bytes::{Buf, ByteBuf, SliceBuf};
6 use slab::Slab;
7 use std::path::PathBuf;
8 use std::io::{self, Read};
9 use std::os::unix::io::{AsRawFd, FromRawFd};
10 use tempdir::TempDir;
11 
12 const SERVER: Token = Token(10_000_000);
13 const CLIENT: Token = Token(10_000_001);
14 
15 struct EchoConn {
16     sock: UnixStream,
17     pipe_fd: Option<PipeReader>,
18     token: Option<Token>,
19     interest: Ready,
20 }
21 
22 impl EchoConn {
new(sock: UnixStream) -> EchoConn23     fn new(sock: UnixStream) -> EchoConn {
24         EchoConn {
25             sock: sock,
26             pipe_fd: None,
27             token: None,
28             interest: Ready::hup(),
29         }
30     }
31 
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>32     fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
33         let fd = self.pipe_fd.take().unwrap();
34 
35         match self.sock.try_write_send_fd(b"x", fd.as_raw_fd()) {
36             Ok(None) => {
37                 debug!("client flushing buf; WOULDBLOCK");
38 
39                 self.pipe_fd = Some(fd);
40                 self.interest.insert(Ready::writable());
41             }
42             Ok(Some(r)) => {
43                 debug!("CONN : we wrote {} bytes!", r);
44 
45                 self.interest.insert(Ready::readable());
46                 self.interest.remove(Ready::writable());
47             }
48             Err(e) => debug!("not implemented; client err={:?}", e),
49         }
50 
51         assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
52         event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
53     }
54 
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>55     fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
56         let mut buf = ByteBuf::mut_with_capacity(2048);
57 
58         match self.sock.try_read_buf(&mut buf) {
59             Ok(None) => {
60                 panic!("We just got readable, but were unable to read from the socket?");
61             }
62             Ok(Some(r)) => {
63                 debug!("CONN : we read {} bytes!", r);
64                 self.interest.remove(Ready::readable());
65                 self.interest.insert(Ready::writable());
66             }
67             Err(e) => {
68                 debug!("not implemented; client err={:?}", e);
69                 self.interest.remove(Ready::readable());
70             }
71 
72         };
73 
74         // create fd to pass back. Assume that the write will work
75         // without blocking, for simplicity -- we're only testing that
76         // the FD makes it through somehow
77         let (rd, mut wr) = pipe().unwrap();
78         let mut buf = buf.flip();
79         match wr.try_write_buf(&mut buf) {
80             Ok(None) => {
81                 panic!("writing to our own pipe blocked :(");
82             }
83             Ok(Some(r)) => {
84                 debug!("CONN: we wrote {} bytes to the FD", r);
85             }
86             Err(e) => {
87                 panic!("not implemented; client err={:?}", e);
88             }
89         }
90         self.pipe_fd = Some(rd);
91 
92         assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
93         event_loop.reregister(&self.sock, self.token.unwrap(), self.interest, PollOpt::edge() | PollOpt::oneshot())
94     }
95 }
96 
97 struct EchoServer {
98     sock: UnixListener,
99     conns: Slab<EchoConn>
100 }
101 
102 impl EchoServer {
accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>103     fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
104         debug!("server accepting socket");
105 
106         let sock = self.sock.accept().unwrap();
107         let conn = EchoConn::new(sock);
108         let tok = self.conns.insert(conn);
109 
110         // Register the connection
111         self.conns[tok].token = Some(Token(tok));
112         event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
113             .expect("could not register socket with event loop");
114 
115         Ok(())
116     }
117 
conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>118     fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
119         debug!("server conn readable; tok={:?}", tok);
120         self.conn(tok).readable(event_loop)
121     }
122 
conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>123     fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()> {
124         debug!("server conn writable; tok={:?}", tok);
125         self.conn(tok).writable(event_loop)
126     }
127 
conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn128     fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
129         &mut self.conns[tok.into()]
130     }
131 }
132 
133 struct EchoClient {
134     sock: UnixStream,
135     msgs: Vec<&'static str>,
136     tx: SliceBuf<'static>,
137     rx: SliceBuf<'static>,
138     token: Token,
139     interest: Ready,
140 }
141 
142 
143 // Sends a message and expects to receive the same exact message, one at a time
144 impl EchoClient {
new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient145     fn new(sock: UnixStream, tok: Token,  mut msgs: Vec<&'static str>) -> EchoClient {
146         let curr = msgs.remove(0);
147 
148         EchoClient {
149             sock: sock,
150             msgs: msgs,
151             tx: SliceBuf::wrap(curr.as_bytes()),
152             rx: SliceBuf::wrap(curr.as_bytes()),
153             token: tok,
154             interest: Ready::none(),
155         }
156     }
157 
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>158     fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
159         debug!("client socket readable");
160 
161         let mut pipe: PipeReader;
162         let mut buf = [0; 256];
163 
164         match self.sock.read_recv_fd(&mut buf) {
165             Ok((_, None)) => {
166                 panic!("Did not receive passed file descriptor");
167             }
168             Ok((r, Some(fd))) => {
169                 assert_eq!(r, 1);
170                 assert_eq!(b'x', buf[0]);
171                 debug!("CLIENT : We read {} bytes!", r);
172                 pipe = From::<Io>::from(unsafe { Io::from_raw_fd(fd) });
173             }
174             Err(e) => {
175                 panic!("not implemented; client err={:?}", e);
176             }
177         };
178 
179         // read the data out of the FD itself
180         let n = match pipe.read(&mut buf) {
181             Ok(r) => {
182                 debug!("CLIENT : We read {} bytes from the FD", r);
183                 r
184             }
185             Err(e) => {
186                 panic!("not implemented, client err={:?}", e);
187             }
188         };
189 
190         for &actual in buf[0..n].iter() {
191             let expect = self.rx.read_byte().unwrap();
192             assert!(actual == expect, "actual={}; expect={}", actual, expect);
193         }
194 
195         self.interest.remove(Ready::readable());
196 
197         if !self.rx.has_remaining() {
198             self.next_msg(event_loop).unwrap();
199         }
200 
201         if !self.interest.is_none() {
202             assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
203             event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())?;
204         }
205 
206         Ok(())
207     }
208 
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>209     fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
210         debug!("client socket writable");
211 
212         match self.sock.try_write_buf(&mut self.tx) {
213             Ok(None) => {
214                 debug!("client flushing buf; WOULDBLOCK");
215                 self.interest.insert(Ready::writable());
216             }
217             Ok(Some(r)) => {
218                 debug!("CLIENT : we wrote {} bytes!", r);
219                 self.interest.insert(Ready::readable());
220                 self.interest.remove(Ready::writable());
221             }
222             Err(e) => debug!("not implemented; client err={:?}", e)
223         }
224 
225         assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
226         event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
227     }
228 
next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>229     fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
230         if self.msgs.is_empty() {
231             event_loop.shutdown();
232             return Ok(());
233         }
234 
235         let curr = self.msgs.remove(0);
236 
237         debug!("client prepping next message");
238         self.tx = SliceBuf::wrap(curr.as_bytes());
239         self.rx = SliceBuf::wrap(curr.as_bytes());
240 
241         self.interest.insert(Ready::writable());
242         event_loop.reregister(&self.sock, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot())
243     }
244 }
245 
246 struct Echo {
247     server: EchoServer,
248     client: EchoClient,
249 }
250 
251 impl Echo {
new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo252     fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
253         Echo {
254             server: EchoServer {
255                 sock: srv,
256                 conns: Slab::with_capacity(128)
257             },
258             client: EchoClient::new(client, CLIENT, msgs)
259         }
260     }
261 }
262 
263 impl Handler for Echo {
264     type Timeout = usize;
265     type Message = ();
266 
ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready)267     fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready) {
268         if events.is_readable() {
269             match token {
270                 SERVER => self.server.accept(event_loop).unwrap(),
271                 CLIENT => self.client.readable(event_loop).unwrap(),
272                 i => self.server.conn_readable(event_loop, i).unwrap()
273             };
274         }
275 
276         if events.is_writable() {
277             match token {
278                 SERVER => panic!("received writable for token 0"),
279                 CLIENT => self.client.writable(event_loop).unwrap(),
280                 _ => self.server.conn_writable(event_loop, token).unwrap()
281             };
282         }
283     }
284 }
285 
286 #[test]
test_unix_pass_fd()287 pub fn test_unix_pass_fd() {
288     debug!("Starting TEST_UNIX_PASS_FD");
289     let mut event_loop = EventLoop::new().unwrap();
290 
291     let tmp_dir = TempDir::new("mio").unwrap();
292     let addr = tmp_dir.path().join(&PathBuf::from("sock"));
293 
294     let srv = UnixListener::bind(&addr).unwrap();
295 
296     info!("listen for connections");
297     event_loop.register(&srv, SERVER, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
298 
299     let sock = UnixStream::connect(&addr).unwrap();
300 
301     // Connect to the server
302     event_loop.register(&sock, CLIENT, Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
303 
304     // Start the event loop
305     event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap();
306 }
307