1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 // https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_reader.rs
5 
6 use futures::task::{noop_waker_ref, Context, Poll};
7 use std::cmp;
8 use std::io::{self, Cursor};
9 use std::pin::Pin;
10 use tokio::io::{
11     AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt,
12     BufReader, ReadBuf, SeekFrom,
13 };
14 use tokio_test::task::spawn;
15 use tokio_test::{assert_pending, assert_ready};
16 
// Polls `poll_fill_buf` on `$reader` with a no-op waker in a busy loop and
// evaluates to the first `Poll::Ready` payload (the `io::Result` holding the
// filled slice). `$reader` must be a place expression, since it is mutably
// re-borrowed on every iteration.
macro_rules! run_fill_buf {
    ($reader:expr) => {{
        let mut cx = Context::from_waker(noop_waker_ref());
        loop {
            match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
                Poll::Ready(filled) => break filled,
                // Not ready yet: spin and poll again.
                Poll::Pending => {}
            }
        }
    }};
}
27 
/// In-memory reader whose poll methods report `Poll::Pending` on every other
/// call.
///
/// The two flags track, independently for `poll_read` and `poll_fill_buf`,
/// whether the next call is allowed to make progress.
struct MaybePending<'a> {
    /// Bytes that have not been handed out yet.
    inner: &'a [u8],
    /// When `true`, the next `poll_read` forwards to `inner`.
    ready_read: bool,
    /// When `true`, the next `poll_fill_buf` exposes bytes from `inner`.
    ready_fill_buf: bool,
}

impl<'a> MaybePending<'a> {
    /// Wraps `inner` with both readiness flags cleared, so the first
    /// `poll_read` and the first `poll_fill_buf` each return `Poll::Pending`.
    fn new(inner: &'a [u8]) -> Self {
        let not_ready = false;
        Self {
            inner,
            ready_read: not_ready,
            ready_fill_buf: not_ready,
        }
    }
}
43 
44 impl AsyncRead for MaybePending<'_> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>45     fn poll_read(
46         mut self: Pin<&mut Self>,
47         cx: &mut Context<'_>,
48         buf: &mut ReadBuf<'_>,
49     ) -> Poll<io::Result<()>> {
50         if self.ready_read {
51             self.ready_read = false;
52             Pin::new(&mut self.inner).poll_read(cx, buf)
53         } else {
54             self.ready_read = true;
55             cx.waker().wake_by_ref();
56             Poll::Pending
57         }
58     }
59 }
60 
61 impl AsyncBufRead for MaybePending<'_> {
poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>62     fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
63         if self.ready_fill_buf {
64             self.ready_fill_buf = false;
65             if self.inner.is_empty() {
66                 return Poll::Ready(Ok(&[]));
67             }
68             let len = cmp::min(2, self.inner.len());
69             Poll::Ready(Ok(&self.inner[0..len]))
70         } else {
71             self.ready_fill_buf = true;
72             Poll::Pending
73         }
74     }
75 
consume(mut self: Pin<&mut Self>, amt: usize)76     fn consume(mut self: Pin<&mut Self>, amt: usize) {
77         self.inner = &self.inner[amt..];
78     }
79 }
80 
81 #[tokio::test]
test_buffered_reader()82 async fn test_buffered_reader() {
83     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
84     let mut reader = BufReader::with_capacity(2, inner);
85 
86     let mut buf = [0, 0, 0];
87     let nread = reader.read(&mut buf).await.unwrap();
88     assert_eq!(nread, 3);
89     assert_eq!(buf, [5, 6, 7]);
90     assert_eq!(reader.buffer(), []);
91 
92     let mut buf = [0, 0];
93     let nread = reader.read(&mut buf).await.unwrap();
94     assert_eq!(nread, 2);
95     assert_eq!(buf, [0, 1]);
96     assert_eq!(reader.buffer(), []);
97 
98     let mut buf = [0];
99     let nread = reader.read(&mut buf).await.unwrap();
100     assert_eq!(nread, 1);
101     assert_eq!(buf, [2]);
102     assert_eq!(reader.buffer(), [3]);
103 
104     let mut buf = [0, 0, 0];
105     let nread = reader.read(&mut buf).await.unwrap();
106     assert_eq!(nread, 1);
107     assert_eq!(buf, [3, 0, 0]);
108     assert_eq!(reader.buffer(), []);
109 
110     let nread = reader.read(&mut buf).await.unwrap();
111     assert_eq!(nread, 1);
112     assert_eq!(buf, [4, 0, 0]);
113     assert_eq!(reader.buffer(), []);
114 
115     assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
116 }
117 
118 #[tokio::test]
test_buffered_reader_seek()119 async fn test_buffered_reader_seek() {
120     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
121     let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
122 
123     assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
124     assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
125     assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
126     assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
127     assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
128     assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]);
129     Pin::new(&mut reader).consume(1);
130     assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
131 }
132 
133 #[tokio::test]
test_buffered_reader_seek_underflow()134 async fn test_buffered_reader_seek_underflow() {
135     // gimmick reader that yields its position modulo 256 for each byte
136     struct PositionReader {
137         pos: u64,
138     }
139     impl AsyncRead for PositionReader {
140         fn poll_read(
141             mut self: Pin<&mut Self>,
142             _: &mut Context<'_>,
143             buf: &mut ReadBuf<'_>,
144         ) -> Poll<io::Result<()>> {
145             let b = buf.initialize_unfilled();
146             let len = b.len();
147             for x in b {
148                 *x = self.pos as u8;
149                 self.pos = self.pos.wrapping_add(1);
150             }
151             buf.advance(len);
152             Poll::Ready(Ok(()))
153         }
154     }
155     impl AsyncSeek for PositionReader {
156         fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
157             match pos {
158                 SeekFrom::Start(n) => {
159                     self.pos = n;
160                 }
161                 SeekFrom::Current(n) => {
162                     self.pos = self.pos.wrapping_add(n as u64);
163                 }
164                 SeekFrom::End(n) => {
165                     self.pos = u64::MAX.wrapping_add(n as u64);
166                 }
167             }
168             Ok(())
169         }
170         fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
171             Poll::Ready(Ok(self.pos))
172         }
173     }
174 
175     let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 });
176     assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1, 2, 3, 4][..]);
177     assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
178     assert_eq!(run_fill_buf!(reader).unwrap().len(), 5);
179     // the following seek will require two underlying seeks
180     let expected = 9_223_372_036_854_775_802;
181     assert_eq!(
182         reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(),
183         expected
184     );
185     assert_eq!(run_fill_buf!(reader).unwrap().len(), 5);
186     // seeking to 0 should empty the buffer.
187     assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
188     assert_eq!(reader.get_ref().pos, expected);
189 }
190 
191 #[tokio::test]
test_short_reads()192 async fn test_short_reads() {
193     /// A dummy reader intended at testing short-reads propagation.
194     struct ShortReader {
195         lengths: Vec<usize>,
196     }
197 
198     impl AsyncRead for ShortReader {
199         fn poll_read(
200             mut self: Pin<&mut Self>,
201             _: &mut Context<'_>,
202             buf: &mut ReadBuf<'_>,
203         ) -> Poll<io::Result<()>> {
204             if !self.lengths.is_empty() {
205                 buf.advance(self.lengths.remove(0));
206             }
207             Poll::Ready(Ok(()))
208         }
209     }
210 
211     let inner = ShortReader {
212         lengths: vec![0, 1, 2, 0, 1, 0],
213     };
214     let mut reader = BufReader::new(inner);
215     let mut buf = [0, 0];
216     assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
217     assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
218     assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
219     assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
220     assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
221     assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
222     assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
223 }
224 
225 #[tokio::test]
maybe_pending()226 async fn maybe_pending() {
227     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
228     let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
229 
230     let mut buf = [0, 0, 0];
231     let nread = reader.read(&mut buf).await.unwrap();
232     assert_eq!(nread, 3);
233     assert_eq!(buf, [5, 6, 7]);
234     assert_eq!(reader.buffer(), []);
235 
236     let mut buf = [0, 0];
237     let nread = reader.read(&mut buf).await.unwrap();
238     assert_eq!(nread, 2);
239     assert_eq!(buf, [0, 1]);
240     assert_eq!(reader.buffer(), []);
241 
242     let mut buf = [0];
243     let nread = reader.read(&mut buf).await.unwrap();
244     assert_eq!(nread, 1);
245     assert_eq!(buf, [2]);
246     assert_eq!(reader.buffer(), [3]);
247 
248     let mut buf = [0, 0, 0];
249     let nread = reader.read(&mut buf).await.unwrap();
250     assert_eq!(nread, 1);
251     assert_eq!(buf, [3, 0, 0]);
252     assert_eq!(reader.buffer(), []);
253 
254     let nread = reader.read(&mut buf).await.unwrap();
255     assert_eq!(nread, 1);
256     assert_eq!(buf, [4, 0, 0]);
257     assert_eq!(reader.buffer(), []);
258 
259     assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
260 }
261 
262 #[tokio::test]
maybe_pending_buf_read()263 async fn maybe_pending_buf_read() {
264     let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
265     let mut reader = BufReader::with_capacity(2, inner);
266     let mut v = Vec::new();
267     reader.read_until(3, &mut v).await.unwrap();
268     assert_eq!(v, [0, 1, 2, 3]);
269     v.clear();
270     reader.read_until(1, &mut v).await.unwrap();
271     assert_eq!(v, [1]);
272     v.clear();
273     reader.read_until(8, &mut v).await.unwrap();
274     assert_eq!(v, [0]);
275     v.clear();
276     reader.read_until(9, &mut v).await.unwrap();
277     assert_eq!(v, []);
278 }
279 
280 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
281 #[tokio::test]
maybe_pending_seek()282 async fn maybe_pending_seek() {
283     struct MaybePendingSeek<'a> {
284         inner: Cursor<&'a [u8]>,
285         ready: bool,
286         seek_res: Option<io::Result<()>>,
287     }
288 
289     impl<'a> MaybePendingSeek<'a> {
290         fn new(inner: &'a [u8]) -> Self {
291             Self {
292                 inner: Cursor::new(inner),
293                 ready: true,
294                 seek_res: None,
295             }
296         }
297     }
298 
299     impl AsyncRead for MaybePendingSeek<'_> {
300         fn poll_read(
301             mut self: Pin<&mut Self>,
302             cx: &mut Context<'_>,
303             buf: &mut ReadBuf<'_>,
304         ) -> Poll<io::Result<()>> {
305             Pin::new(&mut self.inner).poll_read(cx, buf)
306         }
307     }
308 
309     impl AsyncBufRead for MaybePendingSeek<'_> {
310         fn poll_fill_buf(
311             mut self: Pin<&mut Self>,
312             cx: &mut Context<'_>,
313         ) -> Poll<io::Result<&[u8]>> {
314             let this: *mut Self = &mut *self as *mut _;
315             Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
316         }
317 
318         fn consume(mut self: Pin<&mut Self>, amt: usize) {
319             Pin::new(&mut self.inner).consume(amt)
320         }
321     }
322 
323     impl AsyncSeek for MaybePendingSeek<'_> {
324         fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
325             self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos));
326             Ok(())
327         }
328         fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
329             if self.ready {
330                 self.ready = false;
331                 self.seek_res.take().unwrap_or(Ok(()))?;
332                 Pin::new(&mut self.inner).poll_complete(cx)
333             } else {
334                 self.ready = true;
335                 cx.waker().wake_by_ref();
336                 Poll::Pending
337             }
338         }
339     }
340 
341     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
342     let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
343 
344     assert_eq!(reader.seek(SeekFrom::Current(3)).await.unwrap(), 3);
345     assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
346     assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
347     assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
348     assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
349     assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]);
350     Pin::new(&mut reader).consume(1);
351     assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
352 }
353 
354 // This tests the AsyncBufReadExt::fill_buf wrapper.
355 #[tokio::test]
test_fill_buf_wrapper()356 async fn test_fill_buf_wrapper() {
357     let (mut write, read) = tokio::io::duplex(16);
358 
359     let mut read = BufReader::new(read);
360     write.write_all(b"hello world").await.unwrap();
361 
362     assert_eq!(read.fill_buf().await.unwrap(), b"hello world");
363     read.consume(b"hello ".len());
364     assert_eq!(read.fill_buf().await.unwrap(), b"world");
365     assert_eq!(read.fill_buf().await.unwrap(), b"world");
366     read.consume(b"world".len());
367 
368     let mut fill = spawn(read.fill_buf());
369     assert_pending!(fill.poll());
370 
371     write.write_all(b"foo bar").await.unwrap();
372     assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar");
373     drop(fill);
374 
375     drop(write);
376     assert_eq!(read.fill_buf().await.unwrap(), b"foo bar");
377     read.consume(b"foo bar".len());
378     assert_eq!(read.fill_buf().await.unwrap(), b"");
379 }
380