1 use futures::executor::block_on;
2 use futures::future::{Future, FutureExt};
3 use futures::io::{
4 AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
5 BufReader, Cursor, SeekFrom,
6 };
7 use futures::task::{Context, Poll};
8 use futures_test::task::noop_context;
9 use std::cmp;
10 use std::io;
11 use std::pin::Pin;
12
/// Polls `poll_fill_buf` on `$reader` over and over until it stops returning
/// `Poll::Pending`, then evaluates to the ready value.
///
/// The context comes from `noop_context()`. The loop never waits for a
/// wake-up; it simply polls again, so it only terminates for readers that
/// become ready after a finite number of polls.
macro_rules! run_fill_buf {
    ($reader:expr) => {{
        let mut cx = noop_context();
        loop {
            match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
                Poll::Ready(filled) => break filled,
                // Not ready yet: go around and poll again.
                Poll::Pending => continue,
            }
        }
    }};
}
23
run<F: Future + Unpin>(mut f: F) -> F::Output24 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
25 let mut cx = noop_context();
26 loop {
27 if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
28 return x;
29 }
30 }
31 }
32
/// Test reader over a byte slice whose `poll_read` and `poll_fill_buf`
/// implementations in this file return `Poll::Pending` on every other call.
struct MaybePending<'a> {
    /// Bytes that have not been handed out yet.
    inner: &'a [u8],
    /// When `true`, the next `poll_read` call goes through to `inner`.
    ready_read: bool,
    /// When `true`, the next `poll_fill_buf` call returns data.
    ready_fill_buf: bool,
}

impl<'a> MaybePending<'a> {
    /// Wraps `inner` with both readiness flags cleared, so the first
    /// `poll_read` and the first `poll_fill_buf` each report `Poll::Pending`.
    fn new(inner: &'a [u8]) -> Self {
        Self { ready_read: false, ready_fill_buf: false, inner }
    }
}
44
45 impl AsyncRead for MaybePending<'_> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>46 fn poll_read(
47 mut self: Pin<&mut Self>,
48 cx: &mut Context<'_>,
49 buf: &mut [u8],
50 ) -> Poll<io::Result<usize>> {
51 if self.ready_read {
52 self.ready_read = false;
53 Pin::new(&mut self.inner).poll_read(cx, buf)
54 } else {
55 self.ready_read = true;
56 Poll::Pending
57 }
58 }
59 }
60
61 impl AsyncBufRead for MaybePending<'_> {
poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>62 fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
63 if self.ready_fill_buf {
64 self.ready_fill_buf = false;
65 if self.inner.is_empty() {
66 return Poll::Ready(Ok(&[]));
67 }
68 let len = cmp::min(2, self.inner.len());
69 Poll::Ready(Ok(&self.inner[0..len]))
70 } else {
71 self.ready_fill_buf = true;
72 Poll::Pending
73 }
74 }
75
consume(mut self: Pin<&mut Self>, amt: usize)76 fn consume(mut self: Pin<&mut Self>, amt: usize) {
77 self.inner = &self.inner[amt..];
78 }
79 }
80
/// Reads an 8-byte slice through a `BufReader` of capacity 2 with destination
/// buffers of varying size, checking after every read the byte count, the
/// bytes delivered, and what remains in the reader's internal buffer.
#[test]
fn test_buffered_reader() {
    let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
    let mut reader = BufReader::with_capacity(2, data);

    // Three-byte destination: three bytes arrive, nothing stays buffered.
    let mut three = [0u8; 3];
    assert_eq!(block_on(reader.read(&mut three)).unwrap(), 3);
    assert_eq!(three, [5, 6, 7]);
    assert_eq!(reader.buffer(), []);

    // Two-byte destination: two bytes arrive, nothing stays buffered.
    let mut two = [0u8; 2];
    assert_eq!(block_on(reader.read(&mut two)).unwrap(), 2);
    assert_eq!(two, [0, 1]);
    assert_eq!(reader.buffer(), []);

    // One-byte destination: one byte arrives and the next one stays buffered.
    let mut one = [0u8; 1];
    assert_eq!(block_on(reader.read(&mut one)).unwrap(), 1);
    assert_eq!(one, [2]);
    assert_eq!(reader.buffer(), [3]);

    // A larger destination only receives the single buffered byte.
    let mut tail = [0u8; 3];
    assert_eq!(block_on(reader.read(&mut tail)).unwrap(), 1);
    assert_eq!(tail, [3, 0, 0]);
    assert_eq!(reader.buffer(), []);

    // Last byte of the input; `tail` is reused, so only index 0 changes.
    assert_eq!(block_on(reader.read(&mut tail)).unwrap(), 1);
    assert_eq!(tail, [4, 0, 0]);
    assert_eq!(reader.buffer(), []);

    // Input exhausted: a read now reports zero bytes.
    assert_eq!(block_on(reader.read(&mut tail)).unwrap(), 0);
}
117
/// Seeks a `BufReader` of capacity 2 over a `Cursor` and checks both the
/// positions reported by `seek` and the bytes exposed by the next fill.
#[test]
fn test_buffered_reader_seek() {
    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
    let mut reader = BufReader::with_capacity(2, Cursor::new(inner));

    assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
    // A relative seek that cannot succeed must report an error and, as the
    // next fill shows, leave the reader at the same position.
    assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
    assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
    // After consuming one buffered byte the logical position is 5, so a
    // relative seek of -2 must land on 3.
    Pin::new(&mut reader).consume(1);
    assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
}
132
133 #[test]
test_buffered_reader_seek_underflow()134 fn test_buffered_reader_seek_underflow() {
135 // gimmick reader that yields its position modulo 256 for each byte
136 struct PositionReader {
137 pos: u64,
138 }
139 impl io::Read for PositionReader {
140 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
141 let len = buf.len();
142 for x in buf {
143 *x = self.pos as u8;
144 self.pos = self.pos.wrapping_add(1);
145 }
146 Ok(len)
147 }
148 }
149 impl io::Seek for PositionReader {
150 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
151 match pos {
152 SeekFrom::Start(n) => {
153 self.pos = n;
154 }
155 SeekFrom::Current(n) => {
156 self.pos = self.pos.wrapping_add(n as u64);
157 }
158 SeekFrom::End(n) => {
159 self.pos = u64::max_value().wrapping_add(n as u64);
160 }
161 }
162 Ok(self.pos)
163 }
164 }
165
166 let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
167 assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
168 assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5));
169 assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
170 // the following seek will require two underlying seeks
171 let expected = 9_223_372_036_854_775_802;
172 assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
173 assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
174 // seeking to 0 should empty the buffer.
175 assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
176 assert_eq!(reader.get_ref().get_ref().pos, expected);
177 }
178
/// Checks that the byte counts reported by the underlying reader, zeros
/// included, come out of `BufReader::read` one call at a time and unchanged.
#[test]
fn test_short_reads() {
    /// A dummy reader intended at testing short-reads propagation.
    ///
    /// It never writes into the destination; it only reports the scripted
    /// lengths in order and then `0` forever.
    struct ShortReader {
        lengths: Vec<usize>,
    }

    impl io::Read for ShortReader {
        fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
            let reported = if self.lengths.is_empty() { 0 } else { self.lengths.remove(0) };
            Ok(reported)
        }
    }

    let script = vec![0, 1, 2, 0, 1, 0];
    let mut reader = BufReader::new(AllowStdIo::new(ShortReader { lengths: script }));
    let mut buf = [0, 0];

    // The six scripted lengths, followed by one extra read after the script
    // has run out.
    for &expected in &[0usize, 1, 2, 0, 1, 0, 0] {
        assert_eq!(block_on(reader.read(&mut buf)).unwrap(), expected);
    }
}
207
/// Runs the `test_buffered_reader` read sequence over a `MaybePending`
/// source, whose polls alternate between `Poll::Pending` and ready, driving
/// every read with `run`.
#[test]
fn maybe_pending() {
    let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
    let mut reader = BufReader::with_capacity(2, MaybePending::new(data));

    // Three-byte destination: three bytes arrive, nothing stays buffered.
    let mut three = [0u8; 3];
    assert_eq!(run(reader.read(&mut three)).unwrap(), 3);
    assert_eq!(three, [5, 6, 7]);
    assert_eq!(reader.buffer(), []);

    // Two-byte destination: two bytes arrive, nothing stays buffered.
    let mut two = [0u8; 2];
    assert_eq!(run(reader.read(&mut two)).unwrap(), 2);
    assert_eq!(two, [0, 1]);
    assert_eq!(reader.buffer(), []);

    // One-byte destination: one byte arrives and the next one stays buffered.
    let mut one = [0u8; 1];
    assert_eq!(run(reader.read(&mut one)).unwrap(), 1);
    assert_eq!(one, [2]);
    assert_eq!(reader.buffer(), [3]);

    // A larger destination only receives the single buffered byte.
    let mut tail = [0u8; 3];
    assert_eq!(run(reader.read(&mut tail)).unwrap(), 1);
    assert_eq!(tail, [3, 0, 0]);
    assert_eq!(reader.buffer(), []);

    // Last byte of the input; `tail` is reused, so only index 0 changes.
    assert_eq!(run(reader.read(&mut tail)).unwrap(), 1);
    assert_eq!(tail, [4, 0, 0]);
    assert_eq!(reader.buffer(), []);

    // Input exhausted: a read now reports zero bytes.
    assert_eq!(run(reader.read(&mut tail)).unwrap(), 0);
}
244
/// Checks `read_until` over a `BufReader` wrapping a `MaybePending` source:
/// each call must collect bytes up to and including the delimiter, or up to
/// the end of input when the delimiter never appears.
#[test]
fn maybe_pending_buf_read() {
    let source = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
    let mut reader = BufReader::with_capacity(2, source);

    // (delimiter, bytes expected from that call), in call order. The last two
    // delimiters are absent from the input: the first of them drains what is
    // left, the second finds nothing.
    let cases: [(u8, Vec<u8>); 4] =
        [(3, vec![0, 1, 2, 3]), (1, vec![1]), (8, vec![0]), (9, vec![])];

    let mut collected = Vec::new();
    for (delimiter, expected) in cases.iter() {
        collected.clear();
        run(reader.read_until(*delimiter, &mut collected)).unwrap();
        assert_eq!(&collected, expected);
    }
}
262
// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
/// Seeks a `BufReader` whose underlying reader answers `poll_seek` with
/// `Poll::Pending` on alternate calls, checking the positions reported by
/// `seek` and the bytes exposed by the next fill.
#[test]
fn maybe_pending_seek() {
    /// Cursor wrapper whose reads pass straight through but whose seeks
    /// alternate between completing and returning `Poll::Pending`.
    struct MaybePendingSeek<'a> {
        inner: Cursor<&'a [u8]>,
        /// When `true`, the next `poll_seek` is forwarded to `inner`.
        ready: bool,
    }

    impl<'a> MaybePendingSeek<'a> {
        /// Starts ready, so the very first `poll_seek` completes immediately.
        fn new(inner: &'a [u8]) -> Self {
            Self { inner: Cursor::new(inner), ready: true }
        }
    }

    impl AsyncRead for MaybePendingSeek<'_> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            Pin::new(&mut self.inner).poll_read(cx, buf)
        }
    }

    impl AsyncBufRead for MaybePendingSeek<'_> {
        fn poll_fill_buf(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<io::Result<&[u8]>> {
            // `get_mut` hands back the `&mut Self` with the full lifetime of
            // the pin, so the returned slice can borrow from `inner` without
            // any `unsafe`. It is available because `Self` is `Unpin`.
            Pin::new(&mut self.get_mut().inner).poll_fill_buf(cx)
        }

        fn consume(mut self: Pin<&mut Self>, amt: usize) {
            Pin::new(&mut self.inner).consume(amt)
        }
    }

    impl AsyncSeek for MaybePendingSeek<'_> {
        /// Flips `ready` on every call: forwards the seek when it was set,
        /// otherwise returns `Poll::Pending` without registering a waker.
        fn poll_seek(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<io::Result<u64>> {
            if self.ready {
                self.ready = false;
                Pin::new(&mut self.inner).poll_seek(cx, pos)
            } else {
                self.ready = true;
                Poll::Pending
            }
        }
    }

    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
    let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));

    assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
    // A relative seek that cannot succeed must report an error and, as the
    // next fill shows, leave the reader at the same position.
    assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
    assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
    // After consuming one buffered byte the logical position is 5, so a
    // relative seek of -2 must land on 3.
    Pin::new(&mut reader).consume(1);
    assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
}
329