1 use futures::executor::block_on;
2 use futures::future::{Future, FutureExt};
3 use futures::io::{
4     AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
5     BufReader, SeekFrom,
6 };
7 use futures::pin_mut;
8 use futures::task::{Context, Poll};
9 use futures_test::task::noop_context;
10 use pin_project::pin_project;
11 use std::cmp;
12 use std::io;
13 use std::pin::Pin;
14 
15 // helper for maybe_pending_* tests
run<F: Future + Unpin>(mut f: F) -> F::Output16 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
17     let mut cx = noop_context();
18     loop {
19         if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
20             return x;
21         }
22     }
23 }
24 
25 // https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719
26 #[pin_project(!Unpin)]
27 struct Cursor<T> {
28     #[pin]
29     inner: futures::io::Cursor<T>,
30 }
31 
32 impl<T> Cursor<T> {
new(inner: T) -> Self33     fn new(inner: T) -> Self {
34         Self { inner: futures::io::Cursor::new(inner) }
35     }
36 }
37 
38 impl AsyncRead for Cursor<&[u8]> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>39     fn poll_read(
40         self: Pin<&mut Self>,
41         cx: &mut Context<'_>,
42         buf: &mut [u8],
43     ) -> Poll<io::Result<usize>> {
44         self.project().inner.poll_read(cx, buf)
45     }
46 }
47 
48 impl AsyncBufRead for Cursor<&[u8]> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>49     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
50         self.project().inner.poll_fill_buf(cx)
51     }
52 
consume(self: Pin<&mut Self>, amt: usize)53     fn consume(self: Pin<&mut Self>, amt: usize) {
54         self.project().inner.consume(amt)
55     }
56 }
57 
58 impl AsyncSeek for Cursor<&[u8]> {
poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>59     fn poll_seek(
60         self: Pin<&mut Self>,
61         cx: &mut Context<'_>,
62         pos: SeekFrom,
63     ) -> Poll<io::Result<u64>> {
64         self.project().inner.poll_seek(cx, pos)
65     }
66 }
67 
68 struct MaybePending<'a> {
69     inner: &'a [u8],
70     ready_read: bool,
71     ready_fill_buf: bool,
72 }
73 
74 impl<'a> MaybePending<'a> {
new(inner: &'a [u8]) -> Self75     fn new(inner: &'a [u8]) -> Self {
76         Self { inner, ready_read: false, ready_fill_buf: false }
77     }
78 }
79 
80 impl AsyncRead for MaybePending<'_> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>81     fn poll_read(
82         mut self: Pin<&mut Self>,
83         cx: &mut Context<'_>,
84         buf: &mut [u8],
85     ) -> Poll<io::Result<usize>> {
86         if self.ready_read {
87             self.ready_read = false;
88             Pin::new(&mut self.inner).poll_read(cx, buf)
89         } else {
90             self.ready_read = true;
91             Poll::Pending
92         }
93     }
94 }
95 
96 impl AsyncBufRead for MaybePending<'_> {
poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>97     fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
98         if self.ready_fill_buf {
99             self.ready_fill_buf = false;
100             if self.inner.is_empty() {
101                 return Poll::Ready(Ok(&[]));
102             }
103             let len = cmp::min(2, self.inner.len());
104             Poll::Ready(Ok(&self.inner[0..len]))
105         } else {
106             self.ready_fill_buf = true;
107             Poll::Pending
108         }
109     }
110 
consume(mut self: Pin<&mut Self>, amt: usize)111     fn consume(mut self: Pin<&mut Self>, amt: usize) {
112         self.inner = &self.inner[amt..];
113     }
114 }
115 
116 #[test]
test_buffered_reader()117 fn test_buffered_reader() {
118     block_on(async {
119         let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
120         let mut reader = BufReader::with_capacity(2, inner);
121 
122         let mut buf = [0, 0, 0];
123         let nread = reader.read(&mut buf).await.unwrap();
124         assert_eq!(nread, 3);
125         assert_eq!(buf, [5, 6, 7]);
126         assert_eq!(reader.buffer(), []);
127 
128         let mut buf = [0, 0];
129         let nread = reader.read(&mut buf).await.unwrap();
130         assert_eq!(nread, 2);
131         assert_eq!(buf, [0, 1]);
132         assert_eq!(reader.buffer(), []);
133 
134         let mut buf = [0];
135         let nread = reader.read(&mut buf).await.unwrap();
136         assert_eq!(nread, 1);
137         assert_eq!(buf, [2]);
138         assert_eq!(reader.buffer(), [3]);
139 
140         let mut buf = [0, 0, 0];
141         let nread = reader.read(&mut buf).await.unwrap();
142         assert_eq!(nread, 1);
143         assert_eq!(buf, [3, 0, 0]);
144         assert_eq!(reader.buffer(), []);
145 
146         let nread = reader.read(&mut buf).await.unwrap();
147         assert_eq!(nread, 1);
148         assert_eq!(buf, [4, 0, 0]);
149         assert_eq!(reader.buffer(), []);
150 
151         assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
152     });
153 }
154 
155 #[test]
test_buffered_reader_seek()156 fn test_buffered_reader_seek() {
157     block_on(async {
158         let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
159         let reader = BufReader::with_capacity(2, Cursor::new(inner));
160         pin_mut!(reader);
161 
162         assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
163         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
164         assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
165         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
166         assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
167         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]);
168         reader.as_mut().consume(1);
169         assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
170     });
171 }
172 
173 #[test]
test_buffered_reader_seek_relative()174 fn test_buffered_reader_seek_relative() {
175     block_on(async {
176         let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
177         let reader = BufReader::with_capacity(2, Cursor::new(inner));
178         pin_mut!(reader);
179 
180         assert!(reader.as_mut().seek_relative(3).await.is_ok());
181         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
182         assert!(reader.as_mut().seek_relative(0).await.is_ok());
183         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
184         assert!(reader.as_mut().seek_relative(1).await.is_ok());
185         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]);
186         assert!(reader.as_mut().seek_relative(-1).await.is_ok());
187         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
188         assert!(reader.as_mut().seek_relative(2).await.is_ok());
189         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]);
190     });
191 }
192 
193 #[test]
test_buffered_reader_invalidated_after_read()194 fn test_buffered_reader_invalidated_after_read() {
195     block_on(async {
196         let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
197         let reader = BufReader::with_capacity(3, Cursor::new(inner));
198         pin_mut!(reader);
199 
200         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
201         reader.as_mut().consume(3);
202 
203         let mut buffer = [0, 0, 0, 0, 0];
204         assert_eq!(reader.read(&mut buffer).await.unwrap(), 5);
205         assert_eq!(buffer, [0, 1, 2, 3, 4]);
206 
207         assert!(reader.as_mut().seek_relative(-2).await.is_ok());
208         let mut buffer = [0, 0];
209         assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
210         assert_eq!(buffer, [3, 4]);
211     });
212 }
213 
214 #[test]
test_buffered_reader_invalidated_after_seek()215 fn test_buffered_reader_invalidated_after_seek() {
216     block_on(async {
217         let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
218         let reader = BufReader::with_capacity(3, Cursor::new(inner));
219         pin_mut!(reader);
220 
221         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
222         reader.as_mut().consume(3);
223 
224         assert!(reader.seek(SeekFrom::Current(5)).await.is_ok());
225 
226         assert!(reader.as_mut().seek_relative(-2).await.is_ok());
227         let mut buffer = [0, 0];
228         assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
229         assert_eq!(buffer, [3, 4]);
230     });
231 }
232 
233 #[test]
test_buffered_reader_seek_underflow()234 fn test_buffered_reader_seek_underflow() {
235     // gimmick reader that yields its position modulo 256 for each byte
236     struct PositionReader {
237         pos: u64,
238     }
239     impl io::Read for PositionReader {
240         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
241             let len = buf.len();
242             for x in buf {
243                 *x = self.pos as u8;
244                 self.pos = self.pos.wrapping_add(1);
245             }
246             Ok(len)
247         }
248     }
249     impl io::Seek for PositionReader {
250         fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
251             match pos {
252                 SeekFrom::Start(n) => {
253                     self.pos = n;
254                 }
255                 SeekFrom::Current(n) => {
256                     self.pos = self.pos.wrapping_add(n as u64);
257                 }
258                 SeekFrom::End(n) => {
259                     self.pos = u64::MAX.wrapping_add(n as u64);
260                 }
261             }
262             Ok(self.pos)
263         }
264     }
265 
266     block_on(async {
267         let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
268         pin_mut!(reader);
269         assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]);
270         assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
271         assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
272         // the following seek will require two underlying seeks
273         let expected = 9_223_372_036_854_775_802;
274         assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected);
275         assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
276         // seeking to 0 should empty the buffer.
277         assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
278         assert_eq!(reader.get_ref().get_ref().pos, expected);
279     });
280 }
281 
282 #[test]
test_short_reads()283 fn test_short_reads() {
284     /// A dummy reader intended at testing short-reads propagation.
285     struct ShortReader {
286         lengths: Vec<usize>,
287     }
288 
289     impl io::Read for ShortReader {
290         fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
291             if self.lengths.is_empty() {
292                 Ok(0)
293             } else {
294                 Ok(self.lengths.remove(0))
295             }
296         }
297     }
298 
299     block_on(async {
300         let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
301         let mut reader = BufReader::new(AllowStdIo::new(inner));
302         let mut buf = [0, 0];
303         assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
304         assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
305         assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
306         assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
307         assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
308         assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
309         assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
310     });
311 }
312 
313 #[test]
maybe_pending()314 fn maybe_pending() {
315     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
316     let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
317 
318     let mut buf = [0, 0, 0];
319     let nread = run(reader.read(&mut buf));
320     assert_eq!(nread.unwrap(), 3);
321     assert_eq!(buf, [5, 6, 7]);
322     assert_eq!(reader.buffer(), []);
323 
324     let mut buf = [0, 0];
325     let nread = run(reader.read(&mut buf));
326     assert_eq!(nread.unwrap(), 2);
327     assert_eq!(buf, [0, 1]);
328     assert_eq!(reader.buffer(), []);
329 
330     let mut buf = [0];
331     let nread = run(reader.read(&mut buf));
332     assert_eq!(nread.unwrap(), 1);
333     assert_eq!(buf, [2]);
334     assert_eq!(reader.buffer(), [3]);
335 
336     let mut buf = [0, 0, 0];
337     let nread = run(reader.read(&mut buf));
338     assert_eq!(nread.unwrap(), 1);
339     assert_eq!(buf, [3, 0, 0]);
340     assert_eq!(reader.buffer(), []);
341 
342     let nread = run(reader.read(&mut buf));
343     assert_eq!(nread.unwrap(), 1);
344     assert_eq!(buf, [4, 0, 0]);
345     assert_eq!(reader.buffer(), []);
346 
347     assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
348 }
349 
350 #[test]
maybe_pending_buf_read()351 fn maybe_pending_buf_read() {
352     let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
353     let mut reader = BufReader::with_capacity(2, inner);
354     let mut v = Vec::new();
355     run(reader.read_until(3, &mut v)).unwrap();
356     assert_eq!(v, [0, 1, 2, 3]);
357     v.clear();
358     run(reader.read_until(1, &mut v)).unwrap();
359     assert_eq!(v, [1]);
360     v.clear();
361     run(reader.read_until(8, &mut v)).unwrap();
362     assert_eq!(v, [0]);
363     v.clear();
364     run(reader.read_until(9, &mut v)).unwrap();
365     assert_eq!(v, []);
366 }
367 
368 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
369 #[test]
maybe_pending_seek()370 fn maybe_pending_seek() {
371     #[pin_project]
372     struct MaybePendingSeek<'a> {
373         #[pin]
374         inner: Cursor<&'a [u8]>,
375         ready: bool,
376     }
377 
378     impl<'a> MaybePendingSeek<'a> {
379         fn new(inner: &'a [u8]) -> Self {
380             Self { inner: Cursor::new(inner), ready: true }
381         }
382     }
383 
384     impl AsyncRead for MaybePendingSeek<'_> {
385         fn poll_read(
386             self: Pin<&mut Self>,
387             cx: &mut Context<'_>,
388             buf: &mut [u8],
389         ) -> Poll<io::Result<usize>> {
390             self.project().inner.poll_read(cx, buf)
391         }
392     }
393 
394     impl AsyncBufRead for MaybePendingSeek<'_> {
395         fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
396             self.project().inner.poll_fill_buf(cx)
397         }
398 
399         fn consume(self: Pin<&mut Self>, amt: usize) {
400             self.project().inner.consume(amt)
401         }
402     }
403 
404     impl AsyncSeek for MaybePendingSeek<'_> {
405         fn poll_seek(
406             mut self: Pin<&mut Self>,
407             cx: &mut Context<'_>,
408             pos: SeekFrom,
409         ) -> Poll<io::Result<u64>> {
410             if self.ready {
411                 *self.as_mut().project().ready = false;
412                 self.project().inner.poll_seek(cx, pos)
413             } else {
414                 *self.project().ready = true;
415                 Poll::Pending
416             }
417         }
418     }
419 
420     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
421     let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
422     pin_mut!(reader);
423 
424     assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
425     assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
426     assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
427     assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
428     assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
429     assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..]));
430     Pin::new(&mut reader).consume(1);
431     assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
432 }
433