1 use crate::io::util::DEFAULT_BUF_SIZE; 2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; 3 4 use bytes::Buf; 5 use pin_project_lite::pin_project; 6 use std::io::{self, Read}; 7 use std::mem::MaybeUninit; 8 use std::pin::Pin; 9 use std::task::{Context, Poll}; 10 use std::{cmp, fmt}; 11 12 pin_project! { 13 /// The `BufReader` struct adds buffering to any reader. 14 /// 15 /// It can be excessively inefficient to work directly with a [`AsyncRead`] 16 /// instance. A `BufReader` performs large, infrequent reads on the underlying 17 /// [`AsyncRead`] and maintains an in-memory buffer of the results. 18 /// 19 /// `BufReader` can improve the speed of programs that make *small* and 20 /// *repeated* read calls to the same file or network socket. It does not 21 /// help when reading very large amounts at once, or reading just one or a few 22 /// times. It also provides no advantage when reading from a source that is 23 /// already in memory, like a `Vec<u8>`. 24 /// 25 /// When the `BufReader` is dropped, the contents of its buffer will be 26 /// discarded. Creating multiple instances of a `BufReader` on the same 27 /// stream can cause data loss. 28 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] 29 pub struct BufReader<R> { 30 #[pin] 31 pub(super) inner: R, 32 pub(super) buf: Box<[u8]>, 33 pub(super) pos: usize, 34 pub(super) cap: usize, 35 } 36 } 37 38 impl<R: AsyncRead> BufReader<R> { 39 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, 40 /// but may change in the future. new(inner: R) -> Self41 pub fn new(inner: R) -> Self { 42 Self::with_capacity(DEFAULT_BUF_SIZE, inner) 43 } 44 45 /// Creates a new `BufReader` with the specified buffer capacity. with_capacity(capacity: usize, inner: R) -> Self46 pub fn with_capacity(capacity: usize, inner: R) -> Self { 47 unsafe { 48 let mut buffer = Vec::with_capacity(capacity); 49 buffer.set_len(capacity); 50 51 { 52 // Convert to MaybeUninit 53 let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]); 54 inner.prepare_uninitialized_buffer(b); 55 } 56 Self { 57 inner, 58 buf: buffer.into_boxed_slice(), 59 pos: 0, 60 cap: 0, 61 } 62 } 63 } 64 65 /// Gets a reference to the underlying reader. 66 /// 67 /// It is inadvisable to directly read from the underlying reader. get_ref(&self) -> &R68 pub fn get_ref(&self) -> &R { 69 &self.inner 70 } 71 72 /// Gets a mutable reference to the underlying reader. 73 /// 74 /// It is inadvisable to directly read from the underlying reader. get_mut(&mut self) -> &mut R75 pub fn get_mut(&mut self) -> &mut R { 76 &mut self.inner 77 } 78 79 /// Gets a pinned mutable reference to the underlying reader. 80 /// 81 /// It is inadvisable to directly read from the underlying reader. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>82 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { 83 self.project().inner 84 } 85 86 /// Consumes this `BufReader`, returning the underlying reader. 87 /// 88 /// Note that any leftover data in the internal buffer is lost. into_inner(self) -> R89 pub fn into_inner(self) -> R { 90 self.inner 91 } 92 93 /// Returns a reference to the internally buffered data. 94 /// 95 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. buffer(&self) -> &[u8]96 pub fn buffer(&self) -> &[u8] { 97 &self.buf[self.pos..self.cap] 98 } 99 100 /// Invalidates all data in the internal buffer. 101 #[inline] discard_buffer(self: Pin<&mut Self>)102 fn discard_buffer(self: Pin<&mut Self>) { 103 let me = self.project(); 104 *me.pos = 0; 105 *me.cap = 0; 106 } 107 } 108 109 impl<R: AsyncRead> AsyncRead for BufReader<R> { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>110 fn poll_read( 111 mut self: Pin<&mut Self>, 112 cx: &mut Context<'_>, 113 buf: &mut [u8], 114 ) -> Poll<io::Result<usize>> { 115 // If we don't have any buffered data and we're doing a massive read 116 // (larger than our internal buffer), bypass our internal buffer 117 // entirely. 118 if self.pos == self.cap && buf.len() >= self.buf.len() { 119 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); 120 self.discard_buffer(); 121 return Poll::Ready(res); 122 } 123 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 124 let nread = rem.read(buf)?; 125 self.consume(nread); 126 Poll::Ready(Ok(nread)) 127 } 128 129 // we can't skip unconditionally because of the large buffer case in read. prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool130 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { 131 self.inner.prepare_uninitialized_buffer(buf) 132 } 133 } 134 135 impl<R: AsyncRead> AsyncBufRead for BufReader<R> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>136 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 137 let me = self.project(); 138 139 // If we've reached the end of our internal buffer then we need to fetch 140 // some more data from the underlying reader. 141 // Branch using `>=` instead of the more correct `==` 142 // to tell the compiler that the pos..cap slice is always valid. 143 if *me.pos >= *me.cap { 144 debug_assert!(*me.pos == *me.cap); 145 *me.cap = ready!(me.inner.poll_read(cx, me.buf))?; 146 *me.pos = 0; 147 } 148 Poll::Ready(Ok(&me.buf[*me.pos..*me.cap])) 149 } 150 consume(self: Pin<&mut Self>, amt: usize)151 fn consume(self: Pin<&mut Self>, amt: usize) { 152 let me = self.project(); 153 *me.pos = cmp::min(*me.pos + amt, *me.cap); 154 } 155 } 156 157 impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>158 fn poll_write( 159 self: Pin<&mut Self>, 160 cx: &mut Context<'_>, 161 buf: &[u8], 162 ) -> Poll<io::Result<usize>> { 163 self.get_pin_mut().poll_write(cx, buf) 164 } 165 poll_write_buf<B: Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>166 fn poll_write_buf<B: Buf>( 167 self: Pin<&mut Self>, 168 cx: &mut Context<'_>, 169 buf: &mut B, 170 ) -> Poll<io::Result<usize>> { 171 self.get_pin_mut().poll_write_buf(cx, buf) 172 } 173 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>174 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 175 self.get_pin_mut().poll_flush(cx) 176 } 177 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>178 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 179 self.get_pin_mut().poll_shutdown(cx) 180 } 181 } 182 183 impl<R: fmt::Debug> fmt::Debug for BufReader<R> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 185 f.debug_struct("BufReader") 186 .field("reader", &self.inner) 187 .field( 188 "buffer", 189 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), 190 ) 191 .finish() 192 } 193 } 194 195 #[cfg(test)] 196 mod tests { 197 use super::*; 198 199 #[test] assert_unpin()200 fn assert_unpin() { 201 crate::is_unpin::<BufReader<()>>(); 202 } 203 } 204