1 //! `TcpStream` split support.
2 //!
3 //! A `TcpStream` can be split into a `ReadHalf` and a
4 //! `WriteHalf` with the `TcpStream::split` method. `ReadHalf`
5 //! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10
11 use crate::future::poll_fn;
12 use crate::io::{AsyncRead, AsyncWrite};
13 use crate::net::TcpStream;
14
15 use bytes::Buf;
16 use std::io;
17 use std::mem::MaybeUninit;
18 use std::net::Shutdown;
19 use std::pin::Pin;
20 use std::task::{Context, Poll};
21
22 /// Borrowed read half of a [`TcpStream`], created by [`split`].
23 ///
24 /// Reading from a `ReadHalf` is usually done using the convenience methods found on the
25 /// [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
26 ///
27 /// [`TcpStream`]: TcpStream
28 /// [`split`]: TcpStream::split()
29 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
30 /// [the prelude]: crate::prelude
31 #[derive(Debug)]
32 pub struct ReadHalf<'a>(&'a TcpStream);
33
34 /// Borrowed write half of a [`TcpStream`], created by [`split`].
35 ///
36 /// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will
37 /// shut down the TCP stream in the write direction.
38 ///
39 /// Writing to an `WriteHalf` is usually done using the convenience methods found
40 /// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude].
41 ///
42 /// [`TcpStream`]: TcpStream
43 /// [`split`]: TcpStream::split()
44 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
45 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
46 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
47 /// [the prelude]: crate::prelude
48 #[derive(Debug)]
49 pub struct WriteHalf<'a>(&'a TcpStream);
50
split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>)51 pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52 (ReadHalf(&*stream), WriteHalf(&*stream))
53 }
54
55 impl ReadHalf<'_> {
56 /// Attempt to receive data on the socket, without removing that data from
57 /// the queue, registering the current task for wakeup if data is not yet
58 /// available.
59 ///
60 /// See the [`TcpStream::poll_peek`] level documenation for more details.
61 ///
62 /// # Examples
63 ///
64 /// ```no_run
65 /// use tokio::io;
66 /// use tokio::net::TcpStream;
67 ///
68 /// use futures::future::poll_fn;
69 ///
70 /// #[tokio::main]
71 /// async fn main() -> io::Result<()> {
72 /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
73 /// let (mut read_half, _) = stream.split();
74 /// let mut buf = [0; 10];
75 ///
76 /// poll_fn(|cx| {
77 /// read_half.poll_peek(cx, &mut buf)
78 /// }).await?;
79 ///
80 /// Ok(())
81 /// }
82 /// ```
83 ///
84 /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>85 pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
86 self.0.poll_peek2(cx, buf)
87 }
88
89 /// Receives data on the socket from the remote address to which it is
90 /// connected, without removing that data from the queue. On success,
91 /// returns the number of bytes peeked.
92 ///
93 /// See the [`TcpStream::peek`] level documenation for more details.
94 ///
95 /// [`TcpStream::peek`]: TcpStream::peek
96 ///
97 /// # Examples
98 ///
99 /// ```no_run
100 /// use tokio::net::TcpStream;
101 /// use tokio::prelude::*;
102 /// use std::error::Error;
103 ///
104 /// #[tokio::main]
105 /// async fn main() -> Result<(), Box<dyn Error>> {
106 /// // Connect to a peer
107 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
108 /// let (mut read_half, _) = stream.split();
109 ///
110 /// let mut b1 = [0; 10];
111 /// let mut b2 = [0; 10];
112 ///
113 /// // Peek at the data
114 /// let n = read_half.peek(&mut b1).await?;
115 ///
116 /// // Read the data
117 /// assert_eq!(n, read_half.read(&mut b2[..n]).await?);
118 /// assert_eq!(&b1[..n], &b2[..n]);
119 ///
120 /// Ok(())
121 /// }
122 /// ```
123 ///
124 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
125 ///
126 /// [`read`]: fn@crate::io::AsyncReadExt::read
127 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
peek(&mut self, buf: &mut [u8]) -> io::Result<usize>128 pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
129 poll_fn(|cx| self.poll_peek(cx, buf)).await
130 }
131 }
132
133 impl AsyncRead for ReadHalf<'_> {
prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool134 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
135 false
136 }
137
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>138 fn poll_read(
139 self: Pin<&mut Self>,
140 cx: &mut Context<'_>,
141 buf: &mut [u8],
142 ) -> Poll<io::Result<usize>> {
143 self.0.poll_read_priv(cx, buf)
144 }
145 }
146
147 impl AsyncWrite for WriteHalf<'_> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>148 fn poll_write(
149 self: Pin<&mut Self>,
150 cx: &mut Context<'_>,
151 buf: &[u8],
152 ) -> Poll<io::Result<usize>> {
153 self.0.poll_write_priv(cx, buf)
154 }
155
poll_write_buf<B: Buf>( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll<io::Result<usize>>156 fn poll_write_buf<B: Buf>(
157 self: Pin<&mut Self>,
158 cx: &mut Context<'_>,
159 buf: &mut B,
160 ) -> Poll<io::Result<usize>> {
161 self.0.poll_write_buf_priv(cx, buf)
162 }
163
164 #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>165 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
166 // tcp flush is a no-op
167 Poll::Ready(Ok(()))
168 }
169
170 // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>171 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
172 self.0.shutdown(Shutdown::Write).into()
173 }
174 }
175
176 impl AsRef<TcpStream> for ReadHalf<'_> {
as_ref(&self) -> &TcpStream177 fn as_ref(&self) -> &TcpStream {
178 self.0
179 }
180 }
181
182 impl AsRef<TcpStream> for WriteHalf<'_> {
as_ref(&self) -> &TcpStream183 fn as_ref(&self) -> &TcpStream {
184 self.0
185 }
186 }
187