1 use crate::io::util::DEFAULT_BUF_SIZE;
2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
3 
4 use bytes::Buf;
5 use pin_project_lite::pin_project;
6 use std::io::{self, Read};
7 use std::mem::MaybeUninit;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10 use std::{cmp, fmt};
11 
12 pin_project! {
13     /// The `BufReader` struct adds buffering to any reader.
14     ///
15     /// It can be excessively inefficient to work directly with a [`AsyncRead`]
16     /// instance. A `BufReader` performs large, infrequent reads on the underlying
17     /// [`AsyncRead`] and maintains an in-memory buffer of the results.
18     ///
19     /// `BufReader` can improve the speed of programs that make *small* and
20     /// *repeated* read calls to the same file or network socket. It does not
21     /// help when reading very large amounts at once, or reading just one or a few
22     /// times. It also provides no advantage when reading from a source that is
23     /// already in memory, like a `Vec<u8>`.
24     ///
25     /// When the `BufReader` is dropped, the contents of its buffer will be
26     /// discarded. Creating multiple instances of a `BufReader` on the same
27     /// stream can cause data loss.
28     #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
29     pub struct BufReader<R> {
30         #[pin]
31         pub(super) inner: R,
32         pub(super) buf: Box<[u8]>,
33         pub(super) pos: usize,
34         pub(super) cap: usize,
35     }
36 }
37 
38 impl<R: AsyncRead> BufReader<R> {
39     /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
40     /// but may change in the future.
new(inner: R) -> Self41     pub fn new(inner: R) -> Self {
42         Self::with_capacity(DEFAULT_BUF_SIZE, inner)
43     }
44 
45     /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self46     pub fn with_capacity(capacity: usize, inner: R) -> Self {
47         unsafe {
48             let mut buffer = Vec::with_capacity(capacity);
49             buffer.set_len(capacity);
50 
51             {
52                 // Convert to MaybeUninit
53                 let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]);
54                 inner.prepare_uninitialized_buffer(b);
55             }
56             Self {
57                 inner,
58                 buf: buffer.into_boxed_slice(),
59                 pos: 0,
60                 cap: 0,
61             }
62         }
63     }
64 
65     /// Gets a reference to the underlying reader.
66     ///
67     /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R68     pub fn get_ref(&self) -> &R {
69         &self.inner
70     }
71 
72     /// Gets a mutable reference to the underlying reader.
73     ///
74     /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R75     pub fn get_mut(&mut self) -> &mut R {
76         &mut self.inner
77     }
78 
79     /// Gets a pinned mutable reference to the underlying reader.
80     ///
81     /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>82     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
83         self.project().inner
84     }
85 
86     /// Consumes this `BufReader`, returning the underlying reader.
87     ///
88     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R89     pub fn into_inner(self) -> R {
90         self.inner
91     }
92 
93     /// Returns a reference to the internally buffered data.
94     ///
95     /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]96     pub fn buffer(&self) -> &[u8] {
97         &self.buf[self.pos..self.cap]
98     }
99 
100     /// Invalidates all data in the internal buffer.
101     #[inline]
discard_buffer(self: Pin<&mut Self>)102     fn discard_buffer(self: Pin<&mut Self>) {
103         let me = self.project();
104         *me.pos = 0;
105         *me.cap = 0;
106     }
107 }
108 
109 impl<R: AsyncRead> AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>110     fn poll_read(
111         mut self: Pin<&mut Self>,
112         cx: &mut Context<'_>,
113         buf: &mut [u8],
114     ) -> Poll<io::Result<usize>> {
115         // If we don't have any buffered data and we're doing a massive read
116         // (larger than our internal buffer), bypass our internal buffer
117         // entirely.
118         if self.pos == self.cap && buf.len() >= self.buf.len() {
119             let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
120             self.discard_buffer();
121             return Poll::Ready(res);
122         }
123         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
124         let nread = rem.read(buf)?;
125         self.consume(nread);
126         Poll::Ready(Ok(nread))
127     }
128 
129     // we can't skip unconditionally because of the large buffer case in read.
prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool130     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
131         self.inner.prepare_uninitialized_buffer(buf)
132     }
133 }
134 
135 impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>136     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
137         let me = self.project();
138 
139         // If we've reached the end of our internal buffer then we need to fetch
140         // some more data from the underlying reader.
141         // Branch using `>=` instead of the more correct `==`
142         // to tell the compiler that the pos..cap slice is always valid.
143         if *me.pos >= *me.cap {
144             debug_assert!(*me.pos == *me.cap);
145             *me.cap = ready!(me.inner.poll_read(cx, me.buf))?;
146             *me.pos = 0;
147         }
148         Poll::Ready(Ok(&me.buf[*me.pos..*me.cap]))
149     }
150 
consume(self: Pin<&mut Self>, amt: usize)151     fn consume(self: Pin<&mut Self>, amt: usize) {
152         let me = self.project();
153         *me.pos = cmp::min(*me.pos + amt, *me.cap);
154     }
155 }
156 
157 impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>158     fn poll_write(
159         self: Pin<&mut Self>,
160         cx: &mut Context<'_>,
161         buf: &[u8],
162     ) -> Poll<io::Result<usize>> {
163         self.get_pin_mut().poll_write(cx, buf)
164     }
165 
poll_write_buf<B: Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>166     fn poll_write_buf<B: Buf>(
167         self: Pin<&mut Self>,
168         cx: &mut Context<'_>,
169         buf: &mut B,
170     ) -> Poll<io::Result<usize>> {
171         self.get_pin_mut().poll_write_buf(cx, buf)
172     }
173 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>174     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
175         self.get_pin_mut().poll_flush(cx)
176     }
177 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>178     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
179         self.get_pin_mut().poll_shutdown(cx)
180     }
181 }
182 
183 impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result184     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185         f.debug_struct("BufReader")
186             .field("reader", &self.inner)
187             .field(
188                 "buffer",
189                 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
190             )
191             .finish()
192     }
193 }
194 
195 #[cfg(test)]
196 mod tests {
197     use super::*;
198 
199     #[test]
assert_unpin()200     fn assert_unpin() {
201         crate::is_unpin::<BufReader<()>>();
202     }
203 }
204