1 //! `UnixStream` owned split support.
2 //!
3 //! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf`
4 //! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
5 //! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10
11 use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
12 use crate::net::UnixStream;
13
14 use crate::net::unix::SocketAddr;
15 use std::error::Error;
16 use std::net::Shutdown;
17 use std::pin::Pin;
18 use std::sync::Arc;
19 use std::task::{Context, Poll};
20 use std::{fmt, io};
21
22 cfg_io_util! {
23 use bytes::BufMut;
24 }
25
26 /// Owned read half of a [`UnixStream`], created by [`into_split`].
27 ///
28 /// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
29 /// on the [`AsyncReadExt`] trait.
30 ///
31 /// [`UnixStream`]: crate::net::UnixStream
32 /// [`into_split`]: crate::net::UnixStream::into_split()
33 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
34 #[derive(Debug)]
35 pub struct OwnedReadHalf {
36 inner: Arc<UnixStream>,
37 }
38
39 /// Owned write half of a [`UnixStream`], created by [`into_split`].
40 ///
41 /// Note that in the [`AsyncWrite`] implementation of this type,
42 /// [`poll_shutdown`] will shut down the stream in the write direction.
43 /// Dropping the write half will also shut down the write half of the stream.
44 ///
45 /// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
46 /// found on the [`AsyncWriteExt`] trait.
47 ///
48 /// [`UnixStream`]: crate::net::UnixStream
49 /// [`into_split`]: crate::net::UnixStream::into_split()
50 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
51 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
52 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
53 #[derive(Debug)]
54 pub struct OwnedWriteHalf {
55 inner: Arc<UnixStream>,
56 shutdown_on_drop: bool,
57 }
58
split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf)59 pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
60 let arc = Arc::new(stream);
61 let read = OwnedReadHalf {
62 inner: Arc::clone(&arc),
63 };
64 let write = OwnedWriteHalf {
65 inner: arc,
66 shutdown_on_drop: true,
67 };
68 (read, write)
69 }
70
reunite( read: OwnedReadHalf, write: OwnedWriteHalf, ) -> Result<UnixStream, ReuniteError>71 pub(crate) fn reunite(
72 read: OwnedReadHalf,
73 write: OwnedWriteHalf,
74 ) -> Result<UnixStream, ReuniteError> {
75 if Arc::ptr_eq(&read.inner, &write.inner) {
76 write.forget();
77 // This unwrap cannot fail as the api does not allow creating more than two Arcs,
78 // and we just dropped the other half.
79 Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
80 } else {
81 Err(ReuniteError(read, write))
82 }
83 }
84
85 /// Error indicating that two halves were not from the same socket, and thus could
86 /// not be reunited.
87 #[derive(Debug)]
88 pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
89
90 impl fmt::Display for ReuniteError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 write!(
93 f,
94 "tried to reunite halves that are not from the same socket"
95 )
96 }
97 }
98
99 impl Error for ReuniteError {}
100
101 impl OwnedReadHalf {
102 /// Attempts to put the two halves of a `UnixStream` back together and
103 /// recover the original socket. Succeeds only if the two halves
104 /// originated from the same call to [`into_split`].
105 ///
106 /// [`into_split`]: crate::net::UnixStream::into_split()
reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError>107 pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
108 reunite(self, other)
109 }
110
111 /// Waits for any of the requested ready states.
112 ///
113 /// This function is usually paired with `try_read()` or `try_write()`. It
114 /// can be used to concurrently read / write to the same socket on a single
115 /// task without splitting the socket.
116 ///
117 /// # Cancel safety
118 ///
119 /// This method is cancel safe. Once a readiness event occurs, the method
120 /// will continue to return immediately until the readiness event is
121 /// consumed by an attempt to read or write that fails with `WouldBlock` or
122 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>123 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
124 self.inner.ready(interest).await
125 }
126
127 /// Waits for the socket to become readable.
128 ///
129 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
130 /// paired with `try_read()`.
131 ///
132 /// # Cancel safety
133 ///
134 /// This method is cancel safe. Once a readiness event occurs, the method
135 /// will continue to return immediately until the readiness event is
136 /// consumed by an attempt to read that fails with `WouldBlock` or
137 /// `Poll::Pending`.
readable(&self) -> io::Result<()>138 pub async fn readable(&self) -> io::Result<()> {
139 self.inner.readable().await
140 }
141
142 /// Tries to read data from the stream into the provided buffer, returning how
143 /// many bytes were read.
144 ///
145 /// Receives any pending data from the socket but does not wait for new data
146 /// to arrive. On success, returns the number of bytes read. Because
147 /// `try_read()` is non-blocking, the buffer does not have to be stored by
148 /// the async task and can exist entirely on the stack.
149 ///
150 /// Usually, [`readable()`] or [`ready()`] is used with this function.
151 ///
152 /// [`readable()`]: Self::readable()
153 /// [`ready()`]: Self::ready()
154 ///
155 /// # Return
156 ///
157 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
158 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
159 /// and will no longer yield data. If the stream is not ready to read data
160 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read(&self, buf: &mut [u8]) -> io::Result<usize>161 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
162 self.inner.try_read(buf)
163 }
164
165 cfg_io_util! {
166 /// Tries to read data from the stream into the provided buffer, advancing the
167 /// buffer's internal cursor, returning how many bytes were read.
168 ///
169 /// Receives any pending data from the socket but does not wait for new data
170 /// to arrive. On success, returns the number of bytes read. Because
171 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
172 /// the async task and can exist entirely on the stack.
173 ///
174 /// Usually, [`readable()`] or [`ready()`] is used with this function.
175 ///
176 /// [`readable()`]: Self::readable()
177 /// [`ready()`]: Self::ready()
178 ///
179 /// # Return
180 ///
181 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
182 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
183 /// and will no longer yield data. If the stream is not ready to read data
184 /// `Err(io::ErrorKind::WouldBlock)` is returned.
185 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
186 self.inner.try_read_buf(buf)
187 }
188 }
189
190 /// Tries to read data from the stream into the provided buffers, returning
191 /// how many bytes were read.
192 ///
193 /// Data is copied to fill each buffer in order, with the final buffer
194 /// written to possibly being only partially filled. This method behaves
195 /// equivalently to a single call to [`try_read()`] with concatenated
196 /// buffers.
197 ///
198 /// Receives any pending data from the socket but does not wait for new data
199 /// to arrive. On success, returns the number of bytes read. Because
200 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
201 /// stored by the async task and can exist entirely on the stack.
202 ///
203 /// Usually, [`readable()`] or [`ready()`] is used with this function.
204 ///
205 /// [`try_read()`]: Self::try_read()
206 /// [`readable()`]: Self::readable()
207 /// [`ready()`]: Self::ready()
208 ///
209 /// # Return
210 ///
211 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
212 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
213 /// and will no longer yield data. If the stream is not ready to read data
214 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>215 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
216 self.inner.try_read_vectored(bufs)
217 }
218
219 /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>220 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
221 self.inner.peer_addr()
222 }
223
224 /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>225 pub fn local_addr(&self) -> io::Result<SocketAddr> {
226 self.inner.local_addr()
227 }
228 }
229
230 impl AsyncRead for OwnedReadHalf {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>231 fn poll_read(
232 self: Pin<&mut Self>,
233 cx: &mut Context<'_>,
234 buf: &mut ReadBuf<'_>,
235 ) -> Poll<io::Result<()>> {
236 self.inner.poll_read_priv(cx, buf)
237 }
238 }
239
240 impl OwnedWriteHalf {
241 /// Attempts to put the two halves of a `UnixStream` back together and
242 /// recover the original socket. Succeeds only if the two halves
243 /// originated from the same call to [`into_split`].
244 ///
245 /// [`into_split`]: crate::net::UnixStream::into_split()
reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError>246 pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
247 reunite(other, self)
248 }
249
250 /// Destroys the write half, but don't close the write half of the stream
251 /// until the read half is dropped. If the read half has already been
252 /// dropped, this closes the stream.
forget(mut self)253 pub fn forget(mut self) {
254 self.shutdown_on_drop = false;
255 drop(self);
256 }
257
258 /// Waits for any of the requested ready states.
259 ///
260 /// This function is usually paired with `try_read()` or `try_write()`. It
261 /// can be used to concurrently read / write to the same socket on a single
262 /// task without splitting the socket.
263 ///
264 /// # Cancel safety
265 ///
266 /// This method is cancel safe. Once a readiness event occurs, the method
267 /// will continue to return immediately until the readiness event is
268 /// consumed by an attempt to read or write that fails with `WouldBlock` or
269 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>270 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
271 self.inner.ready(interest).await
272 }
273
274 /// Waits for the socket to become writable.
275 ///
276 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
277 /// paired with `try_write()`.
278 ///
279 /// # Cancel safety
280 ///
281 /// This method is cancel safe. Once a readiness event occurs, the method
282 /// will continue to return immediately until the readiness event is
283 /// consumed by an attempt to write that fails with `WouldBlock` or
284 /// `Poll::Pending`.
writable(&self) -> io::Result<()>285 pub async fn writable(&self) -> io::Result<()> {
286 self.inner.writable().await
287 }
288
289 /// Tries to write a buffer to the stream, returning how many bytes were
290 /// written.
291 ///
292 /// The function will attempt to write the entire contents of `buf`, but
293 /// only part of the buffer may be written.
294 ///
295 /// This function is usually paired with `writable()`.
296 ///
297 /// # Return
298 ///
299 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
300 /// number of bytes written. If the stream is not ready to write data,
301 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write(&self, buf: &[u8]) -> io::Result<usize>302 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
303 self.inner.try_write(buf)
304 }
305
306 /// Tries to write several buffers to the stream, returning how many bytes
307 /// were written.
308 ///
309 /// Data is written from each buffer in order, with the final buffer read
310 /// from possible being only partially consumed. This method behaves
311 /// equivalently to a single call to [`try_write()`] with concatenated
312 /// buffers.
313 ///
314 /// This function is usually paired with `writable()`.
315 ///
316 /// [`try_write()`]: Self::try_write()
317 ///
318 /// # Return
319 ///
320 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
321 /// number of bytes written. If the stream is not ready to write data,
322 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>323 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
324 self.inner.try_write_vectored(buf)
325 }
326
327 /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>328 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
329 self.inner.peer_addr()
330 }
331
332 /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>333 pub fn local_addr(&self) -> io::Result<SocketAddr> {
334 self.inner.local_addr()
335 }
336 }
337
338 impl Drop for OwnedWriteHalf {
drop(&mut self)339 fn drop(&mut self) {
340 if self.shutdown_on_drop {
341 let _ = self.inner.shutdown_std(Shutdown::Write);
342 }
343 }
344 }
345
346 impl AsyncWrite for OwnedWriteHalf {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>347 fn poll_write(
348 self: Pin<&mut Self>,
349 cx: &mut Context<'_>,
350 buf: &[u8],
351 ) -> Poll<io::Result<usize>> {
352 self.inner.poll_write_priv(cx, buf)
353 }
354
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>355 fn poll_write_vectored(
356 self: Pin<&mut Self>,
357 cx: &mut Context<'_>,
358 bufs: &[io::IoSlice<'_>],
359 ) -> Poll<io::Result<usize>> {
360 self.inner.poll_write_vectored_priv(cx, bufs)
361 }
362
is_write_vectored(&self) -> bool363 fn is_write_vectored(&self) -> bool {
364 self.inner.is_write_vectored()
365 }
366
367 #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>368 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
369 // flush is a no-op
370 Poll::Ready(Ok(()))
371 }
372
373 // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>374 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
375 let res = self.inner.shutdown_std(Shutdown::Write);
376 if res.is_ok() {
377 Pin::into_inner(self).shutdown_on_drop = false;
378 }
379 res.into()
380 }
381 }
382
383 impl AsRef<UnixStream> for OwnedReadHalf {
as_ref(&self) -> &UnixStream384 fn as_ref(&self) -> &UnixStream {
385 &*self.inner
386 }
387 }
388
389 impl AsRef<UnixStream> for OwnedWriteHalf {
as_ref(&self) -> &UnixStream390 fn as_ref(&self) -> &UnixStream {
391 &*self.inner
392 }
393 }
394