1 extern crate futures;
2 #[macro_use]
3 extern crate tokio_core;
4
5 use std::io;
6 use std::net::SocketAddr;
7
8 use futures::{Future, Poll, Stream, Sink};
9 use tokio_core::net::{UdpSocket, UdpCodec};
10 use tokio_core::reactor::Core;
11
/// Unwraps a `Result`, yielding the `Ok` value.
///
/// On `Err` the macro panics with the source text of the expression and the
/// `Debug` form of the error, so a failing setup step identifies itself.
macro_rules! t {
    ($e:expr) => {
        match $e {
            Ok(value) => value,
            Err(error) => panic!("{} failed with {:?}", stringify!($e), error),
        }
    };
}
18
send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R)19 fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
20 let mut l = t!(Core::new());
21 let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle()));
22 let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle()));
23 let a_addr = t!(a.local_addr());
24 let b_addr = t!(b.local_addr());
25
26 {
27 let send = SendMessage::new(a, send.clone(), b_addr, b"1234");
28 let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234");
29 let (sendt, received) = t!(l.run(send.join(recv)));
30 a = sendt;
31 b = received;
32 }
33
34 {
35 let send = SendMessage::new(a, send, b_addr, b"");
36 let recv = RecvMessage::new(b, recv, a_addr, b"");
37 t!(l.run(send.join(recv)));
38 }
39 }
40
/// Exercises the round trip using the `SendTo` / `RecvFrom` strategies.
#[test]
fn send_to_and_recv_from() {
    let sender = SendTo {};
    let receiver = RecvFrom {};
    send_messages(sender, receiver);
}
45
/// Exercises the round trip using the `Send` / `Recv` strategies.
#[test]
fn send_and_recv() {
    let sender = Send {};
    let receiver = Recv {};
    send_messages(sender, receiver);
}
50
51 trait SendFn {
send(&self, &UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>52 fn send(&self, &UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>;
53 }
54
55 #[derive(Debug, Clone)]
56 struct SendTo {}
57
58 impl SendFn for SendTo {
send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error>59 fn send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> {
60 socket.send_to(buf, addr)
61 }
62 }
63
64 #[derive(Debug, Clone)]
65 struct Send {}
66
67 impl SendFn for Send {
send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error>68 fn send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> {
69 socket.connect(addr).expect("could not connect");
70 socket.send(buf)
71 }
72 }
73
74 struct SendMessage<S> {
75 socket: Option<UdpSocket>,
76 send: S,
77 addr: SocketAddr,
78 data: &'static [u8],
79 }
80
81 impl<S: SendFn> SendMessage<S> {
new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S>82 fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> {
83 SendMessage {
84 socket: Some(socket),
85 send: send,
86 addr: addr,
87 data: data,
88 }
89 }
90 }
91
92 impl<S: SendFn> Future for SendMessage<S> {
93 type Item = UdpSocket;
94 type Error = io::Error;
95
poll(&mut self) -> Poll<UdpSocket, io::Error>96 fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
97 let n = try_nb!(self.send.send(self.socket.as_ref().unwrap(), &self.data[..], &self.addr));
98
99 assert_eq!(n, self.data.len());
100
101 Ok(self.socket.take().unwrap().into())
102 }
103 }
104
105 trait RecvFn {
recv(&self, &UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>106 fn recv(&self, &UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>;
107 }
108
109 #[derive(Debug, Clone)]
110 struct RecvFrom {}
111
112 impl RecvFn for RecvFrom {
recv(&self, socket: &UdpSocket, buf: &mut [u8], expected_addr: &SocketAddr) -> Result<usize, io::Error>113 fn recv(&self, socket: &UdpSocket, buf: &mut [u8],
114 expected_addr: &SocketAddr) -> Result<usize, io::Error> {
115 socket.recv_from(buf).map(|(s, addr)| {
116 assert_eq!(addr, *expected_addr);
117 s
118 })
119 }
120 }
121
122 #[derive(Debug, Clone)]
123 struct Recv {}
124
125 impl RecvFn for Recv {
recv(&self, socket: &UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error>126 fn recv(&self, socket: &UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error> {
127 socket.recv(buf)
128 }
129 }
130
131 struct RecvMessage<R> {
132 socket: Option<UdpSocket>,
133 recv: R,
134 expected_addr: SocketAddr,
135 expected_data: &'static [u8],
136 }
137
138 impl<R: RecvFn> RecvMessage<R> {
new(socket: UdpSocket, recv: R, expected_addr: SocketAddr, expected_data: &'static [u8]) -> RecvMessage<R>139 fn new(socket: UdpSocket, recv: R, expected_addr: SocketAddr,
140 expected_data: &'static [u8]) -> RecvMessage<R> {
141 RecvMessage {
142 socket: Some(socket),
143 recv: recv,
144 expected_addr: expected_addr,
145 expected_data: expected_data,
146 }
147 }
148 }
149
150 impl<R: RecvFn> Future for RecvMessage<R> {
151 type Item = UdpSocket;
152 type Error = io::Error;
153
poll(&mut self) -> Poll<UdpSocket, io::Error>154 fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
155 let mut buf = vec![0u8; 10 + self.expected_data.len() * 10];
156 let n = try_nb!(self.recv.recv(&self.socket.as_ref().unwrap(), &mut buf[..],
157 &self.expected_addr));
158
159 assert_eq!(n, self.expected_data.len());
160 assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]);
161
162 Ok(self.socket.take().unwrap().into())
163 }
164 }
165
/// Round-trips two datagrams with the `send_dgram` / `recv_dgram` futures:
/// first `b"4321"`, then an empty payload.
#[test]
fn send_dgrams() {
    let mut l = t!(Core::new());
    let a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
    let b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
    let mut buf = [0u8; 50];
    let b_addr = t!(b.local_addr());

    // First exchange: four bytes. Both futures return their socket, which is
    // carried over into the second exchange.
    let (a, b) = {
        let send = a.send_dgram(&b"4321"[..], b_addr);
        let recv = b.recv_dgram(&mut buf[..]);
        let (sendt, received) = t!(l.run(send.join(recv)));
        let (a_back, _sent_buf) = sendt;
        let (b_back, payload, len, _peer) = received;
        assert_eq!(len, 4);
        assert_eq!(&payload[..4], b"4321");
        (a_back, b_back)
    };

    // Second exchange: a zero-length datagram must report zero bytes.
    {
        let send = a.send_dgram(&b""[..], b_addr);
        let recv = b.recv_dgram(&mut buf[..]);
        let (_sendt, received) = t!(l.run(send.join(recv)));
        let (_b_back, _payload, len, _peer) = received;
        assert_eq!(len, 0);
    }
}
191
192 #[derive(Debug, Clone)]
193 struct Codec {
194 data: &'static [u8],
195 from: SocketAddr,
196 to: SocketAddr,
197 }
198
199 impl UdpCodec for Codec {
200 type In = ();
201 type Out = &'static [u8];
202
decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In>203 fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
204 assert_eq!(src, &self.from);
205 assert_eq!(buf, self.data);
206 Ok(())
207 }
208
encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr209 fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr {
210 assert_eq!(msg, self.data);
211 buf.extend_from_slice(msg);
212 self.to
213 }
214 }
215
/// Round-trips two messages through framed sockets using `Codec`:
/// first `b"4567"`, then an empty payload.
#[test]
fn send_framed() {
    let mut l = t!(Core::new());
    let a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
    let b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
    let a_addr = t!(a_soc.local_addr());
    let b_addr = t!(b_soc.local_addr());

    // First exchange. Both ends use an identical codec, so it is built once
    // and cloned. The raw sockets are recovered from the framed wrappers for
    // the second exchange.
    let (a_soc, b_soc) = {
        let codec = Codec { data: &b"4567"[..], from: a_addr, to: b_addr };
        let a = a_soc.framed(codec.clone());
        let b = b_soc.framed(codec);

        let send = a.send(&b"4567"[..]);
        let recv = b.into_future().map_err(|e| e.0);
        let (sendt, received) = t!(l.run(send.join(recv)));
        let (first, rest) = received;
        assert_eq!(first, Some(()));

        (sendt.into_inner(), rest.into_inner())
    };

    // Second exchange: an empty frame must still be delivered as one item.
    {
        let codec = Codec { data: &b""[..], from: a_addr, to: b_addr };
        let a = a_soc.framed(codec.clone());
        let b = b_soc.framed(codec);

        let send = a.send(&b""[..]);
        let recv = b.into_future().map_err(|e| e.0);
        let (_sendt, received) = t!(l.run(send.join(recv)));
        let (first, _rest) = received;
        assert_eq!(first, Some(()));
    }
}
247
248