1 #![cfg(all(feature = "os-poll", feature = "tcp"))]
2
3 use mio::net::TcpListener;
4 use mio::{Interest, Token};
5 use std::io::{self, Read};
6 use std::net::{self, SocketAddr};
7 #[cfg(unix)]
8 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
9 use std::sync::{Arc, Barrier};
10 use std::thread;
11
12 mod util;
13 use util::{
14 any_local_address, any_local_ipv6_address, assert_send, assert_socket_close_on_exec,
15 assert_socket_non_blocking, assert_sync, assert_would_block, expect_events, expect_no_events,
16 init, init_with_poll, ExpectEvent,
17 };
18
// Token under which the tests in this file register a listener.
const ID1: Token = Token(0);
// Second token; the `reregister` test switches the listener over to it and
// expects the readable event to be reported under this token instead.
const ID2: Token = Token(1);
21
/// Runs the `assert_send` / `assert_sync` helpers from `util` for
/// `TcpListener`.
#[test]
fn is_send_and_sync() {
    assert_sync::<TcpListener>();
    assert_send::<TcpListener>();
}
27
/// Runs the shared smoke test against a listener created by
/// `TcpListener::bind` on the address returned by `any_local_address`.
#[test]
fn tcp_listener() {
    let addr = any_local_address();
    smoke_test_tcp_listener(addr, TcpListener::bind);
}
32
/// Runs the shared smoke test against a listener created by
/// `TcpListener::bind` on the address returned by `any_local_ipv6_address`.
#[test]
fn tcp_listener_ipv6() {
    let addr = any_local_ipv6_address();
    smoke_test_tcp_listener(addr, TcpListener::bind);
}
37
/// Runs the shared smoke test against a Mio listener that wraps a
/// `std::net::TcpListener` via `TcpListener::from_std`.
#[test]
fn tcp_listener_std() {
    // `std::net::TcpListener`s are blocking by default, so the socket is
    // switched to non-blocking mode before it is wrapped in the Mio
    // equivalent.
    let listener_from_std = |addr: SocketAddr| -> io::Result<TcpListener> {
        let std_listener = net::TcpListener::bind(addr).unwrap();
        std_listener.set_nonblocking(true).unwrap();
        Ok(TcpListener::from_std(std_listener))
    };

    smoke_test_tcp_listener(any_local_address(), listener_from_std);
}
48
smoke_test_tcp_listener<F>(addr: SocketAddr, make_listener: F) where F: FnOnce(SocketAddr) -> io::Result<TcpListener>,49 fn smoke_test_tcp_listener<F>(addr: SocketAddr, make_listener: F)
50 where
51 F: FnOnce(SocketAddr) -> io::Result<TcpListener>,
52 {
53 let (mut poll, mut events) = init_with_poll();
54
55 let mut listener = make_listener(addr).unwrap();
56 let address = listener.local_addr().unwrap();
57
58 assert_socket_non_blocking(&listener);
59 assert_socket_close_on_exec(&listener);
60
61 poll.registry()
62 .register(&mut listener, ID1, Interest::READABLE)
63 .expect("unable to register TCP listener");
64
65 let barrier = Arc::new(Barrier::new(2));
66 let thread_handle = start_connections(address, 1, barrier.clone());
67
68 expect_events(
69 &mut poll,
70 &mut events,
71 vec![ExpectEvent::new(ID1, Interest::READABLE)],
72 );
73
74 // Expect a single connection.
75 let (mut stream, peer_address) = listener.accept().expect("unable to accept connection");
76 assert!(peer_address.ip().is_loopback());
77 assert_eq!(stream.peer_addr().unwrap(), peer_address);
78 assert_eq!(stream.local_addr().unwrap(), address);
79
80 // Expect the stream to be non-blocking.
81 let mut buf = [0; 20];
82 assert_would_block(stream.read(&mut buf));
83
84 // Expect no more connections.
85 assert_would_block(listener.accept());
86
87 assert!(listener.take_error().unwrap().is_none());
88
89 barrier.wait();
90 thread_handle.join().expect("unable to join thread");
91 }
92
/// A TTL written with `set_ttl` must be read back unchanged by `ttl`.
#[test]
fn set_get_ttl() {
    const TTL: u32 = 10;

    init();

    let addr = any_local_address();
    let listener = TcpListener::bind(addr).unwrap();

    // Write the TTL, read it back and compare.
    listener.set_ttl(TTL).unwrap();
    let ttl = listener.ttl().unwrap();
    assert_eq!(ttl, TTL);

    assert!(listener.take_error().unwrap().is_none());
}
105
/// Reading the TTL must succeed even if `set_ttl` was never called.
#[test]
fn get_ttl_without_previous_set() {
    init();

    let addr = any_local_address();
    let listener = TcpListener::bind(addr).unwrap();

    // Only success matters here; the actual value is not inspected.
    let _ttl = listener
        .ttl()
        .expect("unable to get TTL for TCP listener");

    assert!(listener.take_error().unwrap().is_none());
}
116
/// Round-trips a listener through its raw file descriptor (Unix only) and
/// checks that the descriptor and the bound address survive unchanged.
#[cfg(unix)]
#[test]
fn raw_fd() {
    init();

    let original = TcpListener::bind(any_local_address()).unwrap();
    let address = original.local_addr().unwrap();

    // Borrowing and consuming the listener must yield the same descriptor.
    let borrowed_fd = original.as_raw_fd();
    let owned_fd = original.into_raw_fd();
    assert_eq!(borrowed_fd, owned_fd);

    // SAFETY: `owned_fd` was handed over by `into_raw_fd` just above, so it is
    // an open listening socket that nothing else owns or closes.
    let restored = unsafe { TcpListener::from_raw_fd(owned_fd) };
    assert_eq!(restored.as_raw_fd(), borrowed_fd);
    assert_eq!(restored.local_addr().unwrap(), address);
}
133
/// Registering a listener nobody connects to must not produce any events.
#[test]
fn registering() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();

    let registered = poll
        .registry()
        .register(&mut listener, ID1, Interest::READABLE);
    registered.expect("unable to register TCP listener");

    expect_no_events(&mut poll, &mut events);

    // NOTE: more tests are done in the smoke tests above.
}
148
/// After `reregister` moves the listener from `ID1` to `ID2`, the incoming
/// connection must be reported under `ID2`.
#[test]
fn reregister() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let listen_addr = listener.local_addr().unwrap();

    // Register under the first token, then switch straight to the second.
    let registry = poll.registry();
    registry
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();
    registry
        .reregister(&mut listener, ID2, Interest::READABLE)
        .unwrap();

    // Two parties meet at the barrier: this test and the connecting thread.
    let barrier = Arc::new(Barrier::new(2));
    let connector = start_connections(listen_addr, 1, Arc::clone(&barrier));

    let expected = vec![ExpectEvent::new(ID2, Interest::READABLE)];
    expect_events(&mut poll, &mut events, expected);

    let accepted = listener.accept();
    let (stream, peer_address) = accepted.expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(stream.peer_addr().unwrap(), peer_address);
    assert_eq!(stream.local_addr().unwrap(), listen_addr);

    // Only one connection was made, so a second accept has nothing to return.
    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    // Release the connecting thread so it can drop its end and finish.
    barrier.wait();
    connector.join().expect("unable to join thread");
}
184
/// Once a listener is deregistered no events may be reported for it, yet the
/// pending connection must still be acceptable directly on the socket.
#[test]
fn no_events_after_deregister() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let listen_addr = listener.local_addr().unwrap();

    poll.registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();

    // Two parties meet at the barrier: this test and the connecting thread.
    let barrier = Arc::new(Barrier::new(2));
    let connector = start_connections(listen_addr, 1, Arc::clone(&barrier));

    poll.registry().deregister(&mut listener).unwrap();

    expect_no_events(&mut poll, &mut events);

    // Should still be able to accept the connection.
    // NOTE(review): this relies on the helper thread having connected by now;
    // presumably `expect_no_events` polls long enough for that -- confirm.
    let accepted = listener.accept();
    let (stream, peer_address) = accepted.expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(stream.peer_addr().unwrap(), peer_address);
    assert_eq!(stream.local_addr().unwrap(), listen_addr);

    // Only one connection was made, so a second accept has nothing to return.
    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    // Release the connecting thread so it can drop its end and finish.
    barrier.wait();
    connector.join().expect("unable to join thread");
}
216
/// This tests reregister on successful accept works: after one connection has
/// been accepted, a second incoming connection must again produce a readable
/// event for the same registration.
#[test]
fn tcp_listener_two_streams() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    // Three parties meet at the barrier: this test and both connecting threads.
    let barrier = Arc::new(Barrier::new(3));
    let first_connector = start_connections(address, 1, Arc::clone(&barrier));

    poll.registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();

    // Accepts one connection, verifies its addresses and drops the accepted
    // stream again when the closure returns.
    let accept_and_check = |listener: &TcpListener| {
        let (stream, peer_address) = listener.accept().expect("unable to accept connection");
        assert!(peer_address.ip().is_loopback());
        assert_eq!(stream.peer_addr().unwrap(), peer_address);
        assert_eq!(stream.local_addr().unwrap(), address);
    };

    let first_expected = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, first_expected);
    accept_and_check(&listener);

    // The single pending connection has been consumed.
    assert_would_block(listener.accept());

    let second_connector = start_connections(address, 1, Arc::clone(&barrier));

    let second_expected = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, second_expected);
    accept_and_check(&listener);

    expect_no_events(&mut poll, &mut events);

    // Release both connecting threads so they can drop their ends and finish.
    barrier.wait();
    first_connector.join().expect("unable to join thread");
    second_connector.join().expect("unable to join thread");
}
269
/// Spawn a thread that opens `n_connections` connections to `address`, one
/// after another.
///
/// After each connection is established the thread waits on `barrier` and
/// only drops that connection once the barrier releases it. The returned
/// handle can be joined to wait for the thread to finish.
fn start_connections(
    address: SocketAddr,
    n_connections: usize,
    barrier: Arc<Barrier>,
) -> thread::JoinHandle<()> {
    let connect_all = move || {
        (0..n_connections).for_each(|_| {
            let conn = net::TcpStream::connect(address).unwrap();
            // Keep the connection open until the other side has caught up.
            barrier.wait();
            drop(conn);
        });
    };

    thread::spawn(connect_all)
}
285