1 use std::collections::HashMap;
2 use std::net::{self, Shutdown};
3 use std::time::{Duration, Instant};
4 
5 use mio::{Token, Ready, PollOpt, Poll, Events};
6 use mio::event::{Evented, Event};
7 use mio::net::TcpStream;
8 
9 struct TestPoll {
10     poll: Poll,
11     events: Events,
12     buf: HashMap<Token, Ready>,
13 }
14 
15 impl TestPoll {
new() -> TestPoll16     fn new() -> TestPoll {
17         TestPoll {
18             poll: Poll::new().unwrap(),
19             events: Events::with_capacity(1024),
20             buf: HashMap::new(),
21         }
22     }
23 
register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) where E: Evented24     fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt)
25         where E: Evented
26     {
27         self.poll.register(handle, token, interest, opts).unwrap();
28     }
29 
wait_for(&mut self, token: Token, ready: Ready) -> Result<(), &'static str>30     fn wait_for(&mut self, token: Token, ready: Ready) -> Result<(), &'static str> {
31         let now = Instant::now();
32 
33         loop {
34             if now.elapsed() > Duration::from_secs(1) {
35                 return Err("not ready");
36             }
37 
38             if let Some(curr) = self.buf.get(&token) {
39                 if curr.contains(ready) {
40                     break;
41                 }
42             }
43 
44             self.poll.poll(&mut self.events, Some(Duration::from_millis(250))).unwrap();
45 
46             for event in &self.events {
47                 let curr = self.buf.entry(event.token())
48                     .or_insert(Ready::empty());
49 
50                 *curr |= event.readiness();
51             }
52         }
53 
54         *self.buf.get_mut(&token).unwrap() -= ready;
55         Ok(())
56     }
57 
check_idle(&mut self) -> Result<(), Event>58     fn check_idle(&mut self) -> Result<(), Event> {
59         self.poll.poll(&mut self.events, Some(Duration::from_millis(100))).unwrap();
60 
61         if let Some(e) = self.events.iter().next() {
62             Err(e)
63         } else {
64             Ok(())
65         }
66     }
67 }
68 
/// Panics unless `$poll.wait_for($token, $ready)` succeeds.
///
/// `$token` and `$ready` are expanded a second time in the panic message, so
/// they should be cheap, side-effect free expressions.
macro_rules! assert_ready {
    ($poll:expr, $token:expr, $ready:expr) => {{
        if $poll.wait_for($token, $ready).is_err() {
            panic!("not ready; token = {:?}; interest = {:?}", $token, $ready);
        }
    }}
}
77 
/// Panics if `$poll.wait_for($token, $ready)` succeeds; an error from
/// `wait_for` is the expected outcome.
///
/// `$token` and `$ready` are expanded a second time in the panic message.
macro_rules! assert_not_ready {
    ($poll:expr, $token:expr, $ready:expr) => {{
        if $poll.wait_for($token, $ready).is_ok() {
            panic!("is ready; token = {:?}; interest = {:?}", $token, $ready);
        }
    }}
}
86 
/// Asserts (via `assert_ready!`) that `Token(0)` becomes HUP-ready.
///
/// The check only exists on unix targets; elsewhere the macro expands to
/// nothing.
macro_rules! assert_hup_ready {
    ($poll:expr) => {
        #[cfg(unix)]
        {
            use mio::unix::UnixReady;
            let hup = Ready::from(UnixReady::hup());
            assert_ready!($poll, Token(0), hup)
        }
    }
}
96 
/// Asserts (via `assert_not_ready!`) that `Token(0)` does not become
/// HUP-ready.
///
/// The check only exists on unix targets; elsewhere the macro expands to
/// nothing.
macro_rules! assert_not_hup_ready {
    ($poll:expr) => {
        #[cfg(unix)]
        {
            use mio::unix::UnixReady;
            let hup = Ready::from(UnixReady::hup());
            assert_not_ready!($poll, Token(0), hup)
        }
    }
}
106 
/// Panics, printing the offending event, if `$poll.check_idle()` returns an
/// error.
macro_rules! assert_idle {
    ($poll:expr) => {
        if let Err(e) = $poll.check_idle() {
            panic!("not idle; event = {:?}", e);
        }
    }
}
115 
// TODO: replace w/ assertive
// https://github.com/carllerche/assertive
/// Unwraps a `Result`, yielding the `Ok` value or panicking with the error's
/// `Debug` output.
///
/// Accepts an optional trailing comma, or extra `format!`-style arguments
/// that are appended to the panic message.
macro_rules! assert_ok {
    ($e:expr) => {{
        match $e {
            ::std::result::Result::Ok(value) => value,
            ::std::result::Result::Err(error) => {
                panic!("assertion failed: error = {:?}", error)
            }
        }
    }};
    ($e:expr,) => {
        assert_ok!($e)
    };
    ($e:expr, $($arg:tt)+) => {{
        match $e {
            ::std::result::Result::Ok(value) => value,
            ::std::result::Result::Err(error) => {
                panic!("assertion failed: error = {:?}: {}", error, format_args!($($arg)+))
            }
        }
    }};
}
137 
/// The accepted peer shuts down only its write half: the client must become
/// readable, must not report HUP (checked on unix only), and must then read
/// zero bytes.
#[test]
fn test_write_shutdown() {
    use std::io::Read;

    let mut harness = TestPoll::new();

    let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0"));
    let addr = assert_ok!(listener.local_addr());

    let mut client = assert_ok!(TcpStream::connect(&addr));
    let interest = Ready::readable() | Ready::writable();
    harness.register(&client, Token(0), interest, PollOpt::edge());

    let (peer, _) = assert_ok!(listener.accept());

    // Wait for the client to report writable.
    assert_ready!(harness, Token(0), Ready::writable());

    // Nothing else should be pending at this point.
    assert_idle!(harness);

    // Close only the peer's sending direction.
    assert_ok!(peer.shutdown(Shutdown::Write));

    assert_ready!(harness, Token(0), Ready::readable());
    assert_not_hup_ready!(harness);

    // A zero-length read is the end-of-stream indication.
    let mut buf = [0; 1024];
    assert_eq!(assert_ok!(client.read(&mut buf)), 0);
}
171 
/// The client shuts down its write half, the peer reads zero bytes and is
/// dropped: the client must become readable (and HUP-ready on unix targets
/// outside the excluded BSD-family list) and must then read zero bytes.
#[test]
fn test_graceful_shutdown() {
    use std::io::Read;

    let mut harness = TestPoll::new();
    let mut peer_buf = [0; 1024];

    let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0"));
    let addr = assert_ok!(listener.local_addr());

    let mut client = assert_ok!(TcpStream::connect(&addr));
    let interest = Ready::readable() | Ready::writable();
    harness.register(&client, Token(0), interest, PollOpt::edge());

    let (mut peer, _) = assert_ok!(listener.accept());

    // Wait for the client to report writable.
    assert_ready!(harness, Token(0), Ready::writable());

    // Nothing else should be pending at this point.
    assert_idle!(harness);

    // Close the client's sending direction; the peer then reads zero bytes.
    assert_ok!(client.shutdown(Shutdown::Write));

    assert_eq!(0, assert_ok!(peer.read(&mut peer_buf)));
    drop(peer);

    assert_ready!(harness, Token(0), Ready::readable());
    #[cfg(not(any(target_os = "bitrig", target_os = "dragonfly",
        target_os = "freebsd", target_os = "ios", target_os = "macos",
        target_os = "netbsd", target_os = "openbsd")))]
    assert_hup_ready!(harness);

    let mut client_buf = [0; 1024];
    assert_eq!(assert_ok!(client.read(&mut client_buf)), 0);
}
212 
/// The accepted peer is dropped with a zero linger timeout: the client must
/// report writable and readable (plus HUP on unix targets outside the
/// excluded BSD-family list), and the following read must fail.
#[test]
fn test_abrupt_shutdown() {
    use net2::TcpStreamExt;
    use std::io::Read;

    let mut harness = TestPoll::new();

    let listener = assert_ok!(net::TcpListener::bind("127.0.0.1:0"));
    let addr = assert_ok!(listener.local_addr());

    let mut client = assert_ok!(TcpStream::connect(&addr));
    let interest = Ready::readable() | Ready::writable();
    harness.register(&client, Token(0), interest, PollOpt::edge());

    let (peer, _) = assert_ok!(listener.accept());
    // NOTE(review): a zero linger timeout presumably turns the drop below into
    // a connection reset rather than an orderly close -- confirm against the
    // platform's SO_LINGER semantics.
    assert_ok!(peer.set_linger(Some(Duration::from_millis(0))));

    // Wait to be connected
    assert_ready!(harness, Token(0), Ready::writable());

    drop(peer);

    #[cfg(not(any(target_os = "bitrig", target_os = "dragonfly",
        target_os = "freebsd", target_os = "ios", target_os = "macos",
        target_os = "netbsd", target_os = "openbsd")))]
    assert_hup_ready!(harness);
    assert_ready!(harness, Token(0), Ready::writable());
    assert_ready!(harness, Token(0), Ready::readable());

    let mut buf = [0; 1024];
    let outcome = client.read(&mut buf);
    assert!(outcome.is_err(), "not err = {:?}", outcome);
}
249