1 use {localhost, sleep_ms, TryRead, TryWrite};
2 use mio::*;
3 use mio::deprecated::{EventLoop, EventLoopBuilder, Handler};
4 use mio::net::{TcpListener, TcpStream};
5 use std::collections::LinkedList;
6 use slab::Slab;
7 use std::{io, thread};
8 use std::time::Duration;
9
10 // Don't touch the connection slab
11 const SERVER: Token = Token(10_000_000);
12 const CLIENT: Token = Token(10_000_001);
13
/// Number of messages pushed through the event loop by the test.
///
/// Windows uses a much smaller count than every other platform.
#[cfg(windows)]
const N: usize = 10_000;
/// Number of messages pushed through the event loop by the test.
///
/// Gated on `not(windows)` rather than `unix` so that the constant is defined
/// on every target; with `cfg(unix)` a target that is neither Windows nor
/// Unix would fail to compile because `N` would not exist.
#[cfg(not(windows))]
const N: usize = 1_000_000;
18
19 struct EchoConn {
20 sock: TcpStream,
21 token: Option<Token>,
22 count: usize,
23 buf: Vec<u8>
24 }
25
26 impl EchoConn {
new(sock: TcpStream) -> EchoConn27 fn new(sock: TcpStream) -> EchoConn {
28 let mut ec =
29 EchoConn {
30 sock: sock,
31 token: None,
32 buf: Vec::with_capacity(22),
33 count: 0
34 };
35 unsafe { ec.buf.set_len(22) };
36 ec
37 }
38
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>39 fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
40 event_loop.reregister(&self.sock, self.token.unwrap(),
41 Ready::readable(),
42 PollOpt::edge() | PollOpt::oneshot())
43 }
44
readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>45 fn readable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
46 loop {
47 match self.sock.try_read(&mut self.buf[..]) {
48 Ok(None) => {
49 break;
50 }
51 Ok(Some(_)) => {
52 self.count += 1;
53 if self.count % 10000 == 0 {
54 info!("Received {} messages", self.count);
55 }
56 if self.count == N {
57 event_loop.shutdown();
58 }
59 }
60 Err(_) => {
61 break;
62 }
63
64 };
65 }
66
67 event_loop.reregister(&self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
68 }
69 }
70
71 struct EchoServer {
72 sock: TcpListener,
73 conns: Slab<EchoConn>
74 }
75
76 impl EchoServer {
accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>77 fn accept(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
78 debug!("server accepting socket");
79
80 let sock = self.sock.accept().unwrap().0;
81 let conn = EchoConn::new(sock,);
82 let tok = self.conns.insert(conn);
83
84 // Register the connection
85 self.conns[tok].token = Some(Token(tok));
86 event_loop.register(&self.conns[tok].sock, Token(tok), Ready::readable(),
87 PollOpt::edge() | PollOpt::oneshot())
88 .expect("could not register socket with event loop");
89
90 Ok(())
91 }
92
conn_readable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>93 fn conn_readable(&mut self, event_loop: &mut EventLoop<Echo>,
94 tok: Token) -> io::Result<()> {
95 debug!("server conn readable; tok={:?}", tok);
96 self.conn(tok).readable(event_loop)
97 }
98
conn_writable(&mut self, event_loop: &mut EventLoop<Echo>, tok: Token) -> io::Result<()>99 fn conn_writable(&mut self, event_loop: &mut EventLoop<Echo>,
100 tok: Token) -> io::Result<()> {
101 debug!("server conn writable; tok={:?}", tok);
102 self.conn(tok).writable(event_loop)
103 }
104
conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn105 fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn {
106 &mut self.conns[tok.into()]
107 }
108 }
109
110 struct EchoClient {
111 sock: TcpStream,
112 backlog: LinkedList<String>,
113 token: Token,
114 count: u32
115 }
116
117
118 // Sends a message and expects to receive the same exact message, one at a time
119 impl EchoClient {
new(sock: TcpStream, tok: Token) -> EchoClient120 fn new(sock: TcpStream, tok: Token) -> EchoClient {
121
122 EchoClient {
123 sock: sock,
124 backlog: LinkedList::new(),
125 token: tok,
126 count: 0
127 }
128 }
129
readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()>130 fn readable(&mut self, _event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
131 Ok(())
132 }
133
writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()>134 fn writable(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
135 debug!("client socket writable");
136
137 while self.backlog.len() > 0 {
138 match self.sock.try_write(self.backlog.front().unwrap().as_bytes()) {
139 Ok(None) => {
140 break;
141 }
142 Ok(Some(_)) => {
143 self.backlog.pop_front();
144 self.count += 1;
145 if self.count % 10000 == 0 {
146 info!("Sent {} messages", self.count);
147 }
148 }
149 Err(e) => { debug!("not implemented; client err={:?}", e); break; }
150 }
151 }
152 if self.backlog.len() > 0 {
153 event_loop.reregister(&self.sock, self.token, Ready::writable(),
154 PollOpt::edge() | PollOpt::oneshot()).unwrap();
155 }
156
157 Ok(())
158 }
159 }
160
161 struct Echo {
162 server: EchoServer,
163 client: EchoClient,
164 }
165
166 impl Echo {
new(srv: TcpListener, client: TcpStream) -> Echo167 fn new(srv: TcpListener, client: TcpStream) -> Echo {
168 Echo {
169 server: EchoServer {
170 sock: srv,
171 conns: Slab::with_capacity(128),
172 },
173 client: EchoClient::new(client, CLIENT),
174 }
175 }
176 }
177
178 impl Handler for Echo {
179 type Timeout = usize;
180 type Message = String;
181
ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token, events: Ready)182 fn ready(&mut self, event_loop: &mut EventLoop<Echo>, token: Token,
183 events: Ready) {
184
185 if events.is_readable() {
186 match token {
187 SERVER => self.server.accept(event_loop).unwrap(),
188 CLIENT => self.client.readable(event_loop).unwrap(),
189 i => self.server.conn_readable(event_loop, i).unwrap()
190 }
191 }
192 if events.is_writable() {
193 match token {
194 SERVER => panic!("received writable for token 0"),
195 CLIENT => self.client.writable(event_loop).unwrap(),
196 _ => self.server.conn_writable(event_loop, token).unwrap()
197 }
198 }
199 }
200
notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String)201 fn notify(&mut self, event_loop: &mut EventLoop<Echo>, msg: String) {
202 match self.client.sock.try_write(msg.as_bytes()) {
203 Ok(Some(n)) => {
204 self.client.count += 1;
205 if self.client.count % 10000 == 0 {
206 info!("Sent {} bytes: count {}", n, self.client.count);
207 }
208 },
209
210 _ => {
211 self.client.backlog.push_back(msg);
212 event_loop.reregister(
213 &self.client.sock,
214 self.client.token,
215 Ready::writable(),
216 PollOpt::edge() | PollOpt::oneshot()).unwrap();
217 }
218 }
219 }
220 }
221
/// Stress test: a producer thread pushes `N` copies of a fixed message into
/// the event loop's channel while the loop runs a listener and a client over
/// a local TCP connection. `run` is expected to return once the handler
/// shuts the loop down; the producer thread is then joined.
#[test]
pub fn test_echo_server() {
    debug!("Starting TEST_ECHO_SERVER");

    let mut builder = EventLoopBuilder::new();
    builder
        .notify_capacity(1_048_576)
        .messages_per_tick(64)
        .timer_tick(Duration::from_millis(100))
        .timer_wheel_size(1_024)
        .timer_capacity(65_536);
    let mut event_loop = builder.build().unwrap();

    // Every registration below is edge-triggered and one-shot.
    let edge_oneshot = PollOpt::edge() | PollOpt::oneshot();

    let addr = localhost();
    let listener = TcpListener::bind(&addr).unwrap();

    info!("listen for connections");
    event_loop
        .register(&listener, SERVER, Ready::readable(), edge_oneshot)
        .unwrap();

    // Connect to the server
    let client = TcpStream::connect(&addr).unwrap();
    event_loop
        .register(&client, CLIENT, Ready::writable(), edge_oneshot)
        .unwrap();

    let sender = event_loop.channel();
    let producer = thread::spawn(move || {
        // Wait a second before the first message is enqueued.
        sleep_ms(1_000);

        let message = "THIS IS A TEST MESSAGE".to_string();
        // `remaining` counts down from N - 1 to 0, i.e. the number of
        // messages still to be sent after the current one.
        for remaining in (0..N).rev() {
            sender.send(message.clone()).unwrap();
            if remaining % 10000 == 0 {
                info!("Enqueued {} messages", N - remaining);
            }
        }
    });

    // Start the event loop
    let mut handler = Echo::new(listener, client);
    event_loop.run(&mut handler).unwrap();
    producer.join().unwrap();
}
270