1 //! `TcpStream` split support.
2 //!
3 //! A `TcpStream` can be split into a `ReadHalf` and a
4 //! `WriteHalf` with the `TcpStream::split` method. `ReadHalf`
5 //! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10 
11 use crate::future::poll_fn;
12 use crate::io::{AsyncRead, AsyncWrite};
13 use crate::net::TcpStream;
14 
15 use bytes::Buf;
16 use std::io;
17 use std::mem::MaybeUninit;
18 use std::net::Shutdown;
19 use std::pin::Pin;
20 use std::task::{Context, Poll};
21 
22 /// Borrowed read half of a [`TcpStream`], created by [`split`].
23 ///
24 /// Reading from a `ReadHalf` is usually done using the convenience methods found on the
25 /// [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
26 ///
27 /// [`TcpStream`]: TcpStream
28 /// [`split`]: TcpStream::split()
29 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
30 /// [the prelude]: crate::prelude
31 #[derive(Debug)]
32 pub struct ReadHalf<'a>(&'a TcpStream);
33 
34 /// Borrowed write half of a [`TcpStream`], created by [`split`].
35 ///
36 /// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will
37 /// shut down the TCP stream in the write direction.
38 ///
39 /// Writing to an `WriteHalf` is usually done using the convenience methods found
40 /// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude].
41 ///
42 /// [`TcpStream`]: TcpStream
43 /// [`split`]: TcpStream::split()
44 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
45 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
46 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
47 /// [the prelude]: crate::prelude
48 #[derive(Debug)]
49 pub struct WriteHalf<'a>(&'a TcpStream);
50 
split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>)51 pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52     (ReadHalf(&*stream), WriteHalf(&*stream))
53 }
54 
55 impl ReadHalf<'_> {
56     /// Attempt to receive data on the socket, without removing that data from
57     /// the queue, registering the current task for wakeup if data is not yet
58     /// available.
59     ///
60     /// See the [`TcpStream::poll_peek`] level documenation for more details.
61     ///
62     /// # Examples
63     ///
64     /// ```no_run
65     /// use tokio::io;
66     /// use tokio::net::TcpStream;
67     ///
68     /// use futures::future::poll_fn;
69     ///
70     /// #[tokio::main]
71     /// async fn main() -> io::Result<()> {
72     ///     let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
73     ///     let (mut read_half, _) = stream.split();
74     ///     let mut buf = [0; 10];
75     ///
76     ///     poll_fn(|cx| {
77     ///         read_half.poll_peek(cx, &mut buf)
78     ///     }).await?;
79     ///
80     ///     Ok(())
81     /// }
82     /// ```
83     ///
84     /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>85     pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
86         self.0.poll_peek2(cx, buf)
87     }
88 
89     /// Receives data on the socket from the remote address to which it is
90     /// connected, without removing that data from the queue. On success,
91     /// returns the number of bytes peeked.
92     ///
93     /// See the [`TcpStream::peek`] level documenation for more details.
94     ///
95     /// [`TcpStream::peek`]: TcpStream::peek
96     ///
97     /// # Examples
98     ///
99     /// ```no_run
100     /// use tokio::net::TcpStream;
101     /// use tokio::prelude::*;
102     /// use std::error::Error;
103     ///
104     /// #[tokio::main]
105     /// async fn main() -> Result<(), Box<dyn Error>> {
106     ///     // Connect to a peer
107     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
108     ///     let (mut read_half, _) = stream.split();
109     ///
110     ///     let mut b1 = [0; 10];
111     ///     let mut b2 = [0; 10];
112     ///
113     ///     // Peek at the data
114     ///     let n = read_half.peek(&mut b1).await?;
115     ///
116     ///     // Read the data
117     ///     assert_eq!(n, read_half.read(&mut b2[..n]).await?);
118     ///     assert_eq!(&b1[..n], &b2[..n]);
119     ///
120     ///     Ok(())
121     /// }
122     /// ```
123     ///
124     /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
125     ///
126     /// [`read`]: fn@crate::io::AsyncReadExt::read
127     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
peek(&mut self, buf: &mut [u8]) -> io::Result<usize>128     pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
129         poll_fn(|cx| self.poll_peek(cx, buf)).await
130     }
131 }
132 
133 impl AsyncRead for ReadHalf<'_> {
prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool134     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
135         false
136     }
137 
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>138     fn poll_read(
139         self: Pin<&mut Self>,
140         cx: &mut Context<'_>,
141         buf: &mut [u8],
142     ) -> Poll<io::Result<usize>> {
143         self.0.poll_read_priv(cx, buf)
144     }
145 }
146 
147 impl AsyncWrite for WriteHalf<'_> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>148     fn poll_write(
149         self: Pin<&mut Self>,
150         cx: &mut Context<'_>,
151         buf: &[u8],
152     ) -> Poll<io::Result<usize>> {
153         self.0.poll_write_priv(cx, buf)
154     }
155 
poll_write_buf<B: Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>156     fn poll_write_buf<B: Buf>(
157         self: Pin<&mut Self>,
158         cx: &mut Context<'_>,
159         buf: &mut B,
160     ) -> Poll<io::Result<usize>> {
161         self.0.poll_write_buf_priv(cx, buf)
162     }
163 
164     #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>165     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
166         // tcp flush is a no-op
167         Poll::Ready(Ok(()))
168     }
169 
170     // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>171     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
172         self.0.shutdown(Shutdown::Write).into()
173     }
174 }
175 
176 impl AsRef<TcpStream> for ReadHalf<'_> {
as_ref(&self) -> &TcpStream177     fn as_ref(&self) -> &TcpStream {
178         self.0
179     }
180 }
181 
182 impl AsRef<TcpStream> for WriteHalf<'_> {
as_ref(&self) -> &TcpStream183     fn as_ref(&self) -> &TcpStream {
184         self.0
185     }
186 }
187