1 use tokio::{net::UdpSocket, stream::StreamExt};
2 use tokio_util::codec::{Decoder, Encoder};
3 use tokio_util::udp::UdpFramed;
4
5 use bytes::{BufMut, BytesMut};
6 use futures::future::try_join;
7 use futures::future::FutureExt;
8 use futures::sink::SinkExt;
9 use std::io;
10
11 #[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))]
12 #[tokio::test]
send_framed() -> std::io::Result<()>13 async fn send_framed() -> std::io::Result<()> {
14 let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
15 let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
16
17 let a_addr = a_soc.local_addr()?;
18 let b_addr = b_soc.local_addr()?;
19
20 // test sending & receiving bytes
21 {
22 let mut a = UdpFramed::new(a_soc, ByteCodec);
23 let mut b = UdpFramed::new(b_soc, ByteCodec);
24
25 let msg = b"4567";
26
27 let send = a.send((msg, b_addr));
28 let recv = b.next().map(|e| e.unwrap());
29 let (_, received) = try_join(send, recv).await.unwrap();
30
31 let (data, addr) = received;
32 assert_eq!(msg, &*data);
33 assert_eq!(a_addr, addr);
34
35 a_soc = a.into_inner();
36 b_soc = b.into_inner();
37 }
38
39 #[cfg(not(any(target_os = "macos", target_os = "ios")))]
40 // test sending & receiving an empty message
41 {
42 let mut a = UdpFramed::new(a_soc, ByteCodec);
43 let mut b = UdpFramed::new(b_soc, ByteCodec);
44
45 let msg = b"";
46
47 let send = a.send((msg, b_addr));
48 let recv = b.next().map(|e| e.unwrap());
49 let (_, received) = try_join(send, recv).await.unwrap();
50
51 let (data, addr) = received;
52 assert_eq!(msg, &*data);
53 assert_eq!(a_addr, addr);
54 }
55
56 Ok(())
57 }
58
59 pub struct ByteCodec;
60
61 impl Decoder for ByteCodec {
62 type Item = Vec<u8>;
63 type Error = io::Error;
64
decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error>65 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
66 let len = buf.len();
67 Ok(Some(buf.split_to(len).to_vec()))
68 }
69 }
70
71 impl Encoder<&[u8]> for ByteCodec {
72 type Error = io::Error;
73
encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error>74 fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
75 buf.reserve(data.len());
76 buf.put_slice(data);
77 Ok(())
78 }
79 }
80