1 use crate::io::util::{BufReader, BufWriter}; 2 use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; 3 4 use pin_project_lite::pin_project; 5 use std::io::{self, IoSlice, SeekFrom}; 6 use std::pin::Pin; 7 use std::task::{Context, Poll}; 8 9 pin_project! { 10 /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. 11 /// 12 /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] 13 /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall 14 /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] 15 /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps 16 /// one in the other so that both directions are buffered. See their documentation for details. 17 #[derive(Debug)] 18 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] 19 pub struct BufStream<RW> { 20 #[pin] 21 inner: BufReader<BufWriter<RW>>, 22 } 23 } 24 25 impl<RW: AsyncRead + AsyncWrite> BufStream<RW> { 26 /// Wraps a type in both [`BufWriter`] and [`BufReader`]. 27 /// 28 /// See the documentation for those types and [`BufStream`] for details. new(stream: RW) -> BufStream<RW>29 pub fn new(stream: RW) -> BufStream<RW> { 30 BufStream { 31 inner: BufReader::new(BufWriter::new(stream)), 32 } 33 } 34 35 /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`] 36 /// capacity. 37 /// 38 /// See the documentation for those types and [`BufStream`] for details. with_capacity( reader_capacity: usize, writer_capacity: usize, stream: RW, ) -> BufStream<RW>39 pub fn with_capacity( 40 reader_capacity: usize, 41 writer_capacity: usize, 42 stream: RW, 43 ) -> BufStream<RW> { 44 BufStream { 45 inner: BufReader::with_capacity( 46 reader_capacity, 47 BufWriter::with_capacity(writer_capacity, stream), 48 ), 49 } 50 } 51 52 /// Gets a reference to the underlying I/O object. 53 /// 54 /// It is inadvisable to directly read from the underlying I/O object. get_ref(&self) -> &RW55 pub fn get_ref(&self) -> &RW { 56 self.inner.get_ref().get_ref() 57 } 58 59 /// Gets a mutable reference to the underlying I/O object. 60 /// 61 /// It is inadvisable to directly read from the underlying I/O object. get_mut(&mut self) -> &mut RW62 pub fn get_mut(&mut self) -> &mut RW { 63 self.inner.get_mut().get_mut() 64 } 65 66 /// Gets a pinned mutable reference to the underlying I/O object. 67 /// 68 /// It is inadvisable to directly read from the underlying I/O object. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW>69 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { 70 self.project().inner.get_pin_mut().get_pin_mut() 71 } 72 73 /// Consumes this `BufStream`, returning the underlying I/O object. 74 /// 75 /// Note that any leftover data in the internal buffer is lost. into_inner(self) -> RW76 pub fn into_inner(self) -> RW { 77 self.inner.into_inner().into_inner() 78 } 79 } 80 81 impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> { from(b: BufReader<BufWriter<RW>>) -> Self82 fn from(b: BufReader<BufWriter<RW>>) -> Self { 83 BufStream { inner: b } 84 } 85 } 86 87 impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { from(b: BufWriter<BufReader<RW>>) -> Self88 fn from(b: BufWriter<BufReader<RW>>) -> Self { 89 // we need to "invert" the reader and writer 90 let BufWriter { 91 inner: 92 BufReader { 93 inner, 94 buf: rbuf, 95 pos, 96 cap, 97 seek_state: rseek_state, 98 }, 99 buf: wbuf, 100 written, 101 seek_state: wseek_state, 102 } = b; 103 104 BufStream { 105 inner: BufReader { 106 inner: BufWriter { 107 inner, 108 buf: wbuf, 109 written, 110 seek_state: wseek_state, 111 }, 112 buf: rbuf, 113 pos, 114 cap, 115 seek_state: rseek_state, 116 }, 117 } 118 } 119 } 120 121 impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>122 fn poll_write( 123 self: Pin<&mut Self>, 124 cx: &mut Context<'_>, 125 buf: &[u8], 126 ) -> Poll<io::Result<usize>> { 127 self.project().inner.poll_write(cx, buf) 128 } 129 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>130 fn poll_write_vectored( 131 self: Pin<&mut Self>, 132 cx: &mut Context<'_>, 133 bufs: &[IoSlice<'_>], 134 ) -> Poll<io::Result<usize>> { 135 self.project().inner.poll_write_vectored(cx, bufs) 136 } 137 is_write_vectored(&self) -> bool138 fn is_write_vectored(&self) -> bool { 139 self.inner.is_write_vectored() 140 } 141 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>142 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 143 self.project().inner.poll_flush(cx) 144 } 145 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>146 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 147 self.project().inner.poll_shutdown(cx) 148 } 149 } 150 151 impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>152 fn poll_read( 153 self: Pin<&mut Self>, 154 cx: &mut Context<'_>, 155 buf: &mut ReadBuf<'_>, 156 ) -> Poll<io::Result<()>> { 157 self.project().inner.poll_read(cx, buf) 158 } 159 } 160 161 /// Seek to an offset, in bytes, in the underlying stream. 162 /// 163 /// The position used for seeking with `SeekFrom::Current(_)` is the 164 /// position the underlying stream would be at if the `BufStream` had no 165 /// internal buffer. 166 /// 167 /// Seeking always discards the internal buffer, even if the seek position 168 /// would otherwise fall within it. This guarantees that calling 169 /// `.into_inner()` immediately after a seek yields the underlying reader 170 /// at the same position. 171 /// 172 /// See [`AsyncSeek`] for more details. 173 /// 174 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` 175 /// where `n` minus the internal buffer length overflows an `i64`, two 176 /// seeks will be performed instead of one. If the second seek returns 177 /// `Err`, the underlying reader will be left at the same position it would 178 /// have if you called `seek` with `SeekFrom::Current(0)`. 179 impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> { start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>180 fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { 181 self.project().inner.start_seek(position) 182 } 183 poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>184 fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { 185 self.project().inner.poll_complete(cx) 186 } 187 } 188 189 impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>190 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 191 self.project().inner.poll_fill_buf(cx) 192 } 193 consume(self: Pin<&mut Self>, amt: usize)194 fn consume(self: Pin<&mut Self>, amt: usize) { 195 self.project().inner.consume(amt) 196 } 197 } 198 199 #[cfg(test)] 200 mod tests { 201 use super::*; 202 203 #[test] assert_unpin()204 fn assert_unpin() { 205 crate::is_unpin::<BufStream<()>>(); 206 } 207 } 208