1 use futures::executor::block_on;
2 use futures::future::{Future, FutureExt};
3 use futures::io::{
4     AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
5     BufReader, Cursor, SeekFrom,
6 };
7 use futures::task::{Context, Poll};
8 use futures_test::task::noop_context;
9 use std::cmp;
10 use std::io;
11 use std::pin::Pin;
12 
13 macro_rules! run_fill_buf {
14     ($reader:expr) => {{
15         let mut cx = noop_context();
16         loop {
17             if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
18                 break x;
19             }
20         }
21     }};
22 }
23 
run<F: Future + Unpin>(mut f: F) -> F::Output24 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
25     let mut cx = noop_context();
26     loop {
27         if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
28             return x;
29         }
30     }
31 }
32 
33 struct MaybePending<'a> {
34     inner: &'a [u8],
35     ready_read: bool,
36     ready_fill_buf: bool,
37 }
38 
39 impl<'a> MaybePending<'a> {
new(inner: &'a [u8]) -> Self40     fn new(inner: &'a [u8]) -> Self {
41         Self { inner, ready_read: false, ready_fill_buf: false }
42     }
43 }
44 
45 impl AsyncRead for MaybePending<'_> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>46     fn poll_read(
47         mut self: Pin<&mut Self>,
48         cx: &mut Context<'_>,
49         buf: &mut [u8],
50     ) -> Poll<io::Result<usize>> {
51         if self.ready_read {
52             self.ready_read = false;
53             Pin::new(&mut self.inner).poll_read(cx, buf)
54         } else {
55             self.ready_read = true;
56             Poll::Pending
57         }
58     }
59 }
60 
61 impl AsyncBufRead for MaybePending<'_> {
poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>62     fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
63         if self.ready_fill_buf {
64             self.ready_fill_buf = false;
65             if self.inner.is_empty() {
66                 return Poll::Ready(Ok(&[]));
67             }
68             let len = cmp::min(2, self.inner.len());
69             Poll::Ready(Ok(&self.inner[0..len]))
70         } else {
71             self.ready_fill_buf = true;
72             Poll::Pending
73         }
74     }
75 
consume(mut self: Pin<&mut Self>, amt: usize)76     fn consume(mut self: Pin<&mut Self>, amt: usize) {
77         self.inner = &self.inner[amt..];
78     }
79 }
80 
81 #[test]
test_buffered_reader()82 fn test_buffered_reader() {
83     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
84     let mut reader = BufReader::with_capacity(2, inner);
85 
86     let mut buf = [0, 0, 0];
87     let nread = block_on(reader.read(&mut buf));
88     assert_eq!(nread.unwrap(), 3);
89     assert_eq!(buf, [5, 6, 7]);
90     assert_eq!(reader.buffer(), []);
91 
92     let mut buf = [0, 0];
93     let nread = block_on(reader.read(&mut buf));
94     assert_eq!(nread.unwrap(), 2);
95     assert_eq!(buf, [0, 1]);
96     assert_eq!(reader.buffer(), []);
97 
98     let mut buf = [0];
99     let nread = block_on(reader.read(&mut buf));
100     assert_eq!(nread.unwrap(), 1);
101     assert_eq!(buf, [2]);
102     assert_eq!(reader.buffer(), [3]);
103 
104     let mut buf = [0, 0, 0];
105     let nread = block_on(reader.read(&mut buf));
106     assert_eq!(nread.unwrap(), 1);
107     assert_eq!(buf, [3, 0, 0]);
108     assert_eq!(reader.buffer(), []);
109 
110     let nread = block_on(reader.read(&mut buf));
111     assert_eq!(nread.unwrap(), 1);
112     assert_eq!(buf, [4, 0, 0]);
113     assert_eq!(reader.buffer(), []);
114 
115     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
116 }
117 
118 #[test]
test_buffered_reader_seek()119 fn test_buffered_reader_seek() {
120     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
121     let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
122 
123     assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
124     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
125     assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
126     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
127     assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
128     assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
129     Pin::new(&mut reader).consume(1);
130     assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
131 }
132 
133 #[test]
test_buffered_reader_seek_underflow()134 fn test_buffered_reader_seek_underflow() {
135     // gimmick reader that yields its position modulo 256 for each byte
136     struct PositionReader {
137         pos: u64,
138     }
139     impl io::Read for PositionReader {
140         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
141             let len = buf.len();
142             for x in buf {
143                 *x = self.pos as u8;
144                 self.pos = self.pos.wrapping_add(1);
145             }
146             Ok(len)
147         }
148     }
149     impl io::Seek for PositionReader {
150         fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
151             match pos {
152                 SeekFrom::Start(n) => {
153                     self.pos = n;
154                 }
155                 SeekFrom::Current(n) => {
156                     self.pos = self.pos.wrapping_add(n as u64);
157                 }
158                 SeekFrom::End(n) => {
159                     self.pos = u64::max_value().wrapping_add(n as u64);
160                 }
161             }
162             Ok(self.pos)
163         }
164     }
165 
166     let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
167     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
168     assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5));
169     assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
170     // the following seek will require two underlying seeks
171     let expected = 9_223_372_036_854_775_802;
172     assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
173     assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
174     // seeking to 0 should empty the buffer.
175     assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
176     assert_eq!(reader.get_ref().get_ref().pos, expected);
177 }
178 
179 #[test]
test_short_reads()180 fn test_short_reads() {
181     /// A dummy reader intended at testing short-reads propagation.
182     struct ShortReader {
183         lengths: Vec<usize>,
184     }
185 
186     impl io::Read for ShortReader {
187         fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
188             if self.lengths.is_empty() {
189                 Ok(0)
190             } else {
191                 Ok(self.lengths.remove(0))
192             }
193         }
194     }
195 
196     let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
197     let mut reader = BufReader::new(AllowStdIo::new(inner));
198     let mut buf = [0, 0];
199     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
200     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
201     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
202     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
203     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
204     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
205     assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
206 }
207 
208 #[test]
maybe_pending()209 fn maybe_pending() {
210     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
211     let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
212 
213     let mut buf = [0, 0, 0];
214     let nread = run(reader.read(&mut buf));
215     assert_eq!(nread.unwrap(), 3);
216     assert_eq!(buf, [5, 6, 7]);
217     assert_eq!(reader.buffer(), []);
218 
219     let mut buf = [0, 0];
220     let nread = run(reader.read(&mut buf));
221     assert_eq!(nread.unwrap(), 2);
222     assert_eq!(buf, [0, 1]);
223     assert_eq!(reader.buffer(), []);
224 
225     let mut buf = [0];
226     let nread = run(reader.read(&mut buf));
227     assert_eq!(nread.unwrap(), 1);
228     assert_eq!(buf, [2]);
229     assert_eq!(reader.buffer(), [3]);
230 
231     let mut buf = [0, 0, 0];
232     let nread = run(reader.read(&mut buf));
233     assert_eq!(nread.unwrap(), 1);
234     assert_eq!(buf, [3, 0, 0]);
235     assert_eq!(reader.buffer(), []);
236 
237     let nread = run(reader.read(&mut buf));
238     assert_eq!(nread.unwrap(), 1);
239     assert_eq!(buf, [4, 0, 0]);
240     assert_eq!(reader.buffer(), []);
241 
242     assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
243 }
244 
245 #[test]
maybe_pending_buf_read()246 fn maybe_pending_buf_read() {
247     let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
248     let mut reader = BufReader::with_capacity(2, inner);
249     let mut v = Vec::new();
250     run(reader.read_until(3, &mut v)).unwrap();
251     assert_eq!(v, [0, 1, 2, 3]);
252     v.clear();
253     run(reader.read_until(1, &mut v)).unwrap();
254     assert_eq!(v, [1]);
255     v.clear();
256     run(reader.read_until(8, &mut v)).unwrap();
257     assert_eq!(v, [0]);
258     v.clear();
259     run(reader.read_until(9, &mut v)).unwrap();
260     assert_eq!(v, []);
261 }
262 
263 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
264 #[test]
maybe_pending_seek()265 fn maybe_pending_seek() {
266     struct MaybePendingSeek<'a> {
267         inner: Cursor<&'a [u8]>,
268         ready: bool,
269     }
270 
271     impl<'a> MaybePendingSeek<'a> {
272         fn new(inner: &'a [u8]) -> Self {
273             Self { inner: Cursor::new(inner), ready: true }
274         }
275     }
276 
277     impl AsyncRead for MaybePendingSeek<'_> {
278         fn poll_read(
279             mut self: Pin<&mut Self>,
280             cx: &mut Context<'_>,
281             buf: &mut [u8],
282         ) -> Poll<io::Result<usize>> {
283             Pin::new(&mut self.inner).poll_read(cx, buf)
284         }
285     }
286 
287     impl AsyncBufRead for MaybePendingSeek<'_> {
288         fn poll_fill_buf(
289             mut self: Pin<&mut Self>,
290             cx: &mut Context<'_>,
291         ) -> Poll<io::Result<&[u8]>> {
292             let this: *mut Self = &mut *self as *mut _;
293             Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
294         }
295 
296         fn consume(mut self: Pin<&mut Self>, amt: usize) {
297             Pin::new(&mut self.inner).consume(amt)
298         }
299     }
300 
301     impl AsyncSeek for MaybePendingSeek<'_> {
302         fn poll_seek(
303             mut self: Pin<&mut Self>,
304             cx: &mut Context<'_>,
305             pos: SeekFrom,
306         ) -> Poll<io::Result<u64>> {
307             if self.ready {
308                 self.ready = false;
309                 Pin::new(&mut self.inner).poll_seek(cx, pos)
310             } else {
311                 self.ready = true;
312                 Poll::Pending
313             }
314         }
315     }
316 
317     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
318     let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
319 
320     assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
321     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
322     assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
323     assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
324     assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
325     assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
326     Pin::new(&mut reader).consume(1);
327     assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
328 }
329