1 #![warn(rust_2018_idioms)]
2
3 use tokio::io::{AsyncRead, ReadBuf};
4 use tokio_test::assert_ready;
5 use tokio_test::task;
6 use tokio_util::codec::{Decoder, FramedRead};
7
8 use bytes::{Buf, BytesMut};
9 use futures::Stream;
10 use std::collections::VecDeque;
11 use std::io;
12 use std::pin::Pin;
13 use std::task::Poll::{Pending, Ready};
14 use std::task::{Context, Poll};
15
16 macro_rules! mock {
17 ($($x:expr,)*) => {{
18 let mut v = VecDeque::new();
19 v.extend(vec![$($x),*]);
20 Mock { calls: v }
21 }};
22 }
23
24 macro_rules! assert_read {
25 ($e:expr, $n:expr) => {{
26 let val = assert_ready!($e);
27 assert_eq!(val.unwrap().unwrap(), $n);
28 }};
29 }
30
31 macro_rules! pin {
32 ($id:ident) => {
33 Pin::new(&mut $id)
34 };
35 }
36
37 struct U32Decoder;
38
39 impl Decoder for U32Decoder {
40 type Item = u32;
41 type Error = io::Error;
42
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>>43 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
44 if buf.len() < 4 {
45 return Ok(None);
46 }
47
48 let n = buf.split_to(4).get_u32();
49 Ok(Some(n))
50 }
51 }
52
53 #[test]
read_multi_frame_in_packet()54 fn read_multi_frame_in_packet() {
55 let mut task = task::spawn(());
56 let mock = mock! {
57 Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
58 };
59 let mut framed = FramedRead::new(mock, U32Decoder);
60
61 task.enter(|cx, _| {
62 assert_read!(pin!(framed).poll_next(cx), 0);
63 assert_read!(pin!(framed).poll_next(cx), 1);
64 assert_read!(pin!(framed).poll_next(cx), 2);
65 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
66 });
67 }
68
69 #[test]
read_multi_frame_across_packets()70 fn read_multi_frame_across_packets() {
71 let mut task = task::spawn(());
72 let mock = mock! {
73 Ok(b"\x00\x00\x00\x00".to_vec()),
74 Ok(b"\x00\x00\x00\x01".to_vec()),
75 Ok(b"\x00\x00\x00\x02".to_vec()),
76 };
77 let mut framed = FramedRead::new(mock, U32Decoder);
78
79 task.enter(|cx, _| {
80 assert_read!(pin!(framed).poll_next(cx), 0);
81 assert_read!(pin!(framed).poll_next(cx), 1);
82 assert_read!(pin!(framed).poll_next(cx), 2);
83 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
84 });
85 }
86
87 #[test]
read_not_ready()88 fn read_not_ready() {
89 let mut task = task::spawn(());
90 let mock = mock! {
91 Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
92 Ok(b"\x00\x00\x00\x00".to_vec()),
93 Ok(b"\x00\x00\x00\x01".to_vec()),
94 };
95 let mut framed = FramedRead::new(mock, U32Decoder);
96
97 task.enter(|cx, _| {
98 assert!(pin!(framed).poll_next(cx).is_pending());
99 assert_read!(pin!(framed).poll_next(cx), 0);
100 assert_read!(pin!(framed).poll_next(cx), 1);
101 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
102 });
103 }
104
105 #[test]
read_partial_then_not_ready()106 fn read_partial_then_not_ready() {
107 let mut task = task::spawn(());
108 let mock = mock! {
109 Ok(b"\x00\x00".to_vec()),
110 Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
111 Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
112 };
113 let mut framed = FramedRead::new(mock, U32Decoder);
114
115 task.enter(|cx, _| {
116 assert!(pin!(framed).poll_next(cx).is_pending());
117 assert_read!(pin!(framed).poll_next(cx), 0);
118 assert_read!(pin!(framed).poll_next(cx), 1);
119 assert_read!(pin!(framed).poll_next(cx), 2);
120 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
121 });
122 }
123
124 #[test]
read_err()125 fn read_err() {
126 let mut task = task::spawn(());
127 let mock = mock! {
128 Err(io::Error::new(io::ErrorKind::Other, "")),
129 };
130 let mut framed = FramedRead::new(mock, U32Decoder);
131
132 task.enter(|cx, _| {
133 assert_eq!(
134 io::ErrorKind::Other,
135 assert_ready!(pin!(framed).poll_next(cx))
136 .unwrap()
137 .unwrap_err()
138 .kind()
139 )
140 });
141 }
142
143 #[test]
read_partial_then_err()144 fn read_partial_then_err() {
145 let mut task = task::spawn(());
146 let mock = mock! {
147 Ok(b"\x00\x00".to_vec()),
148 Err(io::Error::new(io::ErrorKind::Other, "")),
149 };
150 let mut framed = FramedRead::new(mock, U32Decoder);
151
152 task.enter(|cx, _| {
153 assert_eq!(
154 io::ErrorKind::Other,
155 assert_ready!(pin!(framed).poll_next(cx))
156 .unwrap()
157 .unwrap_err()
158 .kind()
159 )
160 });
161 }
162
163 #[test]
read_partial_would_block_then_err()164 fn read_partial_would_block_then_err() {
165 let mut task = task::spawn(());
166 let mock = mock! {
167 Ok(b"\x00\x00".to_vec()),
168 Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
169 Err(io::Error::new(io::ErrorKind::Other, "")),
170 };
171 let mut framed = FramedRead::new(mock, U32Decoder);
172
173 task.enter(|cx, _| {
174 assert!(pin!(framed).poll_next(cx).is_pending());
175 assert_eq!(
176 io::ErrorKind::Other,
177 assert_ready!(pin!(framed).poll_next(cx))
178 .unwrap()
179 .unwrap_err()
180 .kind()
181 )
182 });
183 }
184
185 #[test]
huge_size()186 fn huge_size() {
187 let mut task = task::spawn(());
188 let data = &[0; 32 * 1024][..];
189 let mut framed = FramedRead::new(data, BigDecoder);
190
191 task.enter(|cx, _| {
192 assert_read!(pin!(framed).poll_next(cx), 0);
193 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
194 });
195
196 struct BigDecoder;
197
198 impl Decoder for BigDecoder {
199 type Item = u32;
200 type Error = io::Error;
201
202 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
203 if buf.len() < 32 * 1024 {
204 return Ok(None);
205 }
206 buf.advance(32 * 1024);
207 Ok(Some(0))
208 }
209 }
210 }
211
212 #[test]
data_remaining_is_error()213 fn data_remaining_is_error() {
214 let mut task = task::spawn(());
215 let slice = &[0; 5][..];
216 let mut framed = FramedRead::new(slice, U32Decoder);
217
218 task.enter(|cx, _| {
219 assert_read!(pin!(framed).poll_next(cx), 0);
220 assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
221 });
222 }
223
224 #[test]
multi_frames_on_eof()225 fn multi_frames_on_eof() {
226 let mut task = task::spawn(());
227 struct MyDecoder(Vec<u32>);
228
229 impl Decoder for MyDecoder {
230 type Item = u32;
231 type Error = io::Error;
232
233 fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
234 unreachable!();
235 }
236
237 fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
238 if self.0.is_empty() {
239 return Ok(None);
240 }
241
242 Ok(Some(self.0.remove(0)))
243 }
244 }
245
246 let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
247
248 task.enter(|cx, _| {
249 assert_read!(pin!(framed).poll_next(cx), 0);
250 assert_read!(pin!(framed).poll_next(cx), 1);
251 assert_read!(pin!(framed).poll_next(cx), 2);
252 assert_read!(pin!(framed).poll_next(cx), 3);
253 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
254 });
255 }
256
257 #[test]
read_eof_then_resume()258 fn read_eof_then_resume() {
259 let mut task = task::spawn(());
260 let mock = mock! {
261 Ok(b"\x00\x00\x00\x01".to_vec()),
262 Ok(b"".to_vec()),
263 Ok(b"\x00\x00\x00\x02".to_vec()),
264 Ok(b"".to_vec()),
265 Ok(b"\x00\x00\x00\x03".to_vec()),
266 };
267 let mut framed = FramedRead::new(mock, U32Decoder);
268
269 task.enter(|cx, _| {
270 assert_read!(pin!(framed).poll_next(cx), 1);
271 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
272 assert_read!(pin!(framed).poll_next(cx), 2);
273 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
274 assert_read!(pin!(framed).poll_next(cx), 3);
275 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
276 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
277 });
278 }
279
280 // ===== Mock ======
281
282 struct Mock {
283 calls: VecDeque<io::Result<Vec<u8>>>,
284 }
285
286 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>287 fn poll_read(
288 mut self: Pin<&mut Self>,
289 _cx: &mut Context<'_>,
290 buf: &mut ReadBuf<'_>,
291 ) -> Poll<io::Result<()>> {
292 use io::ErrorKind::WouldBlock;
293
294 match self.calls.pop_front() {
295 Some(Ok(data)) => {
296 debug_assert!(buf.remaining() >= data.len());
297 buf.put_slice(&data);
298 Ready(Ok(()))
299 }
300 Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
301 Some(Err(e)) => Ready(Err(e)),
302 None => Ready(Ok(())),
303 }
304 }
305 }
306