1 extern crate futures;
2 #[macro_use]
3 extern crate tokio_core;
4 
5 use std::io;
6 use std::net::SocketAddr;
7 
8 use futures::{Future, Poll, Stream, Sink};
9 use tokio_core::net::{UdpSocket, UdpCodec};
10 use tokio_core::reactor::Core;
11 
12 macro_rules! t {
13     ($e:expr) => (match $e {
14         Ok(e) => e,
15         Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
16     })
17 }
18 
19 fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
20     let mut l = t!(Core::new());
21     let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle()));
22     let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle()));
23     let a_addr = t!(a.local_addr());
24     let b_addr = t!(b.local_addr());
25 
26     {
27         let send = SendMessage::new(a, send.clone(), b_addr, b"1234");
28         let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234");
29         let (sendt, received) = t!(l.run(send.join(recv)));
30         a = sendt;
31         b = received;
32     }
33 
34     {
35         let send = SendMessage::new(a, send, b_addr, b"");
36         let recv = RecvMessage::new(b, recv, a_addr, b"");
37         t!(l.run(send.join(recv)));
38     }
39 }
40 
41 #[test]
42 fn send_to_and_recv_from() {
43    send_messages(SendTo {}, RecvFrom {});
44 }
45 
46 #[test]
47 fn send_and_recv() {
48     send_messages(Send {}, Recv {});
49 }
50 
51 trait SendFn {
52     fn send(&self, &UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>;
53 }
54 
55 #[derive(Debug, Clone)]
56 struct SendTo {}
57 
58 impl SendFn for SendTo {
59     fn send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> {
60         socket.send_to(buf, addr)
61     }
62 }
63 
64 #[derive(Debug, Clone)]
65 struct Send {}
66 
67 impl SendFn for Send {
68     fn send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> {
69         socket.connect(addr).expect("could not connect");
70         socket.send(buf)
71     }
72 }
73 
74 struct SendMessage<S> {
75     socket: Option<UdpSocket>,
76     send: S,
77     addr: SocketAddr,
78     data: &'static [u8],
79 }
80 
81 impl<S: SendFn> SendMessage<S> {
82     fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> {
83         SendMessage {
84             socket: Some(socket),
85             send: send,
86             addr: addr,
87             data: data,
88         }
89     }
90 }
91 
92 impl<S: SendFn> Future for SendMessage<S> {
93     type Item = UdpSocket;
94     type Error = io::Error;
95 
96     fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
97         let n = try_nb!(self.send.send(self.socket.as_ref().unwrap(), &self.data[..], &self.addr));
98 
99         assert_eq!(n, self.data.len());
100 
101         Ok(self.socket.take().unwrap().into())
102     }
103 }
104 
105 trait RecvFn {
106     fn recv(&self, &UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>;
107 }
108 
109 #[derive(Debug, Clone)]
110 struct RecvFrom {}
111 
112 impl RecvFn for RecvFrom {
113     fn recv(&self, socket: &UdpSocket, buf: &mut [u8],
114             expected_addr: &SocketAddr) -> Result<usize, io::Error> {
115         socket.recv_from(buf).map(|(s, addr)| {
116             assert_eq!(addr, *expected_addr);
117             s
118         })
119     }
120 }
121 
122 #[derive(Debug, Clone)]
123 struct Recv {}
124 
125 impl RecvFn for Recv {
126     fn recv(&self, socket: &UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error> {
127         socket.recv(buf)
128     }
129 }
130 
131 struct RecvMessage<R> {
132     socket: Option<UdpSocket>,
133     recv: R,
134     expected_addr: SocketAddr,
135     expected_data: &'static [u8],
136 }
137 
138 impl<R: RecvFn> RecvMessage<R> {
139     fn new(socket: UdpSocket, recv: R, expected_addr: SocketAddr,
140            expected_data: &'static [u8]) -> RecvMessage<R> {
141         RecvMessage {
142             socket: Some(socket),
143             recv: recv,
144             expected_addr: expected_addr,
145             expected_data: expected_data,
146         }
147     }
148 }
149 
150 impl<R: RecvFn> Future for RecvMessage<R> {
151     type Item = UdpSocket;
152     type Error = io::Error;
153 
154     fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
155         let mut buf = vec![0u8; 10 + self.expected_data.len() * 10];
156         let n = try_nb!(self.recv.recv(&self.socket.as_ref().unwrap(), &mut buf[..],
157                                        &self.expected_addr));
158 
159         assert_eq!(n, self.expected_data.len());
160         assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]);
161 
162         Ok(self.socket.take().unwrap().into())
163     }
164 }
165 
166 #[test]
167 fn send_dgrams() {
168     let mut l = t!(Core::new());
169     let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
170     let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
171     let mut buf = [0u8; 50];
172     let b_addr = t!(b.local_addr());
173 
174     {
175         let send = a.send_dgram(&b"4321"[..], b_addr);
176         let recv = b.recv_dgram(&mut buf[..]);
177         let (sendt, received) = t!(l.run(send.join(recv)));
178         assert_eq!(received.2, 4);
179         assert_eq!(&received.1[..4], b"4321");
180         a = sendt.0;
181         b = received.0;
182     }
183 
184     {
185         let send = a.send_dgram(&b""[..], b_addr);
186         let recv = b.recv_dgram(&mut buf[..]);
187         let received = t!(l.run(send.join(recv))).1;
188         assert_eq!(received.2, 0);
189     }
190 }
191 
192 #[derive(Debug, Clone)]
193 struct Codec {
194     data: &'static [u8],
195     from: SocketAddr,
196     to: SocketAddr,
197 }
198 
199 impl UdpCodec for Codec {
200     type In = ();
201     type Out = &'static [u8];
202 
203     fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
204         assert_eq!(src, &self.from);
205         assert_eq!(buf, self.data);
206         Ok(())
207     }
208 
209     fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr {
210         assert_eq!(msg, self.data);
211         buf.extend_from_slice(msg);
212         self.to
213     }
214 }
215 
216 #[test]
217 fn send_framed() {
218     let mut l = t!(Core::new());
219     let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
220     let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
221     let a_addr = t!(a_soc.local_addr());
222     let b_addr = t!(b_soc.local_addr());
223 
224     {
225         let a = a_soc.framed(Codec { data: &b"4567"[..], from: a_addr, to: b_addr});
226         let b = b_soc.framed(Codec { data: &b"4567"[..], from: a_addr, to: b_addr});
227 
228         let send = a.send(&b"4567"[..]);
229         let recv = b.into_future().map_err(|e| e.0);
230         let (sendt, received) = t!(l.run(send.join(recv)));
231         assert_eq!(received.0, Some(()));
232 
233         a_soc = sendt.into_inner();
234         b_soc = received.1.into_inner();
235     }
236 
237     {
238         let a = a_soc.framed(Codec { data: &b""[..], from: a_addr, to: b_addr});
239         let b = b_soc.framed(Codec { data: &b""[..], from: a_addr, to: b_addr});
240 
241         let send = a.send(&b""[..]);
242         let recv = b.into_future().map_err(|e| e.0);
243         let received = t!(l.run(send.join(recv))).1;
244         assert_eq!(received.0, Some(()));
245     }
246 }
247 
248