1 use {TryRead, TryWrite};
2 use mio::*;
3 use mio::deprecated::{EventLoop, Handler};
4 use mio::deprecated::unix::*;
5 use bytes::{Buf, ByteBuf, MutByteBuf, SliceBuf};
6 use slab::Slab;
7 use std::io;
8 use std::path::PathBuf;
9 use tempdir::TempDir;
10
11 const SERVER: Token = Token(10_000_000);
12 const CLIENT: Token = Token(10_000_001);
13
14 struct EchoConn {
15 sock: UnixStream,
16 buf: Option<ByteBuf>,
17 mut_buf: Option<MutByteBuf>,
18 token: Option<Token>,
19 interest: Ready
20 }
21
22 impl EchoConn {
new(sock: UnixStream) -> EchoConn23 fn new(sock: UnixStream) -> EchoConn {
24 EchoConn {
25 sock: sock,
26 buf: None,
27 mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
28 token: None,
29 interest: Ready::hup()
30 }
31 }
32
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>33 fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
34 let mut buf = self.buf.take().unwrap();
35
36 match self.sock.try_write_buf(&mut buf) {
37 Ok(None) => {
38 debug!("client flushing buf; WOULDBLOCK");
39
40 self.buf = Some(buf);
41 self.interest.insert(Ready::writable());
42 }
43 Ok(Some(r)) => {
44 debug!("CONN : we wrote {} bytes!", r);
45
46 self.mut_buf = Some(buf.flip());
47
48 self.interest.insert(Ready::readable());
49 self.interest.remove(Ready::writable());
50 match self.sock.shutdown(Shutdown::Write) {
51 Err(e) => panic!(e),
52 _ => {},
53 }
54 }
55 Err(e) => debug!("not implemented; client err={:?}", e),
56 }
57
58 event_loop.reregister(&self.sock, self.token.unwrap(), self.interest,
59 PollOpt::edge() | PollOpt::oneshot())
60 }
61
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>62 fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
63 let mut buf = self.mut_buf.take().unwrap();
64
65 match self.sock.try_read_buf(&mut buf) {
66 Ok(None) => {
67 debug!("CONN : spurious read wakeup");
68 self.mut_buf = Some(buf);
69 }
70 Ok(Some(r)) => {
71 debug!("CONN : we read {} bytes!", r);
72
73 // prepare to provide this to writable
74 self.buf = Some(buf.flip());
75
76 self.interest.remove(Ready::readable());
77 self.interest.insert(Ready::writable());
78 }
79 Err(e) => {
80 debug!("not implemented; client err={:?}", e);
81 self.interest.remove(Ready::readable());
82 }
83
84 };
85
86 event_loop.reregister(&self.sock, self.token.unwrap(), self.interest,
87 PollOpt::edge())
88 }
89 }
90
91 struct EchoServer {
92 sock: UnixListener,
93 conns: Slab<EchoConn>
94 }
95
96 impl EchoServer {
accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>97 fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
98 debug!("server accepting socket");
99
100 let sock = self.sock.accept().unwrap();
101 let conn = EchoConn::new(sock,);
102 let tok = self.conns.insert(conn);
103
104 // Register the connection
105 self.conns[tok].token = Some(Token(tok));
106 event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
107 PollOpt::edge() | PollOpt::oneshot())
108 .expect("could not register socket with event loop");
109
110 Ok(())
111 }
112
conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>113 fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
114 tok: Token) -> io::Result<()> {
115 debug!("server conn readable; tok={:?}", tok);
116 self.conn(tok).readable(event_loop)
117 }
118
conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>119 fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
120 tok: Token) -> io::Result<()> {
121 debug!("server conn writable; tok={:?}", tok);
122 self.conn(tok).writable(event_loop)
123 }
124
conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn125 fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
126 &mut self.conns[tok.into()]
127 }
128 }
129
130 struct EchoClient {
131 sock: UnixStream,
132 msgs: Vec<&'static str>,
133 tx: SliceBuf<'static>,
134 rx: SliceBuf<'static>,
135 mut_buf: Option<MutByteBuf>,
136 token: Token,
137 interest: Ready
138 }
139
140
141 // Sends a message and expects to receive the same exact message, one at a time
142 impl EchoClient {
new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient143 fn new(sock: UnixStream, tok: Token, mut msgs: Vec<&'static str>) -> EchoClient {
144 let curr = msgs.remove(0);
145
146 EchoClient {
147 sock: sock,
148 msgs: msgs,
149 tx: SliceBuf::wrap(curr.as_bytes()),
150 rx: SliceBuf::wrap(curr.as_bytes()),
151 mut_buf: Some(ByteBuf::mut_with_capacity(2048)),
152 token: tok,
153 interest: Ready::none()
154 }
155 }
156
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>157 fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
158 debug!("client socket readable");
159
160 let mut buf = self.mut_buf.take().unwrap();
161
162 match self.sock.try_read_buf(&mut buf) {
163 Ok(None) => {
164 debug!("CLIENT : spurious read wakeup");
165 self.mut_buf = Some(buf);
166 }
167 Ok(Some(r)) => {
168 if r == 0 {
169 self.interest.remove(Ready::readable());
170 event_loop.shutdown();
171 } else {
172 debug!("CLIENT : We read {} bytes!", r);
173
174 // prepare for reading
175 let mut buf = buf.flip();
176
177 while buf.has_remaining() {
178 let actual = buf.read_byte().unwrap();
179 let expect = self.rx.read_byte().unwrap();
180
181 assert!(actual == expect, "actual={}; expect={}", actual, expect);
182 }
183
184 self.mut_buf = Some(buf.flip());
185 if !self.rx.has_remaining() {
186 self.next_msg(event_loop).unwrap();
187 }
188 }
189 }
190 Err(e) => {
191 panic!("not implemented; client err={:?}", e);
192 }
193 };
194
195 event_loop.reregister(&self.sock, self.token, self.interest,
196 PollOpt::edge() | PollOpt::oneshot())
197 }
198
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>199 fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
200 debug!("client socket writable");
201
202 match self.sock.try_write_buf(&mut self.tx) {
203 Ok(None) => {
204 debug!("client flushing buf; WOULDBLOCK");
205 self.interest.insert(Ready::writable());
206 }
207 Ok(Some(r)) => {
208 debug!("CLIENT : we wrote {} bytes!", r);
209 self.interest.insert(Ready::readable());
210 self.interest.remove(Ready::writable());
211 }
212 Err(e) => debug!("not implemented; client err={:?}", e)
213 }
214
215 event_loop.reregister(&self.sock, self.token, self.interest,
216 PollOpt::edge() | PollOpt::oneshot())
217 }
218
next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>219 fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
220 if self.msgs.is_empty() {
221
222 return Ok(());
223 }
224
225 let curr = self.msgs.remove(0);
226
227 debug!("client prepping next message");
228 self.tx = SliceBuf::wrap(curr.as_bytes());
229 self.rx = SliceBuf::wrap(curr.as_bytes());
230
231 self.interest.insert(Ready::writable());
232 event_loop.reregister(&self.sock, self.token, self.interest,
233 PollOpt::edge() | PollOpt::oneshot())
234 }
235 }
236
237 struct Echo {
238 server: EchoServer,
239 client: EchoClient,
240 }
241
242 impl Echo {
new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo243 fn new(srv: UnixListener, client: UnixStream, msgs: Vec<&'static str>) -> Echo {
244 Echo {
245 server: EchoServer {
246 sock: srv,
247 conns: Slab::with_capacity(128)
248 },
249 client: EchoClient::new(client, CLIENT, msgs)
250 }
251 }
252 }
253
254 impl Handler for Echo {
255 type Timeout = usize;
256 type Message = ();
257
ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready)258 fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
259 events: Ready) {
260 debug!("ready {:?} {:?}", token, events);
261 if events.is_readable() {
262 match token {
263 SERVER => self.server.accept(event_loop).unwrap(),
264 CLIENT => self.client.readable(event_loop).unwrap(),
265 i => self.server.conn_readable(event_loop, i).unwrap()
266 }
267 }
268
269 if events.is_writable() {
270 match token {
271 SERVER => panic!("received writable for token 0"),
272 CLIENT => self.client.writable(event_loop).unwrap(),
273 _ => self.server.conn_writable(event_loop, token).unwrap()
274 };
275 }
276 }
277 }
278
279 #[test]
test_echo_server()280 pub fn test_echo_server() {
281 debug!("Starting TEST_ECHO_SERVER");
282 let mut event_loop = EventLoop::new().unwrap();
283
284 let tmp_dir = TempDir::new("mio").unwrap();
285 let addr = tmp_dir.path().join(&PathBuf::from("sock"));
286
287 let srv = UnixListener::bind(&addr).unwrap();
288
289 event_loop.register(&srv, SERVER, Ready::readable(),
290 PollOpt::edge() | PollOpt::oneshot()).unwrap();
291
292 let sock = UnixStream::connect(&addr).unwrap();
293
294 // Connect to the server
295 event_loop.register(&sock, CLIENT, Ready::writable(),
296 PollOpt::edge() | PollOpt::oneshot()).unwrap();
297
298 // Start the event loop
299 event_loop.run(&mut Echo::new(srv, sock, vec!["foo", "bar"])).unwrap();
300 }
301