1 use super::DEFAULT_BUF_SIZE; 2 use futures_core::ready; 3 use futures_core::task::{Context, Poll}; 4 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; 5 use pin_project_lite::pin_project; 6 use std::fmt; 7 use std::io::{self, Write}; 8 use std::pin::Pin; 9 use std::ptr; 10 11 pin_project! { 12 /// Wraps a writer and buffers its output. 13 /// 14 /// It can be excessively inefficient to work directly with something that 15 /// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and 16 /// writes it to an underlying writer in large, infrequent batches. 17 /// 18 /// `BufWriter` can improve the speed of programs that make *small* and 19 /// *repeated* write calls to the same file or network socket. It does not 20 /// help when writing very large amounts at once, or writing just one or a few 21 /// times. It also provides no advantage when writing to a destination that is 22 /// in memory, like a `Vec<u8>`. 23 /// 24 /// When the `BufWriter` is dropped, the contents of its buffer will be 25 /// discarded. Creating multiple instances of a `BufWriter` on the same 26 /// stream can cause data loss. If you need to write out the contents of its 27 /// buffer, you must manually call flush before the writer is dropped. 28 /// 29 /// [`AsyncWrite`]: futures_io::AsyncWrite 30 /// [`flush`]: super::AsyncWriteExt::flush 31 /// 32 // TODO: Examples 33 pub struct BufWriter<W> { 34 #[pin] 35 inner: W, 36 buf: Vec<u8>, 37 written: usize, 38 } 39 } 40 41 impl<W: AsyncWrite> BufWriter<W> { 42 /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, 43 /// but may change in the future. new(inner: W) -> Self44 pub fn new(inner: W) -> Self { 45 Self::with_capacity(DEFAULT_BUF_SIZE, inner) 46 } 47 48 /// Creates a new `BufWriter` with the specified buffer capacity. with_capacity(cap: usize, inner: W) -> Self49 pub fn with_capacity(cap: usize, inner: W) -> Self { 50 Self { inner, buf: Vec::with_capacity(cap), written: 0 } 51 } 52 flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>53 pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 54 let mut this = self.project(); 55 56 let len = this.buf.len(); 57 let mut ret = Ok(()); 58 while *this.written < len { 59 match ready!(this.inner.as_mut().poll_write(cx, &this.buf[*this.written..])) { 60 Ok(0) => { 61 ret = Err(io::Error::new( 62 io::ErrorKind::WriteZero, 63 "failed to write the buffered data", 64 )); 65 break; 66 } 67 Ok(n) => *this.written += n, 68 Err(e) => { 69 ret = Err(e); 70 break; 71 } 72 } 73 } 74 if *this.written > 0 { 75 this.buf.drain(..*this.written); 76 } 77 *this.written = 0; 78 Poll::Ready(ret) 79 } 80 81 delegate_access_inner!(inner, W, ()); 82 83 /// Returns a reference to the internally buffered data. buffer(&self) -> &[u8]84 pub fn buffer(&self) -> &[u8] { 85 &self.buf 86 } 87 88 /// Capacity of `buf`. how many chars can be held in buffer capacity(&self) -> usize89 pub(super) fn capacity(&self) -> usize { 90 self.buf.capacity() 91 } 92 93 /// Remaining number of bytes to reach `buf` 's capacity 94 #[inline] spare_capacity(&self) -> usize95 pub(super) fn spare_capacity(&self) -> usize { 96 self.buf.capacity() - self.buf.len() 97 } 98 99 /// Write a byte slice directly into buffer 100 /// 101 /// Will truncate the number of bytes written to `spare_capacity()` so you want to 102 /// calculate the size of your slice to avoid losing bytes 103 /// 104 /// Based on `std::io::BufWriter` write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize105 pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize { 106 let available = self.spare_capacity(); 107 let amt_to_buffer = available.min(buf.len()); 108 109 // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction. 110 unsafe { 111 self.write_to_buffer_unchecked(&buf[..amt_to_buffer]); 112 } 113 114 amt_to_buffer 115 } 116 117 /// Write byte slice directly into `self.buf` 118 /// 119 /// Based on `std::io::BufWriter` 120 #[inline] write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8])121 unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) { 122 debug_assert!(buf.len() <= self.spare_capacity()); 123 let this = self.project(); 124 let old_len = this.buf.len(); 125 let buf_len = buf.len(); 126 let src = buf.as_ptr(); 127 let dst = this.buf.as_mut_ptr().add(old_len); 128 ptr::copy_nonoverlapping(src, dst, buf_len); 129 this.buf.set_len(old_len + buf_len); 130 } 131 132 /// Write directly using `inner`, bypassing buffering inner_poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>133 pub(super) fn inner_poll_write( 134 self: Pin<&mut Self>, 135 cx: &mut Context<'_>, 136 buf: &[u8], 137 ) -> Poll<io::Result<usize>> { 138 self.project().inner.poll_write(cx, buf) 139 } 140 141 /// Write directly using `inner`, bypassing buffering inner_poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>142 pub(super) fn inner_poll_write_vectored( 143 self: Pin<&mut Self>, 144 cx: &mut Context<'_>, 145 bufs: &[IoSlice<'_>], 146 ) -> Poll<io::Result<usize>> { 147 self.project().inner.poll_write_vectored(cx, bufs) 148 } 149 } 150 151 impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>152 fn poll_write( 153 mut self: Pin<&mut Self>, 154 cx: &mut Context<'_>, 155 buf: &[u8], 156 ) -> Poll<io::Result<usize>> { 157 if self.buf.len() + buf.len() > self.buf.capacity() { 158 ready!(self.as_mut().flush_buf(cx))?; 159 } 160 if buf.len() >= self.buf.capacity() { 161 self.project().inner.poll_write(cx, buf) 162 } else { 163 Poll::Ready(self.project().buf.write(buf)) 164 } 165 } 166 poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>167 fn poll_write_vectored( 168 mut self: Pin<&mut Self>, 169 cx: &mut Context<'_>, 170 bufs: &[IoSlice<'_>], 171 ) -> Poll<io::Result<usize>> { 172 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); 173 if self.buf.len() + total_len > self.buf.capacity() { 174 ready!(self.as_mut().flush_buf(cx))?; 175 } 176 if total_len >= self.buf.capacity() { 177 self.project().inner.poll_write_vectored(cx, bufs) 178 } else { 179 Poll::Ready(self.project().buf.write_vectored(bufs)) 180 } 181 } 182 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>183 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 184 ready!(self.as_mut().flush_buf(cx))?; 185 self.project().inner.poll_flush(cx) 186 } 187 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>188 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 189 ready!(self.as_mut().flush_buf(cx))?; 190 self.project().inner.poll_close(cx) 191 } 192 } 193 194 impl<W: AsyncRead> AsyncRead for BufWriter<W> { 195 delegate_async_read!(inner); 196 } 197 198 impl<W: AsyncBufRead> AsyncBufRead for BufWriter<W> { 199 delegate_async_buf_read!(inner); 200 } 201 202 impl<W: fmt::Debug> fmt::Debug for BufWriter<W> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 204 f.debug_struct("BufWriter") 205 .field("writer", &self.inner) 206 .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) 207 .field("written", &self.written) 208 .finish() 209 } 210 } 211 212 impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> { 213 /// Seek to the offset, in bytes, in the underlying writer. 214 /// 215 /// Seeking always writes out the internal buffer before seeking. poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<io::Result<u64>>216 fn poll_seek( 217 mut self: Pin<&mut Self>, 218 cx: &mut Context<'_>, 219 pos: SeekFrom, 220 ) -> Poll<io::Result<u64>> { 221 ready!(self.as_mut().flush_buf(cx))?; 222 self.project().inner.poll_seek(cx, pos) 223 } 224 } 225