1 use tokio::{net::UdpSocket, stream::StreamExt};
2 use tokio_util::codec::{Decoder, Encoder};
3 use tokio_util::udp::UdpFramed;
4 
5 use bytes::{BufMut, BytesMut};
6 use futures::future::try_join;
7 use futures::future::FutureExt;
8 use futures::sink::SinkExt;
9 use std::io;
10 
11 #[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))]
12 #[tokio::test]
send_framed() -> std::io::Result<()>13 async fn send_framed() -> std::io::Result<()> {
14     let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
15     let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
16 
17     let a_addr = a_soc.local_addr()?;
18     let b_addr = b_soc.local_addr()?;
19 
20     // test sending & receiving bytes
21     {
22         let mut a = UdpFramed::new(a_soc, ByteCodec);
23         let mut b = UdpFramed::new(b_soc, ByteCodec);
24 
25         let msg = b"4567";
26 
27         let send = a.send((msg, b_addr));
28         let recv = b.next().map(|e| e.unwrap());
29         let (_, received) = try_join(send, recv).await.unwrap();
30 
31         let (data, addr) = received;
32         assert_eq!(msg, &*data);
33         assert_eq!(a_addr, addr);
34 
35         a_soc = a.into_inner();
36         b_soc = b.into_inner();
37     }
38 
39     #[cfg(not(any(target_os = "macos", target_os = "ios")))]
40     // test sending & receiving an empty message
41     {
42         let mut a = UdpFramed::new(a_soc, ByteCodec);
43         let mut b = UdpFramed::new(b_soc, ByteCodec);
44 
45         let msg = b"";
46 
47         let send = a.send((msg, b_addr));
48         let recv = b.next().map(|e| e.unwrap());
49         let (_, received) = try_join(send, recv).await.unwrap();
50 
51         let (data, addr) = received;
52         assert_eq!(msg, &*data);
53         assert_eq!(a_addr, addr);
54     }
55 
56     Ok(())
57 }
58 
/// Stateless codec that treats each frame as an opaque run of bytes.
///
/// Being a unit struct it carries no data, so the common traits are derived
/// to make it printable in assertion output and freely copyable.
#[derive(Debug, Clone, Copy, Default)]
pub struct ByteCodec;
60 
61 impl Decoder for ByteCodec {
62     type Item = Vec<u8>;
63     type Error = io::Error;
64 
decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error>65     fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
66         let len = buf.len();
67         Ok(Some(buf.split_to(len).to_vec()))
68     }
69 }
70 
71 impl Encoder<&[u8]> for ByteCodec {
72     type Error = io::Error;
73 
encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error>74     fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
75         buf.reserve(data.len());
76         buf.put_slice(data);
77         Ok(())
78     }
79 }
80