1 use {TryRead, TryWrite};
2 use mio::*;
3 use mio::deprecated::{EventLoop, Handler};
4 use mio::deprecated::unix::*;
5 use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf};
6 use slab::Slab;
7 use std::io;
8 use std::path::PathBuf;
9 use tempdir::TempDir;
10 
/// Token the listening socket is registered under in `test_echo_server`.
/// `Echo::ready` routes readable events carrying this token to `EchoServer::accept`.
const SERVER: Token = Token(10_000_000);
/// Token the client stream is registered under; `Echo::ready` routes events
/// carrying this token to the `EchoClient` handlers.
const CLIENT: Token = Token(10_000_001);
13 
14 struct EchoConn {
15     sock: UnixStream,
16     buf: Option<ByteBuf>,
17     mut_buf: Option<MutByteBuf>,
18     token: Option<Token>,
19     interest: Ready
20 }
21 
22 impl EchoConn {
new(sock: UnixStream) -> EchoConn23     fn new(sock: UnixStream) -> EchoConn {
24         EchoConn {
25             sock: sock,
26             buf: None,
27             mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
28             token: None,
29             interest: Ready::hup()
30         }
31     }
32 
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>33     fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
34         let mut buf = self.buf.take().unwrap();
35 
36         match self.sock.try_write_buf(&mut buf) {
37             Ok(None) => {
38                 debug!("client flushing buf; WOULDBLOCK");
39 
40                 self.buf = Some(buf);
41                 self.interest.insert(Ready::writable());
42             }
43             Ok(Some(r)) => {
44                 debug!("CONN : we wrote {} bytes!", r);
45 
46                 self.mut_buf = Some(buf.flip());
47 
48                 self.interest.insert(Ready::readable());
49                 self.interest.remove(Ready::writable());
50                 match self.sock.shutdown(Shutdown::Write) {
51                    Err(e) => panic!(e),
52                    _ => {},
53                 }
54             }
55             Err(e) => debug!("not implemented; client err={:?}", e),
56         }
57 
58         event_loop.reregister(&self.sock, self.token.unwrap(), self.interest,
59                               PollOpt::edge() | PollOpt::oneshot())
60     }
61 
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>62     fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
63         let mut buf = self.mut_buf.take().unwrap();
64 
65         match self.sock.try_read_buf(&mut buf) {
66             Ok(None) => {
67                 debug!("CONN : spurious read wakeup");
68                 self.mut_buf = Some(buf);
69             }
70             Ok(Some(r)) => {
71                 debug!("CONN : we read {} bytes!", r);
72 
73                 // prepare to provide this to writable
74                 self.buf = Some(buf.flip());
75 
76                 self.interest.remove(Ready::readable());
77                 self.interest.insert(Ready::writable());
78             }
79             Err(e) => {
80                 debug!("not implemented; client err={:?}", e);
81                 self.interest.remove(Ready::readable());
82             }
83 
84         };
85 
86         event_loop.reregister(&self.sock, self.token.unwrap(), self.interest,
87                               PollOpt::edge())
88     }
89 }
90 
91 struct EchoServer {
92     sock: UnixListener,
93     conns: Slab<EchoConn>
94 }
95 
96 impl EchoServer {
accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>97     fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
98         debug!("server accepting socket");
99 
100         let sock = self.sock.accept().unwrap();
101         let conn = EchoConn::new(sock,);
102         let tok = self.conns.insert(conn);
103 
104         // Register the connection
105         self.conns[tok].token = Some(Token(tok));
106         event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
107                                 PollOpt::edge() | PollOpt::oneshot())
108             .expect("could not register socket with event loop");
109 
110         Ok(())
111     }
112 
conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>113     fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
114                      tok: Token) -> io::Result<()> {
115         debug!("server conn readable; tok={:?}", tok);
116         self.conn(tok).readable(event_loop)
117     }
118 
conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>119     fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
120                      tok: Token) -> io::Result<()> {
121         debug!("server conn writable; tok={:?}", tok);
122         self.conn(tok).writable(event_loop)
123     }
124 
conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn125     fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
126         &mut self.conns[tok.into()]
127     }
128 }
129 
130 struct EchoClient {
131     sock: UnixStream,
132     msgs: Vec<&'static str>,
133     tx: SliceBuf<'static>,
134     rx: SliceBuf<'static>,
135     mut_buf: Option<MutByteBuf>,
136     token: Token,
137     interest: Ready
138 }
139 
140 
141 // Sends a message and expects to receive the same exact message, one at a time
142 impl EchoClient {
new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient143     fn new(sock: UnixStream, tok: Token,  mut msgs: Vec<&'static str>) -> EchoClient {
144         let curr = msgs.remove(0);
145 
146         EchoClient {
147             sock: sock,
148             msgs: msgs,
149             tx: SliceBuf::wrap(curr.as_bytes()),
150             rx: SliceBuf::wrap(curr.as_bytes()),
151             mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
152             token: tok,
153             interest: Ready::none()
154         }
155     }
156 
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>157     fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
158         debug!("client socket readable");
159 
160         let mut buf = self.mut_buf.take().unwrap();
161 
162         match self.sock.try_read_buf(&mut buf) {
163             Ok(None) => {
164                 debug!("CLIENT : spurious read wakeup");
165                 self.mut_buf = Some(buf);
166             }
167             Ok(Some(r)) => {
168                 if r == 0 {
169                     self.interest.remove(Ready::readable());
170                      event_loop.shutdown();
171                 } else {
172                 debug!("CLIENT : We read {} bytes!", r);
173 
174                 // prepare for reading
175                 let mut buf = buf.flip();
176 
177                 while buf.has_remaining() {
178                     let actual = buf.read_byte().unwrap();
179                     let expect = self.rx.read_byte().unwrap();
180 
181                     assert!(actual == expect, "actual={}; expect={}", actual, expect);
182                 }
183 
184                 self.mut_buf = Some(buf.flip());
185                 if !self.rx.has_remaining() {
186                     self.next_msg(event_loop).unwrap();
187                 }
188                 }
189             }
190             Err(e) => {
191                 panic!("not implemented; client err={:?}", e);
192             }
193         };
194 
195         event_loop.reregister(&self.sock, self.token, self.interest,
196                               PollOpt::edge() | PollOpt::oneshot())
197     }
198 
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>199     fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
200         debug!("client socket writable");
201 
202         match self.sock.try_write_buf(&mut self.tx) {
203             Ok(None) => {
204                 debug!("client flushing buf; WOULDBLOCK");
205                 self.interest.insert(Ready::writable());
206             }
207             Ok(Some(r)) => {
208                 debug!("CLIENT : we wrote {} bytes!", r);
209                 self.interest.insert(Ready::readable());
210                 self.interest.remove(Ready::writable());
211             }
212             Err(e) => debug!("not implemented; client err={:?}", e)
213         }
214 
215         event_loop.reregister(&self.sock, self.token, self.interest,
216                               PollOpt::edge() | PollOpt::oneshot())
217     }
218 
next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>219     fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
220         if self.msgs.is_empty() {
221 
222             return Ok(());
223         }
224 
225         let curr = self.msgs.remove(0);
226 
227         debug!("client prepping next message");
228         self.tx = SliceBuf::wrap(curr.as_bytes());
229         self.rx = SliceBuf::wrap(curr.as_bytes());
230 
231         self.interest.insert(Ready::writable());
232         event_loop.reregister(&self.sock, self.token, self.interest,
233                               PollOpt::edge() | PollOpt::oneshot())
234     }
235 }
236 
237 struct Echo {
238     server: EchoServer,
239     client: EchoClient,
240 }
241 
242 impl Echo {
new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo243     fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
244         Echo {
245             server: EchoServer {
246                 sock: srv,
247                 conns: Slab::with_capacity(128)
248             },
249             client: EchoClient::new(client, CLIENT, msgs)
250         }
251     }
252 }
253 
254 impl Handler for Echo {
255     type Timeout = usize;
256     type Message = ();
257 
ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready)258     fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
259              events: Ready) {
260         debug!("ready {:?} {:?}", token, events);
261         if events.is_readable() {
262             match token {
263                 SERVER => self.server.accept(event_loop).unwrap(),
264                 CLIENT => self.client.readable(event_loop).unwrap(),
265                 i => self.server.conn_readable(event_loop, i).unwrap()
266             }
267         }
268 
269         if events.is_writable() {
270             match token {
271                 SERVER => panic!("received writable for token 0"),
272                 CLIENT => self.client.writable(event_loop).unwrap(),
273                 _ => self.server.conn_writable(event_loop, token).unwrap()
274             };
275         }
276     }
277 }
278 
/// Runs the echo exchange over a Unix socket created in a temporary directory.
///
/// A listener and a client stream are registered with one event loop, which
/// then runs an `Echo` handler with the messages "foo" and "bar".
#[test]
pub fn test_echo_server() {
    debug!("Starting TEST_ECHO_SERVER");
    let mut event_loop = EventLoop::new().unwrap();

    // `tmp_dir` has to stay bound until the end of the test so the socket path stays valid.
    let tmp_dir = TempDir::new("mio").unwrap();
    let sock_name = PathBuf::from("sock");
    let addr = tmp_dir.path().join(&sock_name);

    // Listening side: wait for the incoming connection.
    let srv = UnixListener::bind(&addr).unwrap();
    event_loop
        .register(&srv, SERVER, Ready::readable(),
                  PollOpt::edge() | PollOpt::oneshot())
        .unwrap();

    // Client side: connect, then wait until the stream can be written to.
    let sock = UnixStream::connect(&addr).unwrap();
    event_loop
        .register(&sock, CLIENT, Ready::writable(),
                  PollOpt::edge() | PollOpt::oneshot())
        .unwrap();

    // Start the event loop
    let mut handler = Echo::new(srv, sock, vec!["foo", "bar"]);
    event_loop.run(&mut handler).unwrap();
}
301