1 use super::DEFAULT_BUF_SIZE; 2 use futures_core::future::Future; 3 use futures_core::ready; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "read-initializer")] 6 use futures_io::Initializer; 7 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; 8 use pin_project_lite::pin_project; 9 use std::io::{self, Read}; 10 use std::pin::Pin; 11 use std::{cmp, fmt}; 12 13 pin_project! { 14 /// The `BufReader` struct adds buffering to any reader. 15 /// 16 /// It can be excessively inefficient to work directly with a [`AsyncRead`] 17 /// instance. A `BufReader` performs large, infrequent reads on the underlying 18 /// [`AsyncRead`] and maintains an in-memory buffer of the results. 19 /// 20 /// `BufReader` can improve the speed of programs that make *small* and 21 /// *repeated* read calls to the same file or network socket. It does not 22 /// help when reading very large amounts at once, or reading just one or a few 23 /// times. It also provides no advantage when reading from a source that is 24 /// already in memory, like a `Vec<u8>`. 25 /// 26 /// When the `BufReader` is dropped, the contents of its buffer will be 27 /// discarded. Creating multiple instances of a `BufReader` on the same 28 /// stream can cause data loss. 29 /// 30 /// [`AsyncRead`]: futures_io::AsyncRead 31 /// 32 // TODO: Examples 33 pub struct BufReader<R> { 34 #[pin] 35 inner: R, 36 buffer: Box<[u8]>, 37 pos: usize, 38 cap: usize, 39 } 40 } 41 42 impl<R: AsyncRead> BufReader<R> { 43 /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, 44 /// but may change in the future. new(inner: R) -> Self45 pub fn new(inner: R) -> Self { 46 Self::with_capacity(DEFAULT_BUF_SIZE, inner) 47 } 48 49 /// Creates a new `BufReader` with the specified buffer capacity. with_capacity(capacity: usize, inner: R) -> Self50 pub fn with_capacity(capacity: usize, inner: R) -> Self { 51 unsafe { 52 let mut buffer = Vec::with_capacity(capacity); 53 buffer.set_len(capacity); 54 super::initialize(&inner, &mut buffer); 55 Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 } 56 } 57 } 58 59 delegate_access_inner!(inner, R, ()); 60 61 /// Returns a reference to the internally buffered data. 62 /// 63 /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. buffer(&self) -> &[u8]64 pub fn buffer(&self) -> &[u8] { 65 &self.buffer[self.pos..self.cap] 66 } 67 68 /// Invalidates all data in the internal buffer. 69 #[inline] discard_buffer(self: Pin<&mut Self>)70 fn discard_buffer(self: Pin<&mut Self>) { 71 let this = self.project(); 72 *this.pos = 0; 73 *this.cap = 0; 74 } 75 } 76 77 impl<R: AsyncRead + AsyncSeek> BufReader<R> { 78 /// Seeks relative to the current position. If the new position lies within the buffer, 79 /// the buffer will not be flushed, allowing for more efficient seeks. 80 /// This method does not return the location of the underlying reader, so the caller 81 /// must track this information themselves if it is required. seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R>82 pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> { 83 SeeKRelative { inner: self, offset, first: true } 84 } 85 86 /// Attempts to seek relative to the current position. If the new position lies within the buffer, 87 /// the buffer will not be flushed, allowing for more efficient seeks. 88 /// This method does not return the location of the underlying reader, so the caller 89 /// must track this information themselves if it is required. poll_seek_relative( self: Pin<&mut Self>, cx: &mut Context<'_>, offset: i64, ) -> Poll<io::Result<()>>90 pub fn poll_seek_relative( 91 self: Pin<&mut Self>, 92 cx: &mut Context<'_>, 93 offset: i64, 94 ) -> Poll<io::Result<()>> { 95 let pos = self.pos as u64; 96 if offset < 0 { 97 if let Some(new_pos) = pos.checked_sub((-offset) as u64) { 98 *self.project().pos = new_pos as usize; 99 return Poll::Ready(Ok(())); 100 } 101 } else if let Some(new_pos) = pos.checked_add(offset as u64) { 102 if new_pos <= self.cap as u64 { 103 *self.project().pos = new_pos as usize; 104 return Poll::Ready(Ok(())); 105 } 106 } 107 self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ())) 108 } 109 } 110 111 impl<R: AsyncRead> AsyncRead for BufReader<R> { poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>112 fn poll_read( 113 mut self: Pin<&mut Self>, 114 cx: &mut Context<'_>, 115 buf: &mut [u8], 116 ) -> Poll<io::Result<usize>> { 117 // If we don't have any buffered data and we're doing a massive read 118 // (larger than our internal buffer), bypass our internal buffer 119 // entirely. 120 if self.pos == self.cap && buf.len() >= self.buffer.len() { 121 let res = ready!(self.as_mut().project().inner.poll_read(cx, buf)); 122 self.discard_buffer(); 123 return Poll::Ready(res); 124 } 125 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 126 let nread = rem.read(buf)?; 127 self.consume(nread); 128 Poll::Ready(Ok(nread)) 129 } 130 poll_read_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>>131 fn poll_read_vectored( 132 mut self: Pin<&mut Self>, 133 cx: &mut Context<'_>, 134 bufs: &mut [IoSliceMut<'_>], 135 ) -> Poll<io::Result<usize>> { 136 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); 137 if self.pos == self.cap && total_len >= self.buffer.len() { 138 let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs)); 139 self.discard_buffer(); 140 return Poll::Ready(res); 141 } 142 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; 143 let nread = rem.read_vectored(bufs)?; 144 self.consume(nread); 145 Poll::Ready(Ok(nread)) 146 } 147 148 // we can't skip unconditionally because of the large buffer case in read. 149 #[cfg(feature = "read-initializer")] initializer(&self) -> Initializer150 unsafe fn initializer(&self) -> Initializer { 151 self.inner.initializer() 152 } 153 } 154 155 impl<R: AsyncRead> AsyncBufRead for BufReader<R> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>156 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 157 let this = self.project(); 158 159 // If we've reached the end of our internal buffer then we need to fetch 160 // some more data from the underlying reader. 161 // Branch using `>=` instead of the more correct `==` 162 // to tell the compiler that the pos..cap slice is always valid. 163 if *this.pos >= *this.cap { 164 debug_assert!(*this.pos == *this.cap); 165 *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?; 166 *this.pos = 0; 167 } 168 Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap])) 169 } 170 consume(self: Pin<&mut Self>, amt: usize)171 fn consume(self: Pin<&mut Self>, amt: usize) { 172 *self.project().pos = cmp::min(self.pos + amt, self.cap); 173 } 174 } 175 176 impl<R: AsyncWrite> AsyncWrite for BufReader<R> { 177 delegate_async_write!(inner); 178 } 179 180 impl<R: fmt::Debug> fmt::Debug for BufReader<R> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 182 f.debug_struct("BufReader") 183 .field("reader", &self.inner) 184 .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len())) 185 .finish() 186 } 187 } 188 189 impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { 190 /// Seek to an offset, in bytes, in the underlying reader. 191 /// 192 /// The position used for seeking with `SeekFrom::Current(_)` is the 193 /// position the underlying reader would be at if the `BufReader` had no 194 /// internal buffer. 195 /// 196 /// Seeking always discards the internal buffer, even if the seek position 197 /// would otherwise fall within it. This guarantees that calling 198 /// `.into_inner()` immediately after a seek yields the underlying reader 199 /// at the same position. 200 /// 201 /// To seek without discarding the internal buffer, use 202 /// [`BufReader::seek_relative`](BufReader::seek_relative) or 203 /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). 204 /// 205 /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. 206 /// 207 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` 208 /// where `n` minus the internal buffer length overflows an `i64`, two 209 /// seeks will be performed instead of one. If the second seek returns 210 /// `Err`, the underlying reader will be left at the same position it would 211 /// have if you called `seek` with `SeekFrom::Current(0)`. poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>212 fn poll_seek( 213 mut self: Pin<&mut Self>, 214 cx: &mut Context<'_>, 215 pos: SeekFrom, 216 ) -> Poll<io::Result<u64>> { 217 let result: u64; 218 if let SeekFrom::Current(n) = pos { 219 let remainder = (self.cap - self.pos) as i64; 220 // it should be safe to assume that remainder fits within an i64 as the alternative 221 // means we managed to allocate 8 exbibytes and that's absurd. 222 // But it's not out of the realm of possibility for some weird underlying reader to 223 // support seeking by i64::min_value() so we need to handle underflow when subtracting 224 // remainder. 225 if let Some(offset) = n.checked_sub(remainder) { 226 result = 227 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?; 228 } else { 229 // seek backwards by our remainder, and then by the offset 230 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?; 231 self.as_mut().discard_buffer(); 232 result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?; 233 } 234 } else { 235 // Seeking with Start/End doesn't care about our buffer length. 236 result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?; 237 } 238 self.discard_buffer(); 239 Poll::Ready(Ok(result)) 240 } 241 } 242 243 /// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method. 244 #[derive(Debug)] 245 #[must_use = "futures do nothing unless polled"] 246 pub struct SeeKRelative<'a, R> { 247 inner: Pin<&'a mut BufReader<R>>, 248 offset: i64, 249 first: bool, 250 } 251 252 impl<R> Future for SeeKRelative<'_, R> 253 where 254 R: AsyncRead + AsyncSeek, 255 { 256 type Output = io::Result<()>; 257 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>258 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 259 let offset = self.offset; 260 if self.first { 261 self.first = false; 262 self.inner.as_mut().poll_seek_relative(cx, offset) 263 } else { 264 self.inner 265 .as_mut() 266 .as_mut() 267 .poll_seek(cx, SeekFrom::Current(offset)) 268 .map(|res| res.map(|_| ())) 269 } 270 } 271 } 272