1 #![cfg(all(feature = "os-poll", feature = "tcp"))]
2 
3 use mio::net::TcpListener;
4 use mio::{Interest, Token};
5 use std::io::{self, Read};
6 use std::net::{self, SocketAddr};
7 #[cfg(unix)]
8 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
9 use std::sync::{Arc, Barrier};
10 use std::thread;
11 
12 mod util;
13 use util::{
14     any_local_address, any_local_ipv6_address, assert_send, assert_socket_close_on_exec,
15     assert_socket_non_blocking, assert_sync, assert_would_block, expect_events, expect_no_events,
16     init, init_with_poll, ExpectEvent,
17 };
18 
19 const ID1: Token = Token(0);
20 const ID2: Token = Token(1);
21 
22 #[test]
is_send_and_sync()23 fn is_send_and_sync() {
24     assert_send::<TcpListener>();
25     assert_sync::<TcpListener>();
26 }
27 
28 #[test]
tcp_listener()29 fn tcp_listener() {
30     smoke_test_tcp_listener(any_local_address(), TcpListener::bind);
31 }
32 
33 #[test]
tcp_listener_ipv6()34 fn tcp_listener_ipv6() {
35     smoke_test_tcp_listener(any_local_ipv6_address(), TcpListener::bind);
36 }
37 
38 #[test]
tcp_listener_std()39 fn tcp_listener_std() {
40     smoke_test_tcp_listener(any_local_address(), |addr| {
41         let listener = net::TcpListener::bind(addr).unwrap();
42         // `std::net::TcpListener`s are blocking by default, so make sure it is in
43         // non-blocking mode before wrapping in a Mio equivalent.
44         listener.set_nonblocking(true).unwrap();
45         Ok(TcpListener::from_std(listener))
46     });
47 }
48 
smoke_test_tcp_listener<F>(addr: SocketAddr, make_listener: F) where F: FnOnce(SocketAddr) -> io::Result<TcpListener>,49 fn smoke_test_tcp_listener<F>(addr: SocketAddr, make_listener: F)
50 where
51     F: FnOnce(SocketAddr) -> io::Result<TcpListener>,
52 {
53     let (mut poll, mut events) = init_with_poll();
54 
55     let mut listener = make_listener(addr).unwrap();
56     let address = listener.local_addr().unwrap();
57 
58     assert_socket_non_blocking(&listener);
59     assert_socket_close_on_exec(&listener);
60 
61     poll.registry()
62         .register(&mut listener, ID1, Interest::READABLE)
63         .expect("unable to register TCP listener");
64 
65     let barrier = Arc::new(Barrier::new(2));
66     let thread_handle = start_connections(address, 1, barrier.clone());
67 
68     expect_events(
69         &mut poll,
70         &mut events,
71         vec![ExpectEvent::new(ID1, Interest::READABLE)],
72     );
73 
74     // Expect a single connection.
75     let (mut stream, peer_address) = listener.accept().expect("unable to accept connection");
76     assert!(peer_address.ip().is_loopback());
77     assert_eq!(stream.peer_addr().unwrap(), peer_address);
78     assert_eq!(stream.local_addr().unwrap(), address);
79 
80     // Expect the stream to be non-blocking.
81     let mut buf = [0; 20];
82     assert_would_block(stream.read(&mut buf));
83 
84     // Expect no more connections.
85     assert_would_block(listener.accept());
86 
87     assert!(listener.take_error().unwrap().is_none());
88 
89     barrier.wait();
90     thread_handle.join().expect("unable to join thread");
91 }
92 
93 #[test]
set_get_ttl()94 fn set_get_ttl() {
95     init();
96 
97     let listener = TcpListener::bind(any_local_address()).unwrap();
98 
99     // set TTL, get TTL, make sure it has the expected value
100     const TTL: u32 = 10;
101     listener.set_ttl(TTL).unwrap();
102     assert_eq!(listener.ttl().unwrap(), TTL);
103     assert!(listener.take_error().unwrap().is_none());
104 }
105 
106 #[test]
get_ttl_without_previous_set()107 fn get_ttl_without_previous_set() {
108     init();
109 
110     let listener = TcpListener::bind(any_local_address()).unwrap();
111 
112     // expect a get TTL to work w/o any previous set_ttl
113     listener.ttl().expect("unable to get TTL for TCP listener");
114     assert!(listener.take_error().unwrap().is_none());
115 }
116 
117 #[cfg(unix)]
118 #[test]
raw_fd()119 fn raw_fd() {
120     init();
121 
122     let listener = TcpListener::bind(any_local_address()).unwrap();
123     let address = listener.local_addr().unwrap();
124 
125     let raw_fd1 = listener.as_raw_fd();
126     let raw_fd2 = listener.into_raw_fd();
127     assert_eq!(raw_fd1, raw_fd2);
128 
129     let listener = unsafe { TcpListener::from_raw_fd(raw_fd2) };
130     assert_eq!(listener.as_raw_fd(), raw_fd1);
131     assert_eq!(listener.local_addr().unwrap(), address);
132 }
133 
134 #[test]
registering()135 fn registering() {
136     let (mut poll, mut events) = init_with_poll();
137 
138     let mut stream = TcpListener::bind(any_local_address()).unwrap();
139 
140     poll.registry()
141         .register(&mut stream, ID1, Interest::READABLE)
142         .expect("unable to register TCP listener");
143 
144     expect_no_events(&mut poll, &mut events);
145 
146     // NOTE: more tests are done in the smoke tests above.
147 }
148 
149 #[test]
reregister()150 fn reregister() {
151     let (mut poll, mut events) = init_with_poll();
152 
153     let mut listener = TcpListener::bind(any_local_address()).unwrap();
154     let address = listener.local_addr().unwrap();
155 
156     poll.registry()
157         .register(&mut listener, ID1, Interest::READABLE)
158         .unwrap();
159     poll.registry()
160         .reregister(&mut listener, ID2, Interest::READABLE)
161         .unwrap();
162 
163     let barrier = Arc::new(Barrier::new(2));
164     let thread_handle = start_connections(address, 1, barrier.clone());
165 
166     expect_events(
167         &mut poll,
168         &mut events,
169         vec![ExpectEvent::new(ID2, Interest::READABLE)],
170     );
171 
172     let (stream, peer_address) = listener.accept().expect("unable to accept connection");
173     assert!(peer_address.ip().is_loopback());
174     assert_eq!(stream.peer_addr().unwrap(), peer_address);
175     assert_eq!(stream.local_addr().unwrap(), address);
176 
177     assert_would_block(listener.accept());
178 
179     assert!(listener.take_error().unwrap().is_none());
180 
181     barrier.wait();
182     thread_handle.join().expect("unable to join thread");
183 }
184 
185 #[test]
no_events_after_deregister()186 fn no_events_after_deregister() {
187     let (mut poll, mut events) = init_with_poll();
188 
189     let mut listener = TcpListener::bind(any_local_address()).unwrap();
190     let address = listener.local_addr().unwrap();
191 
192     poll.registry()
193         .register(&mut listener, ID1, Interest::READABLE)
194         .unwrap();
195 
196     let barrier = Arc::new(Barrier::new(2));
197     let thread_handle = start_connections(address, 1, barrier.clone());
198 
199     poll.registry().deregister(&mut listener).unwrap();
200 
201     expect_no_events(&mut poll, &mut events);
202 
203     // Should still be able to accept the connection.
204     let (stream, peer_address) = listener.accept().expect("unable to accept connection");
205     assert!(peer_address.ip().is_loopback());
206     assert_eq!(stream.peer_addr().unwrap(), peer_address);
207     assert_eq!(stream.local_addr().unwrap(), address);
208 
209     assert_would_block(listener.accept());
210 
211     assert!(listener.take_error().unwrap().is_none());
212 
213     barrier.wait();
214     thread_handle.join().expect("unable to join thread");
215 }
216 
217 /// This tests reregister on successful accept works
218 #[test]
tcp_listener_two_streams()219 fn tcp_listener_two_streams() {
220     let (mut poll1, mut events) = init_with_poll();
221 
222     let mut listener = TcpListener::bind(any_local_address()).unwrap();
223     let address = listener.local_addr().unwrap();
224 
225     let barrier = Arc::new(Barrier::new(3));
226     let thread_handle1 = start_connections(address, 1, barrier.clone());
227 
228     poll1
229         .registry()
230         .register(&mut listener, ID1, Interest::READABLE)
231         .unwrap();
232 
233     expect_events(
234         &mut poll1,
235         &mut events,
236         vec![ExpectEvent::new(ID1, Interest::READABLE)],
237     );
238 
239     {
240         let (stream, peer_address) = listener.accept().expect("unable to accept connection");
241         assert!(peer_address.ip().is_loopback());
242         assert_eq!(stream.peer_addr().unwrap(), peer_address);
243         assert_eq!(stream.local_addr().unwrap(), address);
244     }
245 
246     assert_would_block(listener.accept());
247 
248     let thread_handle2 = start_connections(address, 1, barrier.clone());
249 
250     expect_events(
251         &mut poll1,
252         &mut events,
253         vec![ExpectEvent::new(ID1, Interest::READABLE)],
254     );
255 
256     {
257         let (stream, peer_address) = listener.accept().expect("unable to accept connection");
258         assert!(peer_address.ip().is_loopback());
259         assert_eq!(stream.peer_addr().unwrap(), peer_address);
260         assert_eq!(stream.local_addr().unwrap(), address);
261     }
262 
263     expect_no_events(&mut poll1, &mut events);
264 
265     barrier.wait();
266     thread_handle1.join().expect("unable to join thread");
267     thread_handle2.join().expect("unable to join thread");
268 }
269 
270 /// Start `n_connections` connections to `address`. If a `barrier` is provided
271 /// it will wait on it after each connection is made before it is dropped.
start_connections( address: SocketAddr, n_connections: usize, barrier: Arc<Barrier>, ) -> thread::JoinHandle<()>272 fn start_connections(
273     address: SocketAddr,
274     n_connections: usize,
275     barrier: Arc<Barrier>,
276 ) -> thread::JoinHandle<()> {
277     thread::spawn(move || {
278         for _ in 0..n_connections {
279             let conn = net::TcpStream::connect(address).unwrap();
280             barrier.wait();
281             drop(conn);
282         }
283     })
284 }
285