1 use crate::io::util::DEFAULT_BUF_SIZE;
2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
3 
4 use pin_project_lite::pin_project;
5 use std::io::{self, Read};
6 use std::mem::MaybeUninit;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 use std::{cmp, fmt};
10 
11 pin_project! {
12     /// The `BufReader` struct adds buffering to any reader.
13     ///
14     /// It can be excessively inefficient to work directly with a [`AsyncRead`]
15     /// instance. A `BufReader` performs large, infrequent reads on the underlying
16     /// [`AsyncRead`] and maintains an in-memory buffer of the results.
17     ///
18     /// `BufReader` can improve the speed of programs that make *small* and
19     /// *repeated* read calls to the same file or network socket. It does not
20     /// help when reading very large amounts at once, or reading just one or a few
21     /// times. It also provides no advantage when reading from a source that is
22     /// already in memory, like a `Vec<u8>`.
23     ///
24     /// When the `BufReader` is dropped, the contents of its buffer will be
25     /// discarded. Creating multiple instances of a `BufReader` on the same
26     /// stream can cause data loss.
27     #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
28     pub struct BufReader<R> {
29         #[pin]
30         pub(super) inner: R,
31         pub(super) buf: Box<[u8]>,
32         pub(super) pos: usize,
33         pub(super) cap: usize,
34     }
35 }
36 
37 impl<R: AsyncRead> BufReader<R> {
38     /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
39     /// but may change in the future.
new(inner: R) -> Self40     pub fn new(inner: R) -> Self {
41         Self::with_capacity(DEFAULT_BUF_SIZE, inner)
42     }
43 
44     /// Creates a new `BufReader` with the specified buffer capacity.
with_capacity(capacity: usize, inner: R) -> Self45     pub fn with_capacity(capacity: usize, inner: R) -> Self {
46         unsafe {
47             let mut buffer = Vec::with_capacity(capacity);
48             buffer.set_len(capacity);
49 
50             {
51                 // Convert to MaybeUninit
52                 let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]);
53                 inner.prepare_uninitialized_buffer(b);
54             }
55             Self {
56                 inner,
57                 buf: buffer.into_boxed_slice(),
58                 pos: 0,
59                 cap: 0,
60             }
61         }
62     }
63 
64     /// Gets a reference to the underlying reader.
65     ///
66     /// It is inadvisable to directly read from the underlying reader.
get_ref(&self) -> &R67     pub fn get_ref(&self) -> &R {
68         &self.inner
69     }
70 
71     /// Gets a mutable reference to the underlying reader.
72     ///
73     /// It is inadvisable to directly read from the underlying reader.
get_mut(&mut self) -> &mut R74     pub fn get_mut(&mut self) -> &mut R {
75         &mut self.inner
76     }
77 
78     /// Gets a pinned mutable reference to the underlying reader.
79     ///
80     /// It is inadvisable to directly read from the underlying reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>81     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
82         self.project().inner
83     }
84 
85     /// Consumes this `BufWriter`, returning the underlying reader.
86     ///
87     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> R88     pub fn into_inner(self) -> R {
89         self.inner
90     }
91 
92     /// Returns a reference to the internally buffered data.
93     ///
94     /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
buffer(&self) -> &[u8]95     pub fn buffer(&self) -> &[u8] {
96         &self.buf[self.pos..self.cap]
97     }
98 
99     /// Invalidates all data in the internal buffer.
100     #[inline]
discard_buffer(self: Pin<&mut Self>)101     fn discard_buffer(self: Pin<&mut Self>) {
102         let me = self.project();
103         *me.pos = 0;
104         *me.cap = 0;
105     }
106 }
107 
108 impl<R: AsyncRead> AsyncRead for BufReader<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>109     fn poll_read(
110         mut self: Pin<&mut Self>,
111         cx: &mut Context<'_>,
112         buf: &mut [u8],
113     ) -> Poll<io::Result<usize>> {
114         // If we don't have any buffered data and we're doing a massive read
115         // (larger than our internal buffer), bypass our internal buffer
116         // entirely.
117         if self.pos == self.cap && buf.len() >= self.buf.len() {
118             let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
119             self.discard_buffer();
120             return Poll::Ready(res);
121         }
122         let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
123         let nread = rem.read(buf)?;
124         self.consume(nread);
125         Poll::Ready(Ok(nread))
126     }
127 
128     // we can't skip unconditionally because of the large buffer case in read.
prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool129     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
130         self.inner.prepare_uninitialized_buffer(buf)
131     }
132 }
133 
134 impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>135     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
136         let me = self.project();
137 
138         // If we've reached the end of our internal buffer then we need to fetch
139         // some more data from the underlying reader.
140         // Branch using `>=` instead of the more correct `==`
141         // to tell the compiler that the pos..cap slice is always valid.
142         if *me.pos >= *me.cap {
143             debug_assert!(*me.pos == *me.cap);
144             *me.cap = ready!(me.inner.poll_read(cx, me.buf))?;
145             *me.pos = 0;
146         }
147         Poll::Ready(Ok(&me.buf[*me.pos..*me.cap]))
148     }
149 
consume(self: Pin<&mut Self>, amt: usize)150     fn consume(self: Pin<&mut Self>, amt: usize) {
151         let me = self.project();
152         *me.pos = cmp::min(*me.pos + amt, *me.cap);
153     }
154 }
155 
156 impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>157     fn poll_write(
158         self: Pin<&mut Self>,
159         cx: &mut Context<'_>,
160         buf: &[u8],
161     ) -> Poll<io::Result<usize>> {
162         self.get_pin_mut().poll_write(cx, buf)
163     }
164 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>165     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
166         self.get_pin_mut().poll_flush(cx)
167     }
168 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>169     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
170         self.get_pin_mut().poll_shutdown(cx)
171     }
172 }
173 
174 impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result175     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176         f.debug_struct("BufReader")
177             .field("reader", &self.inner)
178             .field(
179                 "buffer",
180                 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
181             )
182             .finish()
183     }
184 }
185 
186 #[cfg(test)]
187 mod tests {
188     use super::*;
189 
190     #[test]
assert_unpin()191     fn assert_unpin() {
192         crate::is_unpin::<BufReader<()>>();
193     }
194 }
195