1 use mio::{Events, Poll, PollOpt, Ready, Token};
2 use mio::net::UdpSocket;
3 use bytes::{Buf, RingBuf, SliceBuf, MutBuf};
4 use std::io::ErrorKind;
5 use std::str;
6 use std::time;
7 use localhost;
8 use iovec::IoVec;
9
10 const LISTENER: Token = Token(0);
11 const SENDER: Token = Token(1);
12
13 pub struct UdpHandlerSendRecv {
14 tx: UdpSocket,
15 rx: UdpSocket,
16 msg: &'static str,
17 buf: SliceBuf<'static>,
18 rx_buf: RingBuf,
19 connected: bool,
20 shutdown: bool,
21 }
22
23 impl UdpHandlerSendRecv {
new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv24 fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv {
25 UdpHandlerSendRecv {
26 tx,
27 rx,
28 msg,
29 buf: SliceBuf::wrap(msg.as_bytes()),
30 rx_buf: RingBuf::new(1024),
31 connected,
32 shutdown: false,
33 }
34 }
35 }
36
/// Compile-time check that `T` implements `Send`; the body is empty, so
/// calling it does nothing at run time.
fn assert_send<T>()
where
    T: Send,
{
}
39
/// Compile-time check that `T` implements `Sync`; the body is empty, so
/// calling it does nothing at run time.
fn assert_sync<T>()
where
    T: Sync,
{
}
42
/// Sends `"hello world"` from `tx` to `rx` through a `Poll` event loop and
/// checks that the receiver reads back exactly that message.
///
/// When `connected` is `false` the datagram is sent with `send_to`
/// (addressed to `rx.local_addr()`) and read with `recv_from`; when it is
/// `true` the plain `send` / `recv` calls are used instead.
///
/// # Panics
///
/// Panics if any socket or poll operation fails, if `rx` does not report
/// `WouldBlock` before anything has been sent, or if the received bytes
/// differ from the message. The loop polls without a timeout, so it only
/// ends once the listener has produced a readable event.
#[cfg(test)]
fn test_send_recv_udp(tx: UdpSocket, rx: UdpSocket, connected: bool) {
    debug!("Starting TEST_UDP_SOCKETS");
    let poll = Poll::new().unwrap();

    // Compile-time checks only: `UdpSocket` must implement `Send` and `Sync`.
    assert_send::<UdpSocket>();
    assert_sync::<UdpSocket>();

    // ensure that the sockets are non-blocking
    let mut buf = [0; 128];
    assert_eq!(ErrorKind::WouldBlock, rx.recv_from(&mut buf).unwrap_err().kind());

    info!("Registering SENDER");
    poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();

    info!("Registering LISTENER");
    poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(1024);

    info!("Starting event loop to test with...");
    let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world");

    while !handler.shutdown {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            let readiness = event.readiness();
            let token = event.token();

            if readiness.is_readable() && token == LISTENER {
                debug!("We are receiving a datagram now...");
                // SAFETY (presumed, confirm against the `bytes` `MutBuf`
                // contract): the writable slice is only handed to the
                // socket's receive call, and the buffer is advanced by
                // exactly the byte count that call reported.
                let cnt = unsafe {
                    let dst = handler.rx_buf.mut_bytes();
                    if handler.connected {
                        handler.rx.recv(dst).unwrap()
                    } else {
                        handler.rx.recv_from(dst).unwrap().0
                    }
                };

                // Fully qualified because `handler.rx_buf` is also used
                // through `Buf` (see `bytes()` below).
                unsafe { MutBuf::advance(&mut handler.rx_buf, cnt); }

                // `assert_eq!` so a mismatch reports both the received and
                // the expected text.
                assert_eq!(str::from_utf8(handler.rx_buf.bytes()).unwrap(), handler.msg);
                handler.shutdown = true;
            }

            if readiness.is_writable() && token == SENDER {
                let cnt = if handler.connected {
                    handler.tx.send(handler.buf.bytes()).unwrap()
                } else {
                    let addr = handler.rx.local_addr().unwrap();
                    handler.tx.send_to(handler.buf.bytes(), &addr).unwrap()
                };

                // Move the send cursor past the bytes that were written.
                handler.buf.advance(cnt);
            }
        }
    }
}
102
103 /// Returns the sender and the receiver
connected_sockets() -> (UdpSocket, UdpSocket)104 fn connected_sockets() -> (UdpSocket, UdpSocket) {
105 let addr = localhost();
106 let any = localhost();
107
108 let tx = UdpSocket::bind(&any).unwrap();
109 let rx = UdpSocket::bind(&addr).unwrap();
110
111 let tx_addr = tx.local_addr().unwrap();
112 let rx_addr = rx.local_addr().unwrap();
113
114 assert!(tx.connect(rx_addr).is_ok());
115 assert!(rx.connect(tx_addr).is_ok());
116
117 (tx, rx)
118 }
119
/// Runs the send/receive exchange over two bound but unconnected sockets,
/// i.e. the `send_to` / `recv_from` path of `test_send_recv_udp`.
#[test]
pub fn test_udp_socket() {
    let receiver_addr = localhost();
    let sender_addr = localhost();

    let sender = UdpSocket::bind(&sender_addr).unwrap();
    let receiver = UdpSocket::bind(&receiver_addr).unwrap();

    test_send_recv_udp(sender, receiver, false);
}
130
/// Runs the send/receive exchange over a connected socket pair, i.e. the
/// `send` / `recv` path of `test_send_recv_udp`.
#[test]
pub fn test_udp_socket_send_recv() {
    let (sender, receiver) = connected_sockets();
    test_send_recv_udp(sender, receiver, true);
}
137
/// Checks that a datagram sent by a third socket does not show up as a
/// readable event on a receiver that is connected to a different peer.
///
/// `tx` and `rx` are connected to each other; `udp_outside` is connected to
/// `rx` and sends one datagram. A single `poll` call (5 second timeout) is
/// then inspected, and the test fails if it contains a readable event for
/// the listener.
#[test]
pub fn test_udp_socket_discard() {
    // Same bind-and-connect setup the other connected-socket tests use.
    let (tx, rx) = connected_sockets();

    let outside = localhost();
    let udp_outside = UdpSocket::bind(&outside).unwrap();

    let rx_addr = rx.local_addr().unwrap();
    assert!(udp_outside.connect(rx_addr).is_ok());

    let poll = Poll::new().unwrap();

    // The send may succeed or report `WouldBlock`; anything else is a
    // failure and the actual error is reported.
    match udp_outside.send(b"hello world") {
        Ok(_) => {}
        Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
        Err(e) => panic!("unexpected send error {}", e),
    }

    poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();
    poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(1024);

    poll.poll(&mut events, Some(time::Duration::from_secs(5))).unwrap();

    for event in &events {
        if event.readiness().is_readable() && event.token() == LISTENER {
            panic!("Expected to not receive a packet but got something");
        }
    }
}
175
/// Exchanges the datagram `"hello, world"` five times over a connected
/// socket pair using the vectored `send_bufs` / `recv_bufs` calls.
///
/// The message is written from two slices and read into three buffers of
/// 4, 6 and 8 bytes; every read must report the full message length and
/// reproduce the message bytes. The loop polls without a timeout and ends
/// after the fifth successful read.
#[cfg(all(unix, not(target_os = "fuchsia")))]
#[test]
pub fn test_udp_socket_send_recv_bufs() {
    let (tx, rx) = connected_sockets();

    let poll = Poll::new().unwrap();
    poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();
    poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(1024);

    let data = b"hello, world";

    // Outgoing message split across two slices.
    let write_bufs: Vec<_> = vec![b"hello, " as &[u8], b"world"]
        .into_iter()
        .filter_map(IoVec::from_bytes)
        .collect();

    // Incoming message is scattered over three differently sized buffers.
    let mut first = [0u8; 4];
    let mut second = [0u8; 6];
    let mut third = [0u8; 8];
    let mut read_bufs: Vec<_> = vec![&mut first[..], &mut second[..], &mut third[..]]
        .into_iter()
        .filter_map(IoVec::from_bytes_mut)
        .collect();

    let times = 5;
    let (mut rtimes, mut wtimes) = (0, 0);

    'outer: loop {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            let readiness = event.readiness();
            let token = event.token();

            if readiness.is_readable() && token == LISTENER {
                // Keep reading until the socket reports `WouldBlock`.
                loop {
                    let cnt = match rx.recv_bufs(&mut read_bufs[..]) {
                        Ok(n) => n,
                        Err(e) => {
                            if e.kind() == ErrorKind::WouldBlock {
                                break;
                            }
                            panic!("read error {}", e)
                        }
                    };
                    assert_eq!(cnt, data.len());

                    // Flatten the three buffers back into one byte sequence.
                    let res: Vec<u8> = read_bufs
                        .iter()
                        .flat_map(|iov| iov.iter().cloned())
                        .collect();
                    assert_eq!(&res[..cnt], &data[..cnt]);

                    rtimes += 1;
                    if rtimes == times {
                        break 'outer;
                    }
                }
            }

            if readiness.is_writable() && token == SENDER {
                // Send until all rounds are written or the socket would block.
                while wtimes < times {
                    let cnt = match tx.send_bufs(&write_bufs) {
                        Ok(n) => n,
                        Err(e) => {
                            if e.kind() == ErrorKind::WouldBlock {
                                break;
                            }
                            panic!("write error {}", e)
                        }
                    };
                    assert_eq!(cnt, data.len());
                    wtimes += 1;
                }
            }
        }
    }
}
253