1 //! `TcpStream` split support.
2 //!
3 //! A `TcpStream` can be split into a `ReadHalf` and a
4 //! `WriteHalf` with the `TcpStream::split` method. `ReadHalf`
5 //! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10
11 use crate::future::poll_fn;
12 use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
13 use crate::net::TcpStream;
14
15 use std::io;
16 use std::net::{Shutdown, SocketAddr};
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19
20 cfg_io_util! {
21 use bytes::BufMut;
22 }
23
24 /// Borrowed read half of a [`TcpStream`], created by [`split`].
25 ///
26 /// Reading from a `ReadHalf` is usually done using the convenience methods found on the
27 /// [`AsyncReadExt`] trait.
28 ///
29 /// [`TcpStream`]: TcpStream
30 /// [`split`]: TcpStream::split()
31 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
32 #[derive(Debug)]
33 pub struct ReadHalf<'a>(&'a TcpStream);
34
35 /// Borrowed write half of a [`TcpStream`], created by [`split`].
36 ///
37 /// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
38 /// shut down the TCP stream in the write direction.
39 ///
40 /// Writing to an `WriteHalf` is usually done using the convenience methods found
41 /// on the [`AsyncWriteExt`] trait.
42 ///
43 /// [`TcpStream`]: TcpStream
44 /// [`split`]: TcpStream::split()
45 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
46 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
47 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
48 #[derive(Debug)]
49 pub struct WriteHalf<'a>(&'a TcpStream);
50
split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>)51 pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52 (ReadHalf(&*stream), WriteHalf(&*stream))
53 }
54
55 impl ReadHalf<'_> {
56 /// Attempts to receive data on the socket, without removing that data from
57 /// the queue, registering the current task for wakeup if data is not yet
58 /// available.
59 ///
60 /// Note that on multiple calls to `poll_peek` or `poll_read`, only the
61 /// `Waker` from the `Context` passed to the most recent call is scheduled
62 /// to receive a wakeup.
63 ///
64 /// See the [`TcpStream::poll_peek`] level documentation for more details.
65 ///
66 /// # Examples
67 ///
68 /// ```no_run
69 /// use tokio::io::{self, ReadBuf};
70 /// use tokio::net::TcpStream;
71 ///
72 /// use futures::future::poll_fn;
73 ///
74 /// #[tokio::main]
75 /// async fn main() -> io::Result<()> {
76 /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
77 /// let (mut read_half, _) = stream.split();
78 /// let mut buf = [0; 10];
79 /// let mut buf = ReadBuf::new(&mut buf);
80 ///
81 /// poll_fn(|cx| {
82 /// read_half.poll_peek(cx, &mut buf)
83 /// }).await?;
84 ///
85 /// Ok(())
86 /// }
87 /// ```
88 ///
89 /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
poll_peek( &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>90 pub fn poll_peek(
91 &mut self,
92 cx: &mut Context<'_>,
93 buf: &mut ReadBuf<'_>,
94 ) -> Poll<io::Result<usize>> {
95 self.0.poll_peek(cx, buf)
96 }
97
98 /// Receives data on the socket from the remote address to which it is
99 /// connected, without removing that data from the queue. On success,
100 /// returns the number of bytes peeked.
101 ///
102 /// See the [`TcpStream::peek`] level documentation for more details.
103 ///
104 /// [`TcpStream::peek`]: TcpStream::peek
105 ///
106 /// # Examples
107 ///
108 /// ```no_run
109 /// use tokio::net::TcpStream;
110 /// use tokio::io::AsyncReadExt;
111 /// use std::error::Error;
112 ///
113 /// #[tokio::main]
114 /// async fn main() -> Result<(), Box<dyn Error>> {
115 /// // Connect to a peer
116 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
117 /// let (mut read_half, _) = stream.split();
118 ///
119 /// let mut b1 = [0; 10];
120 /// let mut b2 = [0; 10];
121 ///
122 /// // Peek at the data
123 /// let n = read_half.peek(&mut b1).await?;
124 ///
125 /// // Read the data
126 /// assert_eq!(n, read_half.read(&mut b2[..n]).await?);
127 /// assert_eq!(&b1[..n], &b2[..n]);
128 ///
129 /// Ok(())
130 /// }
131 /// ```
132 ///
133 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
134 ///
135 /// [`read`]: fn@crate::io::AsyncReadExt::read
136 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
peek(&mut self, buf: &mut [u8]) -> io::Result<usize>137 pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
138 let mut buf = ReadBuf::new(buf);
139 poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
140 }
141
142 /// Waits for any of the requested ready states.
143 ///
144 /// This function is usually paired with `try_read()` or `try_write()`. It
145 /// can be used to concurrently read / write to the same socket on a single
146 /// task without splitting the socket.
147 ///
148 /// This function is equivalent to [`TcpStream::ready`].
149 ///
150 /// # Cancel safety
151 ///
152 /// This method is cancel safe. Once a readiness event occurs, the method
153 /// will continue to return immediately until the readiness event is
154 /// consumed by an attempt to read or write that fails with `WouldBlock` or
155 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>156 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
157 self.0.ready(interest).await
158 }
159
160 /// Waits for the socket to become readable.
161 ///
162 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
163 /// paired with `try_read()`.
164 ///
165 /// This function is also equivalent to [`TcpStream::ready`].
166 ///
167 /// # Cancel safety
168 ///
169 /// This method is cancel safe. Once a readiness event occurs, the method
170 /// will continue to return immediately until the readiness event is
171 /// consumed by an attempt to read that fails with `WouldBlock` or
172 /// `Poll::Pending`.
readable(&self) -> io::Result<()>173 pub async fn readable(&self) -> io::Result<()> {
174 self.0.readable().await
175 }
176
177 /// Tries to read data from the stream into the provided buffer, returning how
178 /// many bytes were read.
179 ///
180 /// Receives any pending data from the socket but does not wait for new data
181 /// to arrive. On success, returns the number of bytes read. Because
182 /// `try_read()` is non-blocking, the buffer does not have to be stored by
183 /// the async task and can exist entirely on the stack.
184 ///
185 /// Usually, [`readable()`] or [`ready()`] is used with this function.
186 ///
187 /// [`readable()`]: Self::readable()
188 /// [`ready()`]: Self::ready()
189 ///
190 /// # Return
191 ///
192 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
193 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
194 /// and will no longer yield data. If the stream is not ready to read data
195 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read(&self, buf: &mut [u8]) -> io::Result<usize>196 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
197 self.0.try_read(buf)
198 }
199
200 /// Tries to read data from the stream into the provided buffers, returning
201 /// how many bytes were read.
202 ///
203 /// Data is copied to fill each buffer in order, with the final buffer
204 /// written to possibly being only partially filled. This method behaves
205 /// equivalently to a single call to [`try_read()`] with concatenated
206 /// buffers.
207 ///
208 /// Receives any pending data from the socket but does not wait for new data
209 /// to arrive. On success, returns the number of bytes read. Because
210 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
211 /// stored by the async task and can exist entirely on the stack.
212 ///
213 /// Usually, [`readable()`] or [`ready()`] is used with this function.
214 ///
215 /// [`try_read()`]: Self::try_read()
216 /// [`readable()`]: Self::readable()
217 /// [`ready()`]: Self::ready()
218 ///
219 /// # Return
220 ///
221 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
222 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
223 /// and will no longer yield data. If the stream is not ready to read data
224 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>225 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
226 self.0.try_read_vectored(bufs)
227 }
228
229 cfg_io_util! {
230 /// Tries to read data from the stream into the provided buffer, advancing the
231 /// buffer's internal cursor, returning how many bytes were read.
232 ///
233 /// Receives any pending data from the socket but does not wait for new data
234 /// to arrive. On success, returns the number of bytes read. Because
235 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
236 /// the async task and can exist entirely on the stack.
237 ///
238 /// Usually, [`readable()`] or [`ready()`] is used with this function.
239 ///
240 /// [`readable()`]: Self::readable()
241 /// [`ready()`]: Self::ready()
242 ///
243 /// # Return
244 ///
245 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
246 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
247 /// and will no longer yield data. If the stream is not ready to read data
248 /// `Err(io::ErrorKind::WouldBlock)` is returned.
249 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
250 self.0.try_read_buf(buf)
251 }
252 }
253
254 /// Returns the remote address that this stream is connected to.
peer_addr(&self) -> io::Result<SocketAddr>255 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
256 self.0.peer_addr()
257 }
258
259 /// Returns the local address that this stream is bound to.
local_addr(&self) -> io::Result<SocketAddr>260 pub fn local_addr(&self) -> io::Result<SocketAddr> {
261 self.0.local_addr()
262 }
263 }
264
265 impl WriteHalf<'_> {
266 /// Waits for any of the requested ready states.
267 ///
268 /// This function is usually paired with `try_read()` or `try_write()`. It
269 /// can be used to concurrently read / write to the same socket on a single
270 /// task without splitting the socket.
271 ///
272 /// This function is equivalent to [`TcpStream::ready`].
273 ///
274 /// # Cancel safety
275 ///
276 /// This method is cancel safe. Once a readiness event occurs, the method
277 /// will continue to return immediately until the readiness event is
278 /// consumed by an attempt to read or write that fails with `WouldBlock` or
279 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>280 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
281 self.0.ready(interest).await
282 }
283
284 /// Waits for the socket to become writable.
285 ///
286 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
287 /// paired with `try_write()`.
288 ///
289 /// # Cancel safety
290 ///
291 /// This method is cancel safe. Once a readiness event occurs, the method
292 /// will continue to return immediately until the readiness event is
293 /// consumed by an attempt to write that fails with `WouldBlock` or
294 /// `Poll::Pending`.
writable(&self) -> io::Result<()>295 pub async fn writable(&self) -> io::Result<()> {
296 self.0.writable().await
297 }
298
299 /// Tries to write a buffer to the stream, returning how many bytes were
300 /// written.
301 ///
302 /// The function will attempt to write the entire contents of `buf`, but
303 /// only part of the buffer may be written.
304 ///
305 /// This function is usually paired with `writable()`.
306 ///
307 /// # Return
308 ///
309 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
310 /// number of bytes written. If the stream is not ready to write data,
311 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write(&self, buf: &[u8]) -> io::Result<usize>312 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
313 self.0.try_write(buf)
314 }
315
316 /// Tries to write several buffers to the stream, returning how many bytes
317 /// were written.
318 ///
319 /// Data is written from each buffer in order, with the final buffer read
320 /// from possible being only partially consumed. This method behaves
321 /// equivalently to a single call to [`try_write()`] with concatenated
322 /// buffers.
323 ///
324 /// This function is usually paired with `writable()`.
325 ///
326 /// [`try_write()`]: Self::try_write()
327 ///
328 /// # Return
329 ///
330 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
331 /// number of bytes written. If the stream is not ready to write data,
332 /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>333 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
334 self.0.try_write_vectored(bufs)
335 }
336
337 /// Returns the remote address that this stream is connected to.
peer_addr(&self) -> io::Result<SocketAddr>338 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
339 self.0.peer_addr()
340 }
341
342 /// Returns the local address that this stream is bound to.
local_addr(&self) -> io::Result<SocketAddr>343 pub fn local_addr(&self) -> io::Result<SocketAddr> {
344 self.0.local_addr()
345 }
346 }
347
348 impl AsyncRead for ReadHalf<'_> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>349 fn poll_read(
350 self: Pin<&mut Self>,
351 cx: &mut Context<'_>,
352 buf: &mut ReadBuf<'_>,
353 ) -> Poll<io::Result<()>> {
354 self.0.poll_read_priv(cx, buf)
355 }
356 }
357
358 impl AsyncWrite for WriteHalf<'_> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>359 fn poll_write(
360 self: Pin<&mut Self>,
361 cx: &mut Context<'_>,
362 buf: &[u8],
363 ) -> Poll<io::Result<usize>> {
364 self.0.poll_write_priv(cx, buf)
365 }
366
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>367 fn poll_write_vectored(
368 self: Pin<&mut Self>,
369 cx: &mut Context<'_>,
370 bufs: &[io::IoSlice<'_>],
371 ) -> Poll<io::Result<usize>> {
372 self.0.poll_write_vectored_priv(cx, bufs)
373 }
374
is_write_vectored(&self) -> bool375 fn is_write_vectored(&self) -> bool {
376 self.0.is_write_vectored()
377 }
378
379 #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>380 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
381 // tcp flush is a no-op
382 Poll::Ready(Ok(()))
383 }
384
385 // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>386 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
387 self.0.shutdown_std(Shutdown::Write).into()
388 }
389 }
390
391 impl AsRef<TcpStream> for ReadHalf<'_> {
as_ref(&self) -> &TcpStream392 fn as_ref(&self) -> &TcpStream {
393 self.0
394 }
395 }
396
397 impl AsRef<TcpStream> for WriteHalf<'_> {
as_ref(&self) -> &TcpStream398 fn as_ref(&self) -> &TcpStream {
399 self.0
400 }
401 }
402