1 extern crate futures; 2 #[macro_use] 3 extern crate tokio_core; 4 5 use std::io; 6 use std::net::SocketAddr; 7 8 use futures::{Future, Poll, Stream, Sink}; 9 use tokio_core::net::{UdpSocket, UdpCodec}; 10 use tokio_core::reactor::Core; 11 12 macro_rules! t { 13 ($e:expr) => (match $e { 14 Ok(e) => e, 15 Err(e) => panic!("{} failed with {:?}", stringify!($e), e), 16 }) 17 } 18 19 fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) { 20 let mut l = t!(Core::new()); 21 let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle())); 22 let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into(), &l.handle())); 23 let a_addr = t!(a.local_addr()); 24 let b_addr = t!(b.local_addr()); 25 26 { 27 let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); 28 let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); 29 let (sendt, received) = t!(l.run(send.join(recv))); 30 a = sendt; 31 b = received; 32 } 33 34 { 35 let send = SendMessage::new(a, send, b_addr, b""); 36 let recv = RecvMessage::new(b, recv, a_addr, b""); 37 t!(l.run(send.join(recv))); 38 } 39 } 40 41 #[test] 42 fn send_to_and_recv_from() { 43 send_messages(SendTo {}, RecvFrom {}); 44 } 45 46 #[test] 47 fn send_and_recv() { 48 send_messages(Send {}, Recv {}); 49 } 50 51 trait SendFn { 52 fn send(&self, &UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>; 53 } 54 55 #[derive(Debug, Clone)] 56 struct SendTo {} 57 58 impl SendFn for SendTo { 59 fn send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> { 60 socket.send_to(buf, addr) 61 } 62 } 63 64 #[derive(Debug, Clone)] 65 struct Send {} 66 67 impl SendFn for Send { 68 fn send(&self, socket: &UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> { 69 socket.connect(addr).expect("could not connect"); 70 socket.send(buf) 71 } 72 } 73 74 struct SendMessage<S> { 75 socket: Option<UdpSocket>, 76 send: S, 77 addr: SocketAddr, 78 data: &'static [u8], 79 } 80 81 impl<S: SendFn> SendMessage<S> { 82 fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> { 83 SendMessage { 84 socket: Some(socket), 85 send: send, 86 addr: addr, 87 data: data, 88 } 89 } 90 } 91 92 impl<S: SendFn> Future for SendMessage<S> { 93 type Item = UdpSocket; 94 type Error = io::Error; 95 96 fn poll(&mut self) -> Poll<UdpSocket, io::Error> { 97 let n = try_nb!(self.send.send(self.socket.as_ref().unwrap(), &self.data[..], &self.addr)); 98 99 assert_eq!(n, self.data.len()); 100 101 Ok(self.socket.take().unwrap().into()) 102 } 103 } 104 105 trait RecvFn { 106 fn recv(&self, &UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>; 107 } 108 109 #[derive(Debug, Clone)] 110 struct RecvFrom {} 111 112 impl RecvFn for RecvFrom { 113 fn recv(&self, socket: &UdpSocket, buf: &mut [u8], 114 expected_addr: &SocketAddr) -> Result<usize, io::Error> { 115 socket.recv_from(buf).map(|(s, addr)| { 116 assert_eq!(addr, *expected_addr); 117 s 118 }) 119 } 120 } 121 122 #[derive(Debug, Clone)] 123 struct Recv {} 124 125 impl RecvFn for Recv { 126 fn recv(&self, socket: &UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error> { 127 socket.recv(buf) 128 } 129 } 130 131 struct RecvMessage<R> { 132 socket: Option<UdpSocket>, 133 recv: R, 134 expected_addr: SocketAddr, 135 expected_data: &'static [u8], 136 } 137 138 impl<R: RecvFn> RecvMessage<R> { 139 fn new(socket: UdpSocket, recv: R, expected_addr: SocketAddr, 140 expected_data: &'static [u8]) -> RecvMessage<R> { 141 RecvMessage { 142 socket: Some(socket), 143 recv: recv, 144 expected_addr: expected_addr, 145 expected_data: expected_data, 146 } 147 } 148 } 149 150 impl<R: RecvFn> Future for RecvMessage<R> { 151 type Item = UdpSocket; 152 type Error = io::Error; 153 154 fn poll(&mut self) -> Poll<UdpSocket, io::Error> { 155 let mut buf = vec![0u8; 10 + self.expected_data.len() * 10]; 156 let n = try_nb!(self.recv.recv(&self.socket.as_ref().unwrap(), &mut buf[..], 157 &self.expected_addr)); 158 159 assert_eq!(n, self.expected_data.len()); 160 assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]); 161 162 Ok(self.socket.take().unwrap().into()) 163 } 164 } 165 166 #[test] 167 fn send_dgrams() { 168 let mut l = t!(Core::new()); 169 let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); 170 let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); 171 let mut buf = [0u8; 50]; 172 let b_addr = t!(b.local_addr()); 173 174 { 175 let send = a.send_dgram(&b"4321"[..], b_addr); 176 let recv = b.recv_dgram(&mut buf[..]); 177 let (sendt, received) = t!(l.run(send.join(recv))); 178 assert_eq!(received.2, 4); 179 assert_eq!(&received.1[..4], b"4321"); 180 a = sendt.0; 181 b = received.0; 182 } 183 184 { 185 let send = a.send_dgram(&b""[..], b_addr); 186 let recv = b.recv_dgram(&mut buf[..]); 187 let received = t!(l.run(send.join(recv))).1; 188 assert_eq!(received.2, 0); 189 } 190 } 191 192 #[derive(Debug, Clone)] 193 struct Codec { 194 data: &'static [u8], 195 from: SocketAddr, 196 to: SocketAddr, 197 } 198 199 impl UdpCodec for Codec { 200 type In = (); 201 type Out = &'static [u8]; 202 203 fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { 204 assert_eq!(src, &self.from); 205 assert_eq!(buf, self.data); 206 Ok(()) 207 } 208 209 fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr { 210 assert_eq!(msg, self.data); 211 buf.extend_from_slice(msg); 212 self.to 213 } 214 } 215 216 #[test] 217 fn send_framed() { 218 let mut l = t!(Core::new()); 219 let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); 220 let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); 221 let a_addr = t!(a_soc.local_addr()); 222 let b_addr = t!(b_soc.local_addr()); 223 224 { 225 let a = a_soc.framed(Codec { data: &b"4567"[..], from: a_addr, to: b_addr}); 226 let b = b_soc.framed(Codec { data: &b"4567"[..], from: a_addr, to: b_addr}); 227 228 let send = a.send(&b"4567"[..]); 229 let recv = b.into_future().map_err(|e| e.0); 230 let (sendt, received) = t!(l.run(send.join(recv))); 231 assert_eq!(received.0, Some(())); 232 233 a_soc = sendt.into_inner(); 234 b_soc = received.1.into_inner(); 235 } 236 237 { 238 let a = a_soc.framed(Codec { data: &b""[..], from: a_addr, to: b_addr}); 239 let b = b_soc.framed(Codec { data: &b""[..], from: a_addr, to: b_addr}); 240 241 let send = a.send(&b""[..]); 242 let recv = b.into_future().map_err(|e| e.0); 243 let received = t!(l.run(send.join(recv))).1; 244 assert_eq!(received.0, Some(())); 245 } 246 } 247 248