1 #![warn(rust_2018_idioms)]
2 
3 use tokio::io::{AsyncRead, ReadBuf};
4 use tokio_test::assert_ready;
5 use tokio_test::task;
6 use tokio_util::codec::{Decoder, FramedRead};
7 
8 use bytes::{Buf, BytesMut};
9 use futures::Stream;
10 use std::collections::VecDeque;
11 use std::io;
12 use std::pin::Pin;
13 use std::task::Poll::{Pending, Ready};
14 use std::task::{Context, Poll};
15 
16 macro_rules! mock {
17     ($($x:expr,)*) => {{
18         let mut v = VecDeque::new();
19         v.extend(vec![$($x),*]);
20         Mock { calls: v }
21     }};
22 }
23 
24 macro_rules! assert_read {
25     ($e:expr, $n:expr) => {{
26         let val = assert_ready!($e);
27         assert_eq!(val.unwrap().unwrap(), $n);
28     }};
29 }
30 
31 macro_rules! pin {
32     ($id:ident) => {
33         Pin::new(&mut $id)
34     };
35 }
36 
37 struct U32Decoder;
38 
39 impl Decoder for U32Decoder {
40     type Item = u32;
41     type Error = io::Error;
42 
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>>43     fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
44         if buf.len() < 4 {
45             return Ok(None);
46         }
47 
48         let n = buf.split_to(4).get_u32();
49         Ok(Some(n))
50     }
51 }
52 
53 #[test]
read_multi_frame_in_packet()54 fn read_multi_frame_in_packet() {
55     let mut task = task::spawn(());
56     let mock = mock! {
57         Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
58     };
59     let mut framed = FramedRead::new(mock, U32Decoder);
60 
61     task.enter(|cx, _| {
62         assert_read!(pin!(framed).poll_next(cx), 0);
63         assert_read!(pin!(framed).poll_next(cx), 1);
64         assert_read!(pin!(framed).poll_next(cx), 2);
65         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
66     });
67 }
68 
69 #[test]
read_multi_frame_across_packets()70 fn read_multi_frame_across_packets() {
71     let mut task = task::spawn(());
72     let mock = mock! {
73         Ok(b"\x00\x00\x00\x00".to_vec()),
74         Ok(b"\x00\x00\x00\x01".to_vec()),
75         Ok(b"\x00\x00\x00\x02".to_vec()),
76     };
77     let mut framed = FramedRead::new(mock, U32Decoder);
78 
79     task.enter(|cx, _| {
80         assert_read!(pin!(framed).poll_next(cx), 0);
81         assert_read!(pin!(framed).poll_next(cx), 1);
82         assert_read!(pin!(framed).poll_next(cx), 2);
83         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
84     });
85 }
86 
87 #[test]
read_not_ready()88 fn read_not_ready() {
89     let mut task = task::spawn(());
90     let mock = mock! {
91         Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
92         Ok(b"\x00\x00\x00\x00".to_vec()),
93         Ok(b"\x00\x00\x00\x01".to_vec()),
94     };
95     let mut framed = FramedRead::new(mock, U32Decoder);
96 
97     task.enter(|cx, _| {
98         assert!(pin!(framed).poll_next(cx).is_pending());
99         assert_read!(pin!(framed).poll_next(cx), 0);
100         assert_read!(pin!(framed).poll_next(cx), 1);
101         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
102     });
103 }
104 
105 #[test]
read_partial_then_not_ready()106 fn read_partial_then_not_ready() {
107     let mut task = task::spawn(());
108     let mock = mock! {
109         Ok(b"\x00\x00".to_vec()),
110         Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
111         Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
112     };
113     let mut framed = FramedRead::new(mock, U32Decoder);
114 
115     task.enter(|cx, _| {
116         assert!(pin!(framed).poll_next(cx).is_pending());
117         assert_read!(pin!(framed).poll_next(cx), 0);
118         assert_read!(pin!(framed).poll_next(cx), 1);
119         assert_read!(pin!(framed).poll_next(cx), 2);
120         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
121     });
122 }
123 
124 #[test]
read_err()125 fn read_err() {
126     let mut task = task::spawn(());
127     let mock = mock! {
128         Err(io::Error::new(io::ErrorKind::Other, "")),
129     };
130     let mut framed = FramedRead::new(mock, U32Decoder);
131 
132     task.enter(|cx, _| {
133         assert_eq!(
134             io::ErrorKind::Other,
135             assert_ready!(pin!(framed).poll_next(cx))
136                 .unwrap()
137                 .unwrap_err()
138                 .kind()
139         )
140     });
141 }
142 
143 #[test]
read_partial_then_err()144 fn read_partial_then_err() {
145     let mut task = task::spawn(());
146     let mock = mock! {
147         Ok(b"\x00\x00".to_vec()),
148         Err(io::Error::new(io::ErrorKind::Other, "")),
149     };
150     let mut framed = FramedRead::new(mock, U32Decoder);
151 
152     task.enter(|cx, _| {
153         assert_eq!(
154             io::ErrorKind::Other,
155             assert_ready!(pin!(framed).poll_next(cx))
156                 .unwrap()
157                 .unwrap_err()
158                 .kind()
159         )
160     });
161 }
162 
163 #[test]
read_partial_would_block_then_err()164 fn read_partial_would_block_then_err() {
165     let mut task = task::spawn(());
166     let mock = mock! {
167         Ok(b"\x00\x00".to_vec()),
168         Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
169         Err(io::Error::new(io::ErrorKind::Other, "")),
170     };
171     let mut framed = FramedRead::new(mock, U32Decoder);
172 
173     task.enter(|cx, _| {
174         assert!(pin!(framed).poll_next(cx).is_pending());
175         assert_eq!(
176             io::ErrorKind::Other,
177             assert_ready!(pin!(framed).poll_next(cx))
178                 .unwrap()
179                 .unwrap_err()
180                 .kind()
181         )
182     });
183 }
184 
185 #[test]
huge_size()186 fn huge_size() {
187     let mut task = task::spawn(());
188     let data = &[0; 32 * 1024][..];
189     let mut framed = FramedRead::new(data, BigDecoder);
190 
191     task.enter(|cx, _| {
192         assert_read!(pin!(framed).poll_next(cx), 0);
193         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
194     });
195 
196     struct BigDecoder;
197 
198     impl Decoder for BigDecoder {
199         type Item = u32;
200         type Error = io::Error;
201 
202         fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
203             if buf.len() < 32 * 1024 {
204                 return Ok(None);
205             }
206             buf.advance(32 * 1024);
207             Ok(Some(0))
208         }
209     }
210 }
211 
212 #[test]
data_remaining_is_error()213 fn data_remaining_is_error() {
214     let mut task = task::spawn(());
215     let slice = &[0; 5][..];
216     let mut framed = FramedRead::new(slice, U32Decoder);
217 
218     task.enter(|cx, _| {
219         assert_read!(pin!(framed).poll_next(cx), 0);
220         assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
221     });
222 }
223 
224 #[test]
multi_frames_on_eof()225 fn multi_frames_on_eof() {
226     let mut task = task::spawn(());
227     struct MyDecoder(Vec<u32>);
228 
229     impl Decoder for MyDecoder {
230         type Item = u32;
231         type Error = io::Error;
232 
233         fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
234             unreachable!();
235         }
236 
237         fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
238             if self.0.is_empty() {
239                 return Ok(None);
240             }
241 
242             Ok(Some(self.0.remove(0)))
243         }
244     }
245 
246     let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
247 
248     task.enter(|cx, _| {
249         assert_read!(pin!(framed).poll_next(cx), 0);
250         assert_read!(pin!(framed).poll_next(cx), 1);
251         assert_read!(pin!(framed).poll_next(cx), 2);
252         assert_read!(pin!(framed).poll_next(cx), 3);
253         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
254     });
255 }
256 
257 #[test]
read_eof_then_resume()258 fn read_eof_then_resume() {
259     let mut task = task::spawn(());
260     let mock = mock! {
261         Ok(b"\x00\x00\x00\x01".to_vec()),
262         Ok(b"".to_vec()),
263         Ok(b"\x00\x00\x00\x02".to_vec()),
264         Ok(b"".to_vec()),
265         Ok(b"\x00\x00\x00\x03".to_vec()),
266     };
267     let mut framed = FramedRead::new(mock, U32Decoder);
268 
269     task.enter(|cx, _| {
270         assert_read!(pin!(framed).poll_next(cx), 1);
271         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
272         assert_read!(pin!(framed).poll_next(cx), 2);
273         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
274         assert_read!(pin!(framed).poll_next(cx), 3);
275         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
276         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
277     });
278 }
279 
280 // ===== Mock ======
281 
282 struct Mock {
283     calls: VecDeque<io::Result<Vec<u8>>>,
284 }
285 
286 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>287     fn poll_read(
288         mut self: Pin<&mut Self>,
289         _cx: &mut Context<'_>,
290         buf: &mut ReadBuf<'_>,
291     ) -> Poll<io::Result<()>> {
292         use io::ErrorKind::WouldBlock;
293 
294         match self.calls.pop_front() {
295             Some(Ok(data)) => {
296                 debug_assert!(buf.remaining() >= data.len());
297                 buf.put_slice(&data);
298                 Ready(Ok(()))
299             }
300             Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
301             Some(Err(e)) => Ready(Err(e)),
302             None => Ready(Ok(())),
303         }
304     }
305 }
306