1 use crate::io::util::{BufReader, BufWriter};
2 use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
3 
4 use pin_project_lite::pin_project;
5 use std::io;
6 use std::mem::MaybeUninit;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 
pin_project! {
    /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
    ///
    /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
    /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
    /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
    /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
    /// one in the other so that both directions are buffered. See their documentation for details.
    #[derive(Debug)]
    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
    pub struct BufStream<RW> {
        // The only field: a `BufReader` layered over a `BufWriter` around the raw
        // I/O object. It is marked `#[pin]` so that methods taking
        // `Pin<&mut Self>` can reach it through `self.project().inner`.
        #[pin]
        inner: BufReader<BufWriter<RW>>,
    }
}
25 
26 impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
27     /// Wraps a type in both [`BufWriter`] and [`BufReader`].
28     ///
29     /// See the documentation for those types and [`BufStream`] for details.
new(stream: RW) -> BufStream<RW>30     pub fn new(stream: RW) -> BufStream<RW> {
31         BufStream {
32             inner: BufReader::new(BufWriter::new(stream)),
33         }
34     }
35 
36     /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`]
37     /// capacity.
38     ///
39     /// See the documentation for those types and [`BufStream`] for details.
with_capacity( reader_capacity: usize, writer_capacity: usize, stream: RW, ) -> BufStream<RW>40     pub fn with_capacity(
41         reader_capacity: usize,
42         writer_capacity: usize,
43         stream: RW,
44     ) -> BufStream<RW> {
45         BufStream {
46             inner: BufReader::with_capacity(
47                 reader_capacity,
48                 BufWriter::with_capacity(writer_capacity, stream),
49             ),
50         }
51     }
52 
53     /// Gets a reference to the underlying I/O object.
54     ///
55     /// It is inadvisable to directly read from the underlying I/O object.
get_ref(&self) -> &RW56     pub fn get_ref(&self) -> &RW {
57         self.inner.get_ref().get_ref()
58     }
59 
60     /// Gets a mutable reference to the underlying I/O object.
61     ///
62     /// It is inadvisable to directly read from the underlying I/O object.
get_mut(&mut self) -> &mut RW63     pub fn get_mut(&mut self) -> &mut RW {
64         self.inner.get_mut().get_mut()
65     }
66 
67     /// Gets a pinned mutable reference to the underlying I/O object.
68     ///
69     /// It is inadvisable to directly read from the underlying I/O object.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW>70     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
71         self.project().inner.get_pin_mut().get_pin_mut()
72     }
73 
74     /// Consumes this `BufStream`, returning the underlying I/O object.
75     ///
76     /// Note that any leftover data in the internal buffer is lost.
into_inner(self) -> RW77     pub fn into_inner(self) -> RW {
78         self.inner.into_inner().into_inner()
79     }
80 }
81 
82 impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
from(b: BufReader<BufWriter<RW>>) -> Self83     fn from(b: BufReader<BufWriter<RW>>) -> Self {
84         BufStream { inner: b }
85     }
86 }
87 
88 impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
from(b: BufWriter<BufReader<RW>>) -> Self89     fn from(b: BufWriter<BufReader<RW>>) -> Self {
90         // we need to "invert" the reader and writer
91         let BufWriter {
92             inner:
93                 BufReader {
94                     inner,
95                     buf: rbuf,
96                     pos,
97                     cap,
98                 },
99             buf: wbuf,
100             written,
101         } = b;
102 
103         BufStream {
104             inner: BufReader {
105                 inner: BufWriter {
106                     inner,
107                     buf: wbuf,
108                     written,
109                 },
110                 buf: rbuf,
111                 pos,
112                 cap,
113             },
114         }
115     }
116 }
117 
118 impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>119     fn poll_write(
120         self: Pin<&mut Self>,
121         cx: &mut Context<'_>,
122         buf: &[u8],
123     ) -> Poll<io::Result<usize>> {
124         self.project().inner.poll_write(cx, buf)
125     }
126 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>127     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
128         self.project().inner.poll_flush(cx)
129     }
130 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>131     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
132         self.project().inner.poll_shutdown(cx)
133     }
134 }
135 
136 impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>137     fn poll_read(
138         self: Pin<&mut Self>,
139         cx: &mut Context<'_>,
140         buf: &mut [u8],
141     ) -> Poll<io::Result<usize>> {
142         self.project().inner.poll_read(cx, buf)
143     }
144 
145     // we can't skip unconditionally because of the large buffer case in read.
prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool146     unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
147         self.inner.prepare_uninitialized_buffer(buf)
148     }
149 }
150 
151 impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>152     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
153         self.project().inner.poll_fill_buf(cx)
154     }
155 
consume(self: Pin<&mut Self>, amt: usize)156     fn consume(self: Pin<&mut Self>, amt: usize) {
157         self.project().inner.consume(amt)
158     }
159 }
160 
#[cfg(test)]
mod tests {
    use super::BufStream;

    // Instantiates `crate::is_unpin` with a `BufStream` over the unit type.
    // NOTE(review): `is_unpin` is defined elsewhere; presumably it only
    // compiles when its type argument is `Unpin` — confirm.
    #[test]
    fn assert_unpin() {
        type UnitStream = BufStream<()>;
        crate::is_unpin::<UnitStream>();
    }
}
170