1 use {localhost, sleep_ms, TryRead, TryWrite};
2 use mio::*;
3 use mio::deprecated::{EventLoop, EventLoopBuilder, Handler};
4 use mio::net::{TcpListener, TcpStream};
5 use std::collections::LinkedList;
6 use slab::Slab;
7 use std::{io, thread};
8 use std::time::Duration;
9 
10 // Don't touch the connection slab
11 const SERVER: Token = Token(10_000_000);
12 const CLIENT: Token = Token(10_000_001);
13 
// Number of messages pushed through the client/server pair by the test.
// Windows runs a smaller workload; every other target runs the full one.
// The second arm is `not(windows)` rather than `unix` so that `N` is still
// defined when building for a target that is neither.
#[cfg(windows)]
const N: usize = 10_000;
#[cfg(not(windows))]
const N: usize = 1_000_000;
18 
19 struct EchoConn {
20     sock: TcpStream,
21     token: Option<Token>,
22     count: usize,
23     buf: Vec<u8>
24 }
25 
26 impl EchoConn {
new(sock: TcpStream) -> EchoConn27     fn new(sock: TcpStream) -> EchoConn {
28         let mut ec =
29         EchoConn {
30             sock: sock,
31             token: None,
32             buf: Vec::with_capacity(22),
33             count: 0
34         };
35         unsafe { ec.buf.set_len(22) };
36         ec
37     }
38 
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>39     fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
40         event_loop.reregister(&self.sock, self.token.unwrap(),
41                               Ready::readable(),
42                               PollOpt::edge() | PollOpt::oneshot())
43     }
44 
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>45     fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
46         loop {
47             match self.sock.try_read(&mut self.buf[..]) {
48                 Ok(None) => {
49                     break;
50                 }
51                 Ok(Some(_)) => {
52                     self.count += 1;
53                     if self.count % 10000 == 0 {
54                         info!("Received {} messages", self.count);
55                     }
56                     if self.count == N {
57                         event_loop.shutdown();
58                     }
59                 }
60                 Err(_) => {
61                     break;
62                 }
63 
64             };
65         }
66 
67         event_loop.reregister(&self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
68     }
69 }
70 
71 struct EchoServer {
72     sock: TcpListener,
73     conns: Slab<EchoConn>
74 }
75 
76 impl EchoServer {
accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>77     fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
78         debug!("server accepting socket");
79 
80         let sock = self.sock.accept().unwrap().0;
81         let conn = EchoConn::new(sock,);
82         let tok = self.conns.insert(conn);
83 
84         // Register the connection
85         self.conns[tok].token = Some(Token(tok));
86         event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
87                             PollOpt::edge() | PollOpt::oneshot())
88             .expect("could not register socket with event loop");
89 
90         Ok(())
91     }
92 
conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>93     fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
94                      tok: Token) -> io::Result<()> {
95         debug!("server conn readable; tok={:?}", tok);
96         self.conn(tok).readable(event_loop)
97     }
98 
conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>99     fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
100                      tok: Token) -> io::Result<()> {
101         debug!("server conn writable; tok={:?}", tok);
102         self.conn(tok).writable(event_loop)
103     }
104 
conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn105     fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
106         &mut self.conns[tok.into()]
107     }
108 }
109 
110 struct EchoClient {
111     sock: TcpStream,
112     backlog: LinkedList<String>,
113     token: Token,
114     count: u32
115 }
116 
117 
118 // Sends a message and expects to receive the same exact message, one at a time
119 impl EchoClient {
new(sock: TcpStream, tok: Token) -> EchoClient120     fn new(sock: TcpStream, tok: Token) -> EchoClient {
121 
122         EchoClient {
123             sock: sock,
124             backlog: LinkedList::new(),
125             token: tok,
126             count: 0
127         }
128     }
129 
readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()>130     fn readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
131         Ok(())
132     }
133 
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>134     fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
135         debug!("client socket writable");
136 
137         while self.backlog.len() > 0 {
138             match self.sock.try_write(self.backlog.front().unwrap().as_bytes()) {
139                 Ok(None) => {
140                     break;
141                 }
142                 Ok(Some(_)) => {
143                     self.backlog.pop_front();
144                     self.count += 1;
145                     if self.count % 10000 == 0 {
146                         info!("Sent {} messages", self.count);
147                     }
148                 }
149                 Err(e) => { debug!("not implemented; client err={:?}", e); break; }
150             }
151         }
152         if self.backlog.len() > 0 {
153             event_loop.reregister(&self.sock, self.token, Ready::writable(),
154                                   PollOpt::edge() | PollOpt::oneshot()).unwrap();
155         }
156 
157         Ok(())
158     }
159 }
160 
161 struct Echo {
162     server: EchoServer,
163     client: EchoClient,
164 }
165 
166 impl Echo {
new(srv: TcpListener, client: TcpStream) -> Echo167     fn new(srv: TcpListener, client: TcpStream) -> Echo {
168         Echo {
169             server: EchoServer {
170                 sock: srv,
171                 conns: Slab::with_capacity(128),
172             },
173             client: EchoClient::new(client, CLIENT),
174         }
175     }
176 }
177 
178 impl Handler for Echo {
179     type Timeout = usize;
180     type Message = String;
181 
ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready)182     fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
183              events: Ready) {
184 
185         if events.is_readable() {
186             match token {
187                 SERVER => self.server.accept(event_loop).unwrap(),
188                 CLIENT => self.client.readable(event_loop).unwrap(),
189                 i => self.server.conn_readable(event_loop, i).unwrap()
190             }
191         }
192         if events.is_writable() {
193             match token {
194                 SERVER => panic!("received writable for token 0"),
195                 CLIENT => self.client.writable(event_loop).unwrap(),
196                 _ => self.server.conn_writable(event_loop, token).unwrap()
197             }
198         }
199     }
200 
notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String)201     fn notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String) {
202         match self.client.sock.try_write(msg.as_bytes()) {
203             Ok(Some(n)) => {
204                 self.client.count += 1;
205                 if self.client.count % 10000 == 0 {
206                     info!("Sent {} bytes:   count {}", n, self.client.count);
207                 }
208             },
209 
210             _ => {
211                 self.client.backlog.push_back(msg);
212                 event_loop.reregister(
213                     &self.client.sock,
214                     self.client.token,
215                     Ready::writable(),
216                     PollOpt::edge() | PollOpt::oneshot()).unwrap();
217             }
218         }
219     }
220 }
221 
/// Pushes `N` copies of a fixed message from a producer thread through the
/// event-loop channel and runs the loop until the handler shuts it down.
#[test]
pub fn test_echo_server() {
    debug!("Starting TEST_ECHO_SERVER");

    let mut builder = EventLoopBuilder::new();
    builder.notify_capacity(1_048_576)
        .messages_per_tick(64)
        .timer_tick(Duration::from_millis(100))
        .timer_wheel_size(1_024)
        .timer_capacity(65_536);
    let mut event_loop = builder.build().unwrap();

    let addr = localhost();
    let listener = TcpListener::bind(&addr).unwrap();

    info!("listen for connections");
    event_loop.register(&listener, SERVER, Ready::readable(),
                        PollOpt::edge() | PollOpt::oneshot()).unwrap();

    // Connect to the server and watch the client socket for writability.
    let client = TcpStream::connect(&addr).unwrap();
    event_loop.register(&client, CLIENT, Ready::writable(),
                        PollOpt::edge() | PollOpt::oneshot()).unwrap();

    let sender = event_loop.channel();
    let producer = thread::spawn(move || {
        // Delay before producing; presumably lets the event loop start first.
        sleep_ms(1_000);

        let message = "THIS IS A TEST MESSAGE".to_string();
        // `remaining` is how many messages are still to be sent after this one.
        for remaining in (0..N).rev() {
            sender.send(message.clone()).unwrap();
            if remaining % 10000 == 0 {
                info!("Enqueued {} messages", N - remaining);
            }
        }
    });

    // Start the event loop; it returns once the handler requests shutdown.
    event_loop.run(&mut Echo::new(listener, client)).unwrap();
    producer.join().unwrap();
}
270