1 use mio::{Events, Poll, PollOpt, Ready, Token};
2 use mio::net::UdpSocket;
3 use bytes::{Buf, RingBuf, SliceBuf, MutBuf};
4 use std::io::ErrorKind;
5 use std::str;
6 use std::time;
7 use localhost;
8 use iovec::IoVec;
9 
10 const LISTENER: Token = Token(0);
11 const SENDER: Token = Token(1);
12 
13 pub struct UdpHandlerSendRecv {
14     tx: UdpSocket,
15     rx: UdpSocket,
16     msg: &'static str,
17     buf: SliceBuf<'static>,
18     rx_buf: RingBuf,
19     connected: bool,
20     shutdown: bool,
21 }
22 
23 impl UdpHandlerSendRecv {
new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv24     fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv {
25         UdpHandlerSendRecv {
26             tx,
27             rx,
28             msg,
29             buf: SliceBuf::wrap(msg.as_bytes()),
30             rx_buf: RingBuf::new(1024),
31             connected,
32             shutdown: false,
33         }
34     }
35 }
36 
/// Compile-time check that `T` is `Send`; does nothing at run time.
fn assert_send<T>()
where
    T: Send,
{
}
39 
/// Compile-time check that `T` is `Sync`; does nothing at run time.
fn assert_sync<T>()
where
    T: Sync,
{
}
42 
/// Sends `"hello world"` from `tx` to `rx` through an edge-triggered poll loop
/// and asserts that the bytes read back equal the message.
///
/// * `tx` - sending socket, registered as `SENDER` for writability.
/// * `rx` - receiving socket, registered as `LISTENER` for readability.
/// * `connected` - when `true` the loop uses `send`/`recv`; otherwise it uses
///   `send_to` (targeting `rx`'s local address) and `recv_from`.
///
/// The loop runs until a datagram has been received. A `WouldBlock` from a
/// send or receive is treated as a spurious readiness event and ignored, the
/// same way `test_udp_socket_send_recv_bufs` treats it.
///
/// # Panics
/// Panics on any other I/O error, or if the received bytes differ from the
/// message.
#[cfg(test)]
fn test_send_recv_udp(tx: UdpSocket, rx: UdpSocket, connected: bool) {
    debug!("Starting TEST_UDP_SOCKETS");
    let poll = Poll::new().unwrap();

    assert_send::<UdpSocket>();
    assert_sync::<UdpSocket>();

    // ensure that the sockets are non-blocking
    let mut buf = [0; 128];
    assert_eq!(ErrorKind::WouldBlock, rx.recv_from(&mut buf).unwrap_err().kind());

    info!("Registering SENDER");
    poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();

    info!("Registering LISTENER");
    poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(1024);

    info!("Starting event loop to test with...");
    let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world");

    while !handler.shutdown {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            let readiness = event.readiness();

            if readiness.is_readable() {
                if let LISTENER = event.token() {
                    debug!("We are receiving a datagram now...");
                    let received = {
                        // SAFETY: the slice is only handed to the socket as a
                        // write destination and is never read here.
                        // NOTE(review): `mut_bytes` is presumably unsafe because
                        // it may expose uninitialized memory - confirm against
                        // the bytes crate docs.
                        let dst = unsafe { handler.rx_buf.mut_bytes() };
                        if handler.connected {
                            handler.rx.recv(dst)
                        } else {
                            handler.rx.recv_from(dst).map(|(cnt, _)| cnt)
                        }
                    };

                    match received {
                        Ok(cnt) => {
                            // SAFETY: `cnt` is the number of bytes the socket
                            // just wrote into the slice obtained above.
                            unsafe { MutBuf::advance(&mut handler.rx_buf, cnt); }
                            assert_eq!(str::from_utf8(handler.rx_buf.bytes()).unwrap(), handler.msg);
                            handler.shutdown = true;
                        }
                        // Readiness was spurious; wait for the next event.
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
                        Err(e) => panic!("read error {}", e),
                    }
                }
            }

            if readiness.is_writable() {
                if let SENDER = event.token() {
                    let sent = if handler.connected {
                        handler.tx.send(handler.buf.bytes())
                    } else {
                        let addr = handler.rx.local_addr().unwrap();
                        handler.tx.send_to(handler.buf.bytes(), &addr)
                    };

                    match sent {
                        Ok(cnt) => {
                            handler.buf.advance(cnt);
                        }
                        // Readiness was spurious; wait for the next event.
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
                        Err(e) => panic!("write error {}", e),
                    }
                }
            }
        }
    }
}
102 
103 /// Returns the sender and the receiver
connected_sockets() -> (UdpSocket, UdpSocket)104 fn connected_sockets() -> (UdpSocket, UdpSocket) {
105     let addr = localhost();
106     let any = localhost();
107 
108     let tx = UdpSocket::bind(&any).unwrap();
109     let rx = UdpSocket::bind(&addr).unwrap();
110 
111     let tx_addr = tx.local_addr().unwrap();
112     let rx_addr = rx.local_addr().unwrap();
113 
114     assert!(tx.connect(rx_addr).is_ok());
115     assert!(rx.connect(tx_addr).is_ok());
116 
117     (tx, rx)
118 }
119 
/// Round-trips a message between two unconnected sockets, i.e. through the
/// `send_to` / `recv_from` path of `test_send_recv_udp`.
#[test]
pub fn test_udp_socket() {
    // Keep the call order: the first address goes to the receiver, the second
    // to the sender.
    let receiver_addr = localhost();
    let sender_addr = localhost();

    let sender = UdpSocket::bind(&sender_addr).unwrap();
    let receiver = UdpSocket::bind(&receiver_addr).unwrap();

    test_send_recv_udp(sender, receiver, false);
}
130 
/// Round-trips a message between two mutually connected sockets, i.e. through
/// the `send` / `recv` path of `test_send_recv_udp`.
#[test]
pub fn test_udp_socket_send_recv() {
    let (sender, receiver) = connected_sockets();

    test_send_recv_udp(sender, receiver, true);
}
137 
/// Checks that a socket connected to one peer does not report a datagram sent
/// by a different socket as readable.
///
/// `rx` is connected to `tx`; a third socket, `udp_outside`, sends to `rx`.
/// After one poll (bounded at 5 seconds) no readable event may be reported
/// for `LISTENER`.
///
/// # Panics
/// Panics if socket setup fails or if a readable `LISTENER` event is seen.
#[test]
pub fn test_udp_socket_discard() {
    let addr = localhost();
    let any = localhost();
    let outside = localhost();

    let tx = UdpSocket::bind(&any).unwrap();
    let rx = UdpSocket::bind(&addr).unwrap();
    let udp_outside = UdpSocket::bind(&outside).unwrap();

    let tx_addr = tx.local_addr().unwrap();
    let rx_addr = rx.local_addr().unwrap();

    assert!(tx.connect(rx_addr).is_ok());
    assert!(udp_outside.connect(rx_addr).is_ok());
    assert!(rx.connect(tx_addr).is_ok());

    let poll = Poll::new().unwrap();

    // The send may succeed or report WouldBlock; either is acceptable here.
    let r = udp_outside.send(b"hello world");
    assert!(r.is_ok() || r.unwrap_err().kind() == ErrorKind::WouldBlock);

    poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();
    poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(1024);

    // NOTE(review): `tx` is registered for writability, so this poll may return
    // as soon as `tx` is writable rather than waiting the full 5 seconds -
    // confirm that is intended.
    poll.poll(&mut events, Some(time::Duration::from_secs(5))).unwrap();

    for event in &events {
        if event.readiness().is_readable() && event.token() == LISTENER {
            panic!("Expected to not receive a packet but got something");
        }
    }
}
175 
/// Exchanges `"hello, world"` five times between two connected sockets using
/// the vectored `send_bufs` / `recv_bufs` calls.
///
/// The payload is sent from two source buffers and received into three
/// destination buffers of 4, 6 and 8 bytes; each receive must report the full
/// payload length and the concatenated destination bytes must start with the
/// payload. The loop ends after the fifth successful receive.
///
/// # Panics
/// Panics on any I/O error other than `WouldBlock`, or on a length or content
/// mismatch.
#[cfg(all(unix, not(target_os = "fuchsia")))]
#[test]
pub fn test_udp_socket_send_recv_bufs() {
    let (tx, rx) = connected_sockets();

    let poll = Poll::new().unwrap();
    poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();
    poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(1024);

    let expected = b"hello, world";

    // Scatter/gather source: the payload split across two slices.
    let outgoing: Vec<_> = vec![b"hello, " as &[u8], b"world"]
        .into_iter()
        .filter_map(IoVec::from_bytes)
        .collect();

    // Destination: three fixed-size arrays, 18 bytes in total.
    let mut head = [0u8; 4];
    let mut middle = [0u8; 6];
    let mut tail = [0u8; 8];
    let mut incoming: Vec<_> = vec![&mut head[..], &mut middle[..], &mut tail[..]]
        .into_iter()
        .filter_map(IoVec::from_bytes_mut)
        .collect();

    let rounds = 5;
    let mut received = 0;
    let mut sent = 0;

    'outer: loop {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            let readiness = event.readiness();
            let token = event.token();

            if readiness.is_readable() && token == LISTENER {
                // Edge-triggered: drain the socket until it would block.
                loop {
                    let cnt = match rx.recv_bufs(&mut incoming) {
                        Ok(cnt) => cnt,
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
                        Err(e) => panic!("read error {}", e),
                    };
                    assert_eq!(cnt, expected.len());

                    // Flatten the destination buffers to compare against the payload.
                    let joined: Vec<u8> = incoming
                        .iter()
                        .flat_map(|chunk| chunk.iter())
                        .cloned()
                        .collect();
                    assert_eq!(&joined[..cnt], &expected[..cnt]);

                    received += 1;
                    if received == rounds {
                        break 'outer;
                    }
                }
            }

            if readiness.is_writable() && token == SENDER {
                // Send until all rounds are out or the socket would block.
                while sent < rounds {
                    let cnt = match tx.send_bufs(&outgoing) {
                        Ok(cnt) => cnt,
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
                        Err(e) => panic!("write error {}", e),
                    };
                    assert_eq!(cnt, expected.len());
                    sent += 1;
                }
            }
        }
    }
}
253