1 //! `UnixStream` split support.
2 //!
3 //! A `UnixStream` can be split into a read half and a write half with
4 //! `UnixStream::split`. The read half implements `AsyncRead` while the write
5 //! half implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10
11 use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
12 use crate::net::UnixStream;
13
14 use crate::net::unix::SocketAddr;
15 use std::io;
16 use std::net::Shutdown;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19
20 cfg_io_util! {
21 use bytes::BufMut;
22 }
23
24 /// Borrowed read half of a [`UnixStream`], created by [`split`].
25 ///
26 /// Reading from a `ReadHalf` is usually done using the convenience methods found on the
27 /// [`AsyncReadExt`] trait.
28 ///
29 /// [`UnixStream`]: UnixStream
30 /// [`split`]: UnixStream::split()
31 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
32 #[derive(Debug)]
33 pub struct ReadHalf<'a>(&'a UnixStream);
34
35 /// Borrowed write half of a [`UnixStream`], created by [`split`].
36 ///
37 /// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
38 /// shut down the UnixStream stream in the write direction.
39 ///
40 /// Writing to an `WriteHalf` is usually done using the convenience methods found
41 /// on the [`AsyncWriteExt`] trait.
42 ///
43 /// [`UnixStream`]: UnixStream
44 /// [`split`]: UnixStream::split()
45 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
46 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
47 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
48 #[derive(Debug)]
49 pub struct WriteHalf<'a>(&'a UnixStream);
50
split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>)51 pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52 (ReadHalf(stream), WriteHalf(stream))
53 }
54
55 impl ReadHalf<'_> {
56 /// Wait for any of the requested ready states.
57 ///
58 /// This function is usually paired with `try_read()` or `try_write()`. It
59 /// can be used to concurrently read / write to the same socket on a single
60 /// task without splitting the socket.
61 ///
62 /// # Cancel safety
63 ///
64 /// This method is cancel safe. Once a readiness event occurs, the method
65 /// will continue to return immediately until the readiness event is
66 /// consumed by an attempt to read or write that fails with `WouldBlock` or
67 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>68 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
69 self.0.ready(interest).await
70 }
71
72 /// Waits for the socket to become readable.
73 ///
74 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
75 /// paired with `try_read()`.
76 ///
77 /// # Cancel safety
78 ///
79 /// This method is cancel safe. Once a readiness event occurs, the method
80 /// will continue to return immediately until the readiness event is
81 /// consumed by an attempt to read that fails with `WouldBlock` or
82 /// `Poll::Pending`.
readable(&self) -> io::Result<()>83 pub async fn readable(&self) -> io::Result<()> {
84 self.0.readable().await
85 }
86
87 /// Tries to read data from the stream into the provided buffer, returning how
88 /// many bytes were read.
89 ///
90 /// Receives any pending data from the socket but does not wait for new data
91 /// to arrive. On success, returns the number of bytes read. Because
92 /// `try_read()` is non-blocking, the buffer does not have to be stored by
93 /// the async task and can exist entirely on the stack.
94 ///
95 /// Usually, [`readable()`] or [`ready()`] is used with this function.
96 ///
97 /// [`readable()`]: Self::readable()
98 /// [`ready()`]: Self::ready()
99 ///
100 /// # Return
101 ///
102 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
103 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
104 /// and will no longer yield data. If the stream is not ready to read data
105 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read(&self, buf: &mut [u8]) -> io::Result<usize>106 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
107 self.0.try_read(buf)
108 }
109
110 cfg_io_util! {
111 /// Tries to read data from the stream into the provided buffer, advancing the
112 /// buffer's internal cursor, returning how many bytes were read.
113 ///
114 /// Receives any pending data from the socket but does not wait for new data
115 /// to arrive. On success, returns the number of bytes read. Because
116 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
117 /// the async task and can exist entirely on the stack.
118 ///
119 /// Usually, [`readable()`] or [`ready()`] is used with this function.
120 ///
121 /// [`readable()`]: Self::readable()
122 /// [`ready()`]: Self::ready()
123 ///
124 /// # Return
125 ///
126 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
127 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
128 /// and will no longer yield data. If the stream is not ready to read data
129 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
130 self.0.try_read_buf(buf)
131 }
132 }
133
134 /// Tries to read data from the stream into the provided buffers, returning
135 /// how many bytes were read.
136 ///
137 /// Data is copied to fill each buffer in order, with the final buffer
138 /// written to possibly being only partially filled. This method behaves
139 /// equivalently to a single call to [`try_read()`] with concatenated
140 /// buffers.
141 ///
142 /// Receives any pending data from the socket but does not wait for new data
143 /// to arrive. On success, returns the number of bytes read. Because
144 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
145 /// stored by the async task and can exist entirely on the stack.
146 ///
147 /// Usually, [`readable()`] or [`ready()`] is used with this function.
148 ///
149 /// [`try_read()`]: Self::try_read()
150 /// [`readable()`]: Self::readable()
151 /// [`ready()`]: Self::ready()
152 ///
153 /// # Return
154 ///
155 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
156 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
157 /// and will no longer yield data. If the stream is not ready to read data
158 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>159 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
160 self.0.try_read_vectored(bufs)
161 }
162
163 /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>164 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
165 self.0.peer_addr()
166 }
167
168 /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>169 pub fn local_addr(&self) -> io::Result<SocketAddr> {
170 self.0.local_addr()
171 }
172 }
173
174 impl WriteHalf<'_> {
175 /// Waits for any of the requested ready states.
176 ///
177 /// This function is usually paired with `try_read()` or `try_write()`. It
178 /// can be used to concurrently read / write to the same socket on a single
179 /// task without splitting the socket.
180 ///
181 /// # Cancel safety
182 ///
183 /// This method is cancel safe. Once a readiness event occurs, the method
184 /// will continue to return immediately until the readiness event is
185 /// consumed by an attempt to read or write that fails with `WouldBlock` or
186 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>187 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
188 self.0.ready(interest).await
189 }
190
191 /// Waits for the socket to become writable.
192 ///
193 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
194 /// paired with `try_write()`.
195 ///
196 /// # Cancel safety
197 ///
198 /// This method is cancel safe. Once a readiness event occurs, the method
199 /// will continue to return immediately until the readiness event is
200 /// consumed by an attempt to write that fails with `WouldBlock` or
201 /// `Poll::Pending`.
writable(&self) -> io::Result<()>202 pub async fn writable(&self) -> io::Result<()> {
203 self.0.writable().await
204 }
205
206 /// Tries to write a buffer to the stream, returning how many bytes were
207 /// written.
208 ///
209 /// The function will attempt to write the entire contents of `buf`, but
210 /// only part of the buffer may be written.
211 ///
212 /// This function is usually paired with `writable()`.
213 ///
214 /// # Return
215 ///
216 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
217 /// number of bytes written. If the stream is not ready to write data,
218 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write(&self, buf: &[u8]) -> io::Result<usize>219 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
220 self.0.try_write(buf)
221 }
222
223 /// Tries to write several buffers to the stream, returning how many bytes
224 /// were written.
225 ///
226 /// Data is written from each buffer in order, with the final buffer read
227 /// from possible being only partially consumed. This method behaves
228 /// equivalently to a single call to [`try_write()`] with concatenated
229 /// buffers.
230 ///
231 /// This function is usually paired with `writable()`.
232 ///
233 /// [`try_write()`]: Self::try_write()
234 ///
235 /// # Return
236 ///
237 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
238 /// number of bytes written. If the stream is not ready to write data,
239 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>240 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
241 self.0.try_write_vectored(buf)
242 }
243
244 /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>245 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
246 self.0.peer_addr()
247 }
248
249 /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>250 pub fn local_addr(&self) -> io::Result<SocketAddr> {
251 self.0.local_addr()
252 }
253 }
254
255 impl AsyncRead for ReadHalf<'_> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>256 fn poll_read(
257 self: Pin<&mut Self>,
258 cx: &mut Context<'_>,
259 buf: &mut ReadBuf<'_>,
260 ) -> Poll<io::Result<()>> {
261 self.0.poll_read_priv(cx, buf)
262 }
263 }
264
265 impl AsyncWrite for WriteHalf<'_> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>266 fn poll_write(
267 self: Pin<&mut Self>,
268 cx: &mut Context<'_>,
269 buf: &[u8],
270 ) -> Poll<io::Result<usize>> {
271 self.0.poll_write_priv(cx, buf)
272 }
273
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>274 fn poll_write_vectored(
275 self: Pin<&mut Self>,
276 cx: &mut Context<'_>,
277 bufs: &[io::IoSlice<'_>],
278 ) -> Poll<io::Result<usize>> {
279 self.0.poll_write_vectored_priv(cx, bufs)
280 }
281
is_write_vectored(&self) -> bool282 fn is_write_vectored(&self) -> bool {
283 self.0.is_write_vectored()
284 }
285
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>286 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
287 Poll::Ready(Ok(()))
288 }
289
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>290 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
291 self.0.shutdown_std(Shutdown::Write).into()
292 }
293 }
294
295 impl AsRef<UnixStream> for ReadHalf<'_> {
as_ref(&self) -> &UnixStream296 fn as_ref(&self) -> &UnixStream {
297 self.0
298 }
299 }
300
301 impl AsRef<UnixStream> for WriteHalf<'_> {
as_ref(&self) -> &UnixStream302 fn as_ref(&self) -> &UnixStream {
303 self.0
304 }
305 }
306