1 //! `UnixStream` split support.
2 //!
3 //! A `UnixStream` can be split into a read half and a write half with
4 //! `UnixStream::split`. The read half implements `AsyncRead` while the write
5 //! half implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10 
11 use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
12 use crate::net::UnixStream;
13 
14 use crate::net::unix::SocketAddr;
15 use std::io;
16 use std::net::Shutdown;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 
20 cfg_io_util! {
21     use bytes::BufMut;
22 }
23 
24 /// Borrowed read half of a [`UnixStream`], created by [`split`].
25 ///
26 /// Reading from a `ReadHalf` is usually done using the convenience methods found on the
27 /// [`AsyncReadExt`] trait.
28 ///
29 /// [`UnixStream`]: UnixStream
30 /// [`split`]: UnixStream::split()
31 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
32 #[derive(Debug)]
33 pub struct ReadHalf<'a>(&'a UnixStream);
34 
35 /// Borrowed write half of a [`UnixStream`], created by [`split`].
36 ///
37 /// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
38 /// shut down the UnixStream stream in the write direction.
39 ///
40 /// Writing to an `WriteHalf` is usually done using the convenience methods found
41 /// on the [`AsyncWriteExt`] trait.
42 ///
43 /// [`UnixStream`]: UnixStream
44 /// [`split`]: UnixStream::split()
45 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
46 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
47 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
48 #[derive(Debug)]
49 pub struct WriteHalf<'a>(&'a UnixStream);
50 
split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>)51 pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52     (ReadHalf(stream), WriteHalf(stream))
53 }
54 
55 impl ReadHalf<'_> {
56     /// Wait for any of the requested ready states.
57     ///
58     /// This function is usually paired with `try_read()` or `try_write()`. It
59     /// can be used to concurrently read / write to the same socket on a single
60     /// task without splitting the socket.
61     ///
62     /// # Cancel safety
63     ///
64     /// This method is cancel safe. Once a readiness event occurs, the method
65     /// will continue to return immediately until the readiness event is
66     /// consumed by an attempt to read or write that fails with `WouldBlock` or
67     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>68     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
69         self.0.ready(interest).await
70     }
71 
72     /// Waits for the socket to become readable.
73     ///
74     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
75     /// paired with `try_read()`.
76     ///
77     /// # Cancel safety
78     ///
79     /// This method is cancel safe. Once a readiness event occurs, the method
80     /// will continue to return immediately until the readiness event is
81     /// consumed by an attempt to read that fails with `WouldBlock` or
82     /// `Poll::Pending`.
readable(&self) -> io::Result<()>83     pub async fn readable(&self) -> io::Result<()> {
84         self.0.readable().await
85     }
86 
87     /// Tries to read data from the stream into the provided buffer, returning how
88     /// many bytes were read.
89     ///
90     /// Receives any pending data from the socket but does not wait for new data
91     /// to arrive. On success, returns the number of bytes read. Because
92     /// `try_read()` is non-blocking, the buffer does not have to be stored by
93     /// the async task and can exist entirely on the stack.
94     ///
95     /// Usually, [`readable()`] or [`ready()`] is used with this function.
96     ///
97     /// [`readable()`]: Self::readable()
98     /// [`ready()`]: Self::ready()
99     ///
100     /// # Return
101     ///
102     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
103     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
104     /// and will no longer yield data. If the stream is not ready to read data
105     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read(&self, buf: &mut [u8]) -> io::Result<usize>106     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
107         self.0.try_read(buf)
108     }
109 
110     cfg_io_util! {
111         /// Tries to read data from the stream into the provided buffer, advancing the
112         /// buffer's internal cursor, returning how many bytes were read.
113         ///
114         /// Receives any pending data from the socket but does not wait for new data
115         /// to arrive. On success, returns the number of bytes read. Because
116         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
117         /// the async task and can exist entirely on the stack.
118         ///
119         /// Usually, [`readable()`] or [`ready()`] is used with this function.
120         ///
121         /// [`readable()`]: Self::readable()
122         /// [`ready()`]: Self::ready()
123         ///
124         /// # Return
125         ///
126         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
127         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
128         /// and will no longer yield data. If the stream is not ready to read data
129         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
130             self.0.try_read_buf(buf)
131         }
132     }
133 
134     /// Tries to read data from the stream into the provided buffers, returning
135     /// how many bytes were read.
136     ///
137     /// Data is copied to fill each buffer in order, with the final buffer
138     /// written to possibly being only partially filled. This method behaves
139     /// equivalently to a single call to [`try_read()`] with concatenated
140     /// buffers.
141     ///
142     /// Receives any pending data from the socket but does not wait for new data
143     /// to arrive. On success, returns the number of bytes read. Because
144     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
145     /// stored by the async task and can exist entirely on the stack.
146     ///
147     /// Usually, [`readable()`] or [`ready()`] is used with this function.
148     ///
149     /// [`try_read()`]: Self::try_read()
150     /// [`readable()`]: Self::readable()
151     /// [`ready()`]: Self::ready()
152     ///
153     /// # Return
154     ///
155     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
156     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
157     /// and will no longer yield data. If the stream is not ready to read data
158     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>159     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
160         self.0.try_read_vectored(bufs)
161     }
162 
163     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>164     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
165         self.0.peer_addr()
166     }
167 
168     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>169     pub fn local_addr(&self) -> io::Result<SocketAddr> {
170         self.0.local_addr()
171     }
172 }
173 
174 impl WriteHalf<'_> {
175     /// Waits for any of the requested ready states.
176     ///
177     /// This function is usually paired with `try_read()` or `try_write()`. It
178     /// can be used to concurrently read / write to the same socket on a single
179     /// task without splitting the socket.
180     ///
181     /// # Cancel safety
182     ///
183     /// This method is cancel safe. Once a readiness event occurs, the method
184     /// will continue to return immediately until the readiness event is
185     /// consumed by an attempt to read or write that fails with `WouldBlock` or
186     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>187     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
188         self.0.ready(interest).await
189     }
190 
191     /// Waits for the socket to become writable.
192     ///
193     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
194     /// paired with `try_write()`.
195     ///
196     /// # Cancel safety
197     ///
198     /// This method is cancel safe. Once a readiness event occurs, the method
199     /// will continue to return immediately until the readiness event is
200     /// consumed by an attempt to write that fails with `WouldBlock` or
201     /// `Poll::Pending`.
writable(&self) -> io::Result<()>202     pub async fn writable(&self) -> io::Result<()> {
203         self.0.writable().await
204     }
205 
206     /// Tries to write a buffer to the stream, returning how many bytes were
207     /// written.
208     ///
209     /// The function will attempt to write the entire contents of `buf`, but
210     /// only part of the buffer may be written.
211     ///
212     /// This function is usually paired with `writable()`.
213     ///
214     /// # Return
215     ///
216     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
217     /// number of bytes written. If the stream is not ready to write data,
218     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write(&self, buf: &[u8]) -> io::Result<usize>219     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
220         self.0.try_write(buf)
221     }
222 
223     /// Tries to write several buffers to the stream, returning how many bytes
224     /// were written.
225     ///
226     /// Data is written from each buffer in order, with the final buffer read
227     /// from possible being only partially consumed. This method behaves
228     /// equivalently to a single call to [`try_write()`] with concatenated
229     /// buffers.
230     ///
231     /// This function is usually paired with `writable()`.
232     ///
233     /// [`try_write()`]: Self::try_write()
234     ///
235     /// # Return
236     ///
237     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
238     /// number of bytes written. If the stream is not ready to write data,
239     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>240     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
241         self.0.try_write_vectored(buf)
242     }
243 
244     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>245     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
246         self.0.peer_addr()
247     }
248 
249     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>250     pub fn local_addr(&self) -> io::Result<SocketAddr> {
251         self.0.local_addr()
252     }
253 }
254 
255 impl AsyncRead for ReadHalf<'_> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>256     fn poll_read(
257         self: Pin<&mut Self>,
258         cx: &mut Context<'_>,
259         buf: &mut ReadBuf<'_>,
260     ) -> Poll<io::Result<()>> {
261         self.0.poll_read_priv(cx, buf)
262     }
263 }
264 
265 impl AsyncWrite for WriteHalf<'_> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>266     fn poll_write(
267         self: Pin<&mut Self>,
268         cx: &mut Context<'_>,
269         buf: &[u8],
270     ) -> Poll<io::Result<usize>> {
271         self.0.poll_write_priv(cx, buf)
272     }
273 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>274     fn poll_write_vectored(
275         self: Pin<&mut Self>,
276         cx: &mut Context<'_>,
277         bufs: &[io::IoSlice<'_>],
278     ) -> Poll<io::Result<usize>> {
279         self.0.poll_write_vectored_priv(cx, bufs)
280     }
281 
is_write_vectored(&self) -> bool282     fn is_write_vectored(&self) -> bool {
283         self.0.is_write_vectored()
284     }
285 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>286     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
287         Poll::Ready(Ok(()))
288     }
289 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>290     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
291         self.0.shutdown_std(Shutdown::Write).into()
292     }
293 }
294 
295 impl AsRef<UnixStream> for ReadHalf<'_> {
as_ref(&self) -> &UnixStream296     fn as_ref(&self) -> &UnixStream {
297         self.0
298     }
299 }
300 
301 impl AsRef<UnixStream> for WriteHalf<'_> {
as_ref(&self) -> &UnixStream302     fn as_ref(&self) -> &UnixStream {
303         self.0
304     }
305 }
306