1 use crate::io::util::{BufReader, BufWriter}; 2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; 3 4 use pin_project_lite::pin_project; 5 use std::io; 6 use std::mem::MaybeUninit; 7 use std::pin::Pin; 8 use std::task::{Context, Poll}; 9 10 pin_project! { 11 /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. 12 /// 13 /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] 14 /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall 15 /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] 16 /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps 17 /// one in the other so that both directions are buffered. See their documentation for details. 18 #[derive(Debug)] 19 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] 20 pub struct BufStream<RW> { 21 #[pin] 22 inner: BufReader<BufWriter<RW>>, 23 } 24 } 25 26 impl<RW: AsyncRead + AsyncWrite> BufStream<RW> { 27 /// Wraps a type in both [`BufWriter`] and [`BufReader`]. 28 /// 29 /// See the documentation for those types and [`BufStream`] for details. new(stream: RW) -> BufStream<RW>30 pub fn new(stream: RW) -> BufStream<RW> { 31 BufStream { 32 inner: BufReader::new(BufWriter::new(stream)), 33 } 34 } 35 36 /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`] 37 /// capacity. 38 /// 39 /// See the documentation for those types and [`BufStream`] for details. with_capacity( reader_capacity: usize, writer_capacity: usize, stream: RW, ) -> BufStream<RW>40 pub fn with_capacity( 41 reader_capacity: usize, 42 writer_capacity: usize, 43 stream: RW, 44 ) -> BufStream<RW> { 45 BufStream { 46 inner: BufReader::with_capacity( 47 reader_capacity, 48 BufWriter::with_capacity(writer_capacity, stream), 49 ), 50 } 51 } 52 53 /// Gets a reference to the underlying I/O object. 54 /// 55 /// It is inadvisable to directly read from the underlying I/O object. get_ref(&self) -> &RW56 pub fn get_ref(&self) -> &RW { 57 self.inner.get_ref().get_ref() 58 } 59 60 /// Gets a mutable reference to the underlying I/O object. 61 /// 62 /// It is inadvisable to directly read from the underlying I/O object. get_mut(&mut self) -> &mut RW63 pub fn get_mut(&mut self) -> &mut RW { 64 self.inner.get_mut().get_mut() 65 } 66 67 /// Gets a pinned mutable reference to the underlying I/O object. 68 /// 69 /// It is inadvisable to directly read from the underlying I/O object. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW>70 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { 71 self.project().inner.get_pin_mut().get_pin_mut() 72 } 73 74 /// Consumes this `BufStream`, returning the underlying I/O object. 75 /// 76 /// Note that any leftover data in the internal buffer is lost. into_inner(self) -> RW77 pub fn into_inner(self) -> RW { 78 self.inner.into_inner().into_inner() 79 } 80 } 81 82 impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> { from(b: BufReader<BufWriter<RW>>) -> Self83 fn from(b: BufReader<BufWriter<RW>>) -> Self { 84 BufStream { inner: b } 85 } 86 } 87 88 impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { from(b: BufWriter<BufReader<RW>>) -> Self89 fn from(b: BufWriter<BufReader<RW>>) -> Self { 90 // we need to "invert" the reader and writer 91 let BufWriter { 92 inner: 93 BufReader { 94 inner, 95 buf: rbuf, 96 pos, 97 cap, 98 }, 99 buf: wbuf, 100 written, 101 } = b; 102 103 BufStream { 104 inner: BufReader { 105 inner: BufWriter { 106 inner, 107 buf: wbuf, 108 written, 109 }, 110 buf: rbuf, 111 pos, 112 cap, 113 }, 114 } 115 } 116 } 117 118 impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>119 fn poll_write( 120 self: Pin<&mut Self>, 121 cx: &mut Context<'_>, 122 buf: &[u8], 123 ) -> Poll<io::Result<usize>> { 124 self.project().inner.poll_write(cx, buf) 125 } 126 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>127 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 128 self.project().inner.poll_flush(cx) 129 } 130 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>131 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 132 self.project().inner.poll_shutdown(cx) 133 } 134 } 135 136 impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>137 fn poll_read( 138 self: Pin<&mut Self>, 139 cx: &mut Context<'_>, 140 buf: &mut [u8], 141 ) -> Poll<io::Result<usize>> { 142 self.project().inner.poll_read(cx, buf) 143 } 144 145 // we can't skip unconditionally because of the large buffer case in read. prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool146 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool { 147 self.inner.prepare_uninitialized_buffer(buf) 148 } 149 } 150 151 impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> { poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>152 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 153 self.project().inner.poll_fill_buf(cx) 154 } 155 consume(self: Pin<&mut Self>, amt: usize)156 fn consume(self: Pin<&mut Self>, amt: usize) { 157 self.project().inner.consume(amt) 158 } 159 } 160 161 #[cfg(test)] 162 mod tests { 163 use super::*; 164 165 #[test] assert_unpin()166 fn assert_unpin() { 167 crate::is_unpin::<BufStream<()>>(); 168 } 169 } 170