1 use super::DEFAULT_BUF_SIZE;
2 use futures_core::future::Future;
3 use futures_core::ready;
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "read-initializer")]
6 use futures_io::Initializer;
7 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
8 use pin_project_lite::pin_project;
9 use std::io::{self, Read};
10 use std::pin::Pin;
11 use std::{cmp, fmt};
12 
pin_project! {
    /// The `BufReader` struct adds buffering to any reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`]
    /// instance. A `BufReader` performs large, infrequent reads on the underlying
    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
    ///
    /// `BufReader` can improve the speed of programs that make *small* and
    /// *repeated* read calls to the same file or network socket. It does not
    /// help when reading very large amounts at once, or reading just one or a few
    /// times. It also provides no advantage when reading from a source that is
    /// already in memory, like a `Vec<u8>`.
    ///
    /// When the `BufReader` is dropped, the contents of its buffer will be
    /// discarded. Creating multiple instances of a `BufReader` on the same
    /// stream can cause data loss.
    ///
    /// [`AsyncRead`]: futures_io::AsyncRead
    ///
    // TODO: Examples
    pub struct BufReader<R> {
        // The wrapped reader. `#[pin]` makes the projection yield `Pin<&mut R>`,
        // which is what `poll_read`/`poll_seek` on the inner reader require.
        #[pin]
        inner: R,
        // Fixed-size internal storage; only `buffer[pos..cap]` holds unread data.
        buffer: Box<[u8]>,
        // Index in `buffer` of the next byte to hand out.
        pos: usize,
        // Number of bytes the last fill read from `inner` into `buffer`.
        cap: usize,
    }
}
41 
42 impl<R: AsyncRead> BufReader<R> {
43     /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
44     /// but may change in the future.
new(inner: R) -> Self45     pub fn new(inner: R) -> Self {
46         Self::with_capacity(DEFAULT_BUF_SIZE, inner)
47     }
48 
49     /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self50     pub fn with_capacity(capacity: usize, inner: R) -> Self {
51         unsafe {
52             let mut buffer = Vec::with_capacity(capacity);
53             buffer.set_len(capacity);
54             super::initialize(&inner, &mut buffer);
55             Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
56         }
57     }
58 
59     delegate_access_inner!(inner, R, ());
60 
61     /// Returns a reference to the internally buffered data.
62     ///
63     /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]64     pub fn buffer(&self) -> &[u8] {
65         &self.buffer[self.pos..self.cap]
66     }
67 
68     /// Invalidates all data in the internal buffer.
69     #[inline]
discard_buffer(self: Pin<&mut Self>)70     fn discard_buffer(self: Pin<&mut Self>) {
71         let this = self.project();
72         *this.pos = 0;
73         *this.cap = 0;
74     }
75 }
76 
77 impl<R: AsyncRead + AsyncSeek> BufReader<R> {
78     /// Seeks relative to the current position. If the new position lies within the buffer,
79     /// the buffer will not be flushed, allowing for more efficient seeks.
80     /// This method does not return the location of the underlying reader, so the caller
81     /// must track this information themselves if it is required.
seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R>82     pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
83         SeeKRelative { inner: self, offset, first: true }
84     }
85 
86     /// Attempts to seek relative to the current position. If the new position lies within the buffer,
87     /// the buffer will not be flushed, allowing for more efficient seeks.
88     /// This method does not return the location of the underlying reader, so the caller
89     /// must track this information themselves if it is required.
poll_seek_relative( self: Pin<&mut Self>, cx: &mut Context<'_>, offset: i64, ) -> Poll<io::Result<()>>90     pub fn poll_seek_relative(
91         self: Pin<&mut Self>,
92         cx: &mut Context<'_>,
93         offset: i64,
94     ) -> Poll<io::Result<()>> {
95         let pos = self.pos as u64;
96         if offset < 0 {
97             if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
98                 *self.project().pos = new_pos as usize;
99                 return Poll::Ready(Ok(()));
100             }
101         } else if let Some(new_pos) = pos.checked_add(offset as u64) {
102             if new_pos <= self.cap as u64 {
103                 *self.project().pos = new_pos as usize;
104                 return Poll::Ready(Ok(()));
105             }
106         }
107         self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
108     }
109 }
110 
111 impl<R: AsyncRead> AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>112     fn poll_read(
113         mut self: Pin<&mut Self>,
114         cx: &mut Context<'_>,
115         buf: &mut [u8],
116     ) -> Poll<io::Result<usize>> {
117         // If we don't have any buffered data and we're doing a massive read
118         // (larger than our internal buffer), bypass our internal buffer
119         // entirely.
120         if self.pos == self.cap && buf.len() >= self.buffer.len() {
121             let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
122             self.discard_buffer();
123             return Poll::Ready(res);
124         }
125         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
126         let nread = rem.read(buf)?;
127         self.consume(nread);
128         Poll::Ready(Ok(nread))
129     }
130 
poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>131     fn poll_read_vectored(
132         mut self: Pin<&mut Self>,
133         cx: &mut Context<'_>,
134         bufs: &mut [IoSliceMut<'_>],
135     ) -> Poll<io::Result<usize>> {
136         let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
137         if self.pos == self.cap && total_len >= self.buffer.len() {
138             let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
139             self.discard_buffer();
140             return Poll::Ready(res);
141         }
142         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
143         let nread = rem.read_vectored(bufs)?;
144         self.consume(nread);
145         Poll::Ready(Ok(nread))
146     }
147 
148     // we can't skip unconditionally because of the large buffer case in read.
149     #[cfg(feature = "read-initializer")]
initializer(&self) -> Initializer150     unsafe fn initializer(&self) -> Initializer {
151         self.inner.initializer()
152     }
153 }
154 
155 impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>156     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
157         let this = self.project();
158 
159         // If we've reached the end of our internal buffer then we need to fetch
160         // some more data from the underlying reader.
161         // Branch using `>=` instead of the more correct `==`
162         // to tell the compiler that the pos..cap slice is always valid.
163         if *this.pos >= *this.cap {
164             debug_assert!(*this.pos == *this.cap);
165             *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
166             *this.pos = 0;
167         }
168         Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
169     }
170 
consume(self: Pin<&mut Self>, amt: usize)171     fn consume(self: Pin<&mut Self>, amt: usize) {
172         *self.project().pos = cmp::min(self.pos + amt, self.cap);
173     }
174 }
175 
// `BufReader` only buffers reads. When `R` is also a writer, the `AsyncWrite` methods
// are generated by the crate-local `delegate_async_write!` macro against the `inner`
// field (NOTE(review): presumably a plain pass-through with no buffering — confirm
// against the macro definition).
impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
    delegate_async_write!(inner);
}
179 
180 impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result181     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182         f.debug_struct("BufReader")
183             .field("reader", &self.inner)
184             .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
185             .finish()
186     }
187 }
188 
impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
    /// Seek to an offset, in bytes, in the underlying reader.
    ///
    /// The position used for seeking with `SeekFrom::Current(_)` is the
    /// position the underlying reader would be at if the `BufReader` had no
    /// internal buffer.
    ///
    /// Seeking always discards the internal buffer, even if the seek position
    /// would otherwise fall within it. This guarantees that calling
    /// `.into_inner()` immediately after a seek yields the underlying reader
    /// at the same position.
    ///
    /// To seek without discarding the internal buffer, use
    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
    ///
    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
    ///
    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
    /// where `n` minus the internal buffer length overflows an `i64`, two
    /// seeks will be performed instead of one. If the second seek returns
    /// `Err`, the underlying reader will be left at the same position it would
    /// have if you called `seek` with `SeekFrom::Current(0)`.
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        let result: u64;
        if let SeekFrom::Current(n) = pos {
            // Bytes already pulled from `inner` into `buffer[pos..cap]` but not yet handed
            // to the caller: the inner reader is this far ahead of the logical position.
            let remainder = (self.cap - self.pos) as i64;
            // it should be safe to assume that remainder fits within an i64 as the alternative
            // means we managed to allocate 8 exbibytes and that's absurd.
            // But it's not out of the realm of possibility for some weird underlying reader to
            // support seeking by i64::min_value() so we need to handle underflow when subtracting
            // remainder.
            if let Some(offset) = n.checked_sub(remainder) {
                // Common case: one relative seek that also rewinds over the unread bytes.
                result =
                    ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
            } else {
                // seek backwards by our remainder, and then by the offset
                ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
                // Discard between the two seeks: if the second seek below returns `Pending`,
                // the next poll sees `remainder == 0` (pos == cap == 0), takes the
                // single-seek branch with `Current(n)`, and does not rewind a second time.
                self.as_mut().discard_buffer();
                result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
            }
        } else {
            // Seeking with Start/End doesn't care about our buffer length.
            result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
        }
        // The buffered bytes no longer correspond to the inner reader's position.
        self.discard_buffer();
        Poll::Ready(Ok(result))
    }
}
242 
/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
// NOTE(review): the capital `K` in `SeeKRelative` looks like a typo of `SeekRelative`;
// it is a public name, so renaming it would be a breaking change.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SeeKRelative<'a, R> {
    // The pinned reader the seek is applied to.
    inner: Pin<&'a mut BufReader<R>>,
    // Offset relative to the reader's current logical position.
    offset: i64,
    // `true` until the first poll. The first poll tries `poll_seek_relative` (which may be
    // served from the buffer); later polls go straight to `poll_seek`.
    first: bool,
}
251 
252 impl<R> Future for SeeKRelative<'_, R>
253 where
254     R: AsyncRead + AsyncSeek,
255 {
256     type Output = io::Result<()>;
257 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>258     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
259         let offset = self.offset;
260         if self.first {
261             self.first = false;
262             self.inner.as_mut().poll_seek_relative(cx, offset)
263         } else {
264             self.inner
265                 .as_mut()
266                 .as_mut()
267                 .poll_seek(cx, SeekFrom::Current(offset))
268                 .map(|res| res.map(|_| ()))
269         }
270     }
271 }
272