1 use crate::io::util::{BufReader, BufWriter};
2 use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
3 
4 use pin_project_lite::pin_project;
5 use std::io::{self, IoSlice, SeekFrom};
6 use std::pin::Pin;
7 use std::task::{Context, Poll};
8 
9 pin_project! {
10     /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
11     ///
12     /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
13     /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
14     /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
15     /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
16     /// one in the other so that both directions are buffered. See their documentation for details.
17     #[derive(Debug)]
18     #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
19     pub struct BufStream<RW> {
20         #[pin]
21         inner: BufReader<BufWriter<RW>>,
22     }
23 }
24 
25 impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
26     /// Wraps a type in both [`BufWriter`] and [`BufReader`].
27     ///
28     /// See the documentation for those types and [`BufStream`] for details.
new(stream: RW) -> BufStream<RW>29     pub fn new(stream: RW) -> BufStream<RW> {
30         BufStream {
31             inner: BufReader::new(BufWriter::new(stream)),
32         }
33     }
34 
35     /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`]
36     /// capacity.
37     ///
38     /// See the documentation for those types and [`BufStream`] for details.
with_capacity( reader_capacity: usize, writer_capacity: usize, stream: RW, ) -> BufStream<RW>39     pub fn with_capacity(
40         reader_capacity: usize,
41         writer_capacity: usize,
42         stream: RW,
43     ) -> BufStream<RW> {
44         BufStream {
45             inner: BufReader::with_capacity(
46                 reader_capacity,
47                 BufWriter::with_capacity(writer_capacity, stream),
48             ),
49         }
50     }
51 
52     /// Gets a reference to the underlying I/O object.
53     ///
54     /// It is inadvisable to directly read from the underlying I/O object.
get_ref(&self) -> &RW55     pub fn get_ref(&self) -> &RW {
56         self.inner.get_ref().get_ref()
57     }
58 
59     /// Gets a mutable reference to the underlying I/O object.
60     ///
61     /// It is inadvisable to directly read from the underlying I/O object.
get_mut(&mut self) -> &mut RW62     pub fn get_mut(&mut self) -> &mut RW {
63         self.inner.get_mut().get_mut()
64     }
65 
66     /// Gets a pinned mutable reference to the underlying I/O object.
67     ///
68     /// It is inadvisable to directly read from the underlying I/O object.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW>69     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
70         self.project().inner.get_pin_mut().get_pin_mut()
71     }
72 
73     /// Consumes this `BufStream`, returning the underlying I/O object.
74     ///
75     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> RW76     pub fn into_inner(self) -> RW {
77         self.inner.into_inner().into_inner()
78     }
79 }
80 
81 impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
from(b: BufReader<BufWriter<RW>>) -> Self82     fn from(b: BufReader<BufWriter<RW>>) -> Self {
83         BufStream { inner: b }
84     }
85 }
86 
87 impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
from(b: BufWriter<BufReader<RW>>) -> Self88     fn from(b: BufWriter<BufReader<RW>>) -> Self {
89         // we need to "invert" the reader and writer
90         let BufWriter {
91             inner:
92                 BufReader {
93                     inner,
94                     buf: rbuf,
95                     pos,
96                     cap,
97                     seek_state: rseek_state,
98                 },
99             buf: wbuf,
100             written,
101             seek_state: wseek_state,
102         } = b;
103 
104         BufStream {
105             inner: BufReader {
106                 inner: BufWriter {
107                     inner,
108                     buf: wbuf,
109                     written,
110                     seek_state: wseek_state,
111                 },
112                 buf: rbuf,
113                 pos,
114                 cap,
115                 seek_state: rseek_state,
116             },
117         }
118     }
119 }
120 
121 impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>122     fn poll_write(
123         self: Pin<&mut Self>,
124         cx: &mut Context<'_>,
125         buf: &[u8],
126     ) -> Poll<io::Result<usize>> {
127         self.project().inner.poll_write(cx, buf)
128     }
129 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>130     fn poll_write_vectored(
131         self: Pin<&mut Self>,
132         cx: &mut Context<'_>,
133         bufs: &[IoSlice<'_>],
134     ) -> Poll<io::Result<usize>> {
135         self.project().inner.poll_write_vectored(cx, bufs)
136     }
137 
is_write_vectored(&self) -> bool138     fn is_write_vectored(&self) -> bool {
139         self.inner.is_write_vectored()
140     }
141 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>142     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143         self.project().inner.poll_flush(cx)
144     }
145 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>146     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
147         self.project().inner.poll_shutdown(cx)
148     }
149 }
150 
151 impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>152     fn poll_read(
153         self: Pin<&mut Self>,
154         cx: &mut Context<'_>,
155         buf: &mut ReadBuf<'_>,
156     ) -> Poll<io::Result<()>> {
157         self.project().inner.poll_read(cx, buf)
158     }
159 }
160 
161 /// Seek to an offset, in bytes, in the underlying stream.
162 ///
163 /// The position used for seeking with `SeekFrom::Current(_)` is the
164 /// position the underlying stream would be at if the `BufStream` had no
165 /// internal buffer.
166 ///
167 /// Seeking always discards the internal buffer, even if the seek position
168 /// would otherwise fall within it. This guarantees that calling
169 /// `.into_inner()` immediately after a seek yields the underlying reader
170 /// at the same position.
171 ///
172 /// See [`AsyncSeek`] for more details.
173 ///
174 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
175 /// where `n` minus the internal buffer length overflows an `i64`, two
176 /// seeks will be performed instead of one. If the second seek returns
177 /// `Err`, the underlying reader will be left at the same position it would
178 /// have if you called `seek` with `SeekFrom::Current(0)`.
179 impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> {
start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>180     fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
181         self.project().inner.start_seek(position)
182     }
183 
poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>184     fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
185         self.project().inner.poll_complete(cx)
186     }
187 }
188 
189 impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>190     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
191         self.project().inner.poll_fill_buf(cx)
192     }
193 
consume(self: Pin<&mut Self>, amt: usize)194     fn consume(self: Pin<&mut Self>, amt: usize) {
195         self.project().inner.consume(amt)
196     }
197 }
198 
#[cfg(test)]
mod tests {
    use super::*;

    // Checks `BufStream` via the crate's `is_unpin` helper, using `()` as the
    // wrapped type.
    #[test]
    fn assert_unpin() {
        type UnitStream = BufStream<()>;
        crate::is_unpin::<UnitStream>();
    }
}
208