1 extern crate futures;
2 extern crate tokio_udp;
3 extern crate tokio_codec;
4 #[macro_use]
5 extern crate tokio_io;
6 extern crate bytes;
7 extern crate env_logger;
8 
9 use std::io;
10 use std::net::SocketAddr;
11 
12 use futures::{Future, Poll, Stream, Sink};
13 
14 use tokio_udp::{UdpSocket, UdpFramed};
15 use tokio_codec::{Encoder, Decoder};
16 use bytes::{BytesMut, BufMut};
17 
18 macro_rules! t {
19     ($e:expr) => (match $e {
20         Ok(e) => e,
21         Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
22     })
23 }
24 
send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R)25 fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
26     let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into()));
27     let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into()));
28     let a_addr = t!(a.local_addr());
29     let b_addr = t!(b.local_addr());
30 
31     {
32         let send = SendMessage::new(a, send.clone(), b_addr, b"1234");
33         let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234");
34         let (sendt, received) = t!(send.join(recv).wait());
35         a = sendt;
36         b = received;
37     }
38 
39     {
40         let send = SendMessage::new(a, send, b_addr, b"");
41         let recv = RecvMessage::new(b, recv, a_addr, b"");
42         t!(send.join(recv).wait());
43     }
44 }
45 
46 #[test]
send_to_and_recv_from()47 fn send_to_and_recv_from() {
48    send_messages(SendTo {}, RecvFrom {});
49 }
50 
51 #[test]
send_and_recv()52 fn send_and_recv() {
53     send_messages(Send {}, Recv {});
54 }
55 
56 trait SendFn {
send(&self, &mut UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>57     fn send(&self, &mut UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>;
58 }
59 
60 #[derive(Debug, Clone)]
61 struct SendTo {}
62 
63 impl SendFn for SendTo {
send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error>64     fn send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> {
65         socket.send_to(buf, addr)
66     }
67 }
68 
69 #[derive(Debug, Clone)]
70 struct Send {}
71 
72 impl SendFn for Send {
send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error>73     fn send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> {
74         socket.connect(addr).expect("could not connect");
75         socket.send(buf)
76     }
77 }
78 
79 struct SendMessage<S> {
80     socket: Option<UdpSocket>,
81     send: S,
82     addr: SocketAddr,
83     data: &'static [u8],
84 }
85 
86 impl<S: SendFn> SendMessage<S> {
new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S>87     fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> {
88         SendMessage {
89             socket: Some(socket),
90             send: send,
91             addr: addr,
92             data: data,
93         }
94     }
95 }
96 
97 impl<S: SendFn> Future for SendMessage<S> {
98     type Item = UdpSocket;
99     type Error = io::Error;
100 
poll(&mut self) -> Poll<UdpSocket, io::Error>101     fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
102         let n = try_nb!(self.send.send(self.socket.as_mut().unwrap(), &self.data[..], &self.addr));
103 
104         assert_eq!(n, self.data.len());
105 
106         Ok(self.socket.take().unwrap().into())
107     }
108 }
109 
110 trait RecvFn {
recv(&self, &mut UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>111     fn recv(&self, &mut UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>;
112 }
113 
114 #[derive(Debug, Clone)]
115 struct RecvFrom {}
116 
117 impl RecvFn for RecvFrom {
recv(&self, socket: &mut UdpSocket, buf: &mut [u8], expected_addr: &SocketAddr) -> Result<usize, io::Error>118     fn recv(&self, socket: &mut UdpSocket, buf: &mut [u8],
119             expected_addr: &SocketAddr) -> Result<usize, io::Error> {
120         socket.recv_from(buf).map(|(s, addr)| {
121             assert_eq!(addr, *expected_addr);
122             s
123         })
124     }
125 }
126 
127 #[derive(Debug, Clone)]
128 struct Recv {}
129 
130 impl RecvFn for Recv {
recv(&self, socket: &mut UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error>131     fn recv(&self, socket: &mut UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error> {
132         socket.recv(buf)
133     }
134 }
135 
136 struct RecvMessage<R> {
137     socket: Option<UdpSocket>,
138     recv: R,
139     expected_addr: SocketAddr,
140     expected_data: &'static [u8],
141 }
142 
143 impl<R: RecvFn> RecvMessage<R> {
new(socket: UdpSocket, recv: R, expected_addr: SocketAddr, expected_data: &'static [u8]) -> RecvMessage<R>144     fn new(socket: UdpSocket, recv: R, expected_addr: SocketAddr,
145            expected_data: &'static [u8]) -> RecvMessage<R> {
146         RecvMessage {
147             socket: Some(socket),
148             recv: recv,
149             expected_addr: expected_addr,
150             expected_data: expected_data,
151         }
152     }
153 }
154 
155 impl<R: RecvFn> Future for RecvMessage<R> {
156     type Item = UdpSocket;
157     type Error = io::Error;
158 
poll(&mut self) -> Poll<UdpSocket, io::Error>159     fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
160         let mut buf = vec![0u8; 10 + self.expected_data.len() * 10];
161         let n = try_nb!(self.recv.recv(&mut self.socket.as_mut().unwrap(), &mut buf[..],
162                                        &self.expected_addr));
163 
164         assert_eq!(n, self.expected_data.len());
165         assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]);
166 
167         Ok(self.socket.take().unwrap().into())
168     }
169 }
170 
171 #[test]
send_dgrams()172 fn send_dgrams() {
173     let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
174     let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
175     let mut buf = [0u8; 50];
176     let b_addr = t!(b.local_addr());
177 
178     {
179         let send = a.send_dgram(&b"4321"[..], &b_addr);
180         let recv = b.recv_dgram(&mut buf[..]);
181         let (sendt, received) = t!(send.join(recv).wait());
182         assert_eq!(received.2, 4);
183         assert_eq!(&received.1[..4], b"4321");
184         a = sendt.0;
185         b = received.0;
186     }
187 
188     {
189         let send = a.send_dgram(&b""[..], &b_addr);
190         let recv = b.recv_dgram(&mut buf[..]);
191         let received = t!(send.join(recv).wait()).1;
192         assert_eq!(received.2, 0);
193     }
194 }
195 
196 pub struct ByteCodec;
197 
198 impl Decoder for ByteCodec {
199     type Item = Vec<u8>;
200     type Error = io::Error;
201 
decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error>202     fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
203         let len = buf.len();
204         Ok(Some(buf.split_to(len).to_vec()))
205     }
206 }
207 
208 impl Encoder for ByteCodec {
209     type Item = Vec<u8>;
210     type Error = io::Error;
211 
encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error>212     fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
213         buf.reserve(data.len());
214         buf.put(data);
215         Ok(())
216     }
217 }
218 
219 #[test]
send_framed()220 fn send_framed() {
221     drop(env_logger::init());
222 
223     let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
224     let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
225     let a_addr = t!(a_soc.local_addr());
226     let b_addr = t!(b_soc.local_addr());
227 
228     {
229         let a = UdpFramed::new(a_soc, ByteCodec);
230         let b = UdpFramed::new(b_soc, ByteCodec);
231 
232         let msg = b"4567".to_vec();
233 
234         let send = a.send((msg.clone(), b_addr));
235         let recv = b.into_future().map_err(|e| e.0);
236         let (sendt, received) = t!(send.join(recv).wait());
237 
238         let (data, addr) = received.0.unwrap();
239         assert_eq!(msg, data);
240         assert_eq!(a_addr, addr);
241 
242         a_soc = sendt.into_inner();
243         b_soc = received.1.into_inner();
244     }
245 
246     {
247         let a = UdpFramed::new(a_soc, ByteCodec);
248         let b = UdpFramed::new(b_soc, ByteCodec);
249 
250         let msg = b"".to_vec();
251 
252         let send = a.send((msg.clone(), b_addr));
253         let recv = b.into_future().map_err(|e| e.0);
254         let received = t!(send.join(recv).wait()).1;
255 
256         let (data, addr) = received.0.unwrap();
257         assert_eq!(msg, data);
258         assert_eq!(a_addr, addr);
259     }
260 }
261