use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::unix::split::{split, ReadHalf, WriteHalf};
use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::unix::ucred::{self, UCred};
use crate::net::unix::SocketAddr;

use std::convert::TryFrom;
use std::fmt;
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_io_util! {
    use bytes::BufMut;
}

cfg_net_unix! {
    /// A structure representing a connected Unix socket.
    ///
    /// This socket can be connected directly with `UnixStream::connect` or accepted
    /// from a listener with `UnixListener::incoming`. Additionally, a pair of
    /// anonymous Unix sockets can be created with `UnixStream::pair`.
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub struct UnixStream {
        io: PollEvented<mio::net::UnixStream>,
    }
}

impl UnixStream {
/// Connects to the socket named by `path`.
///
/// This function will create a new Unix socket and connect to the path
/// specified, associating the returned stream with the default event loop's
/// handle.
connect<P>(path: P) -> io::Result<UnixStream> where P: AsRef<Path>,46 pub async fn connect<P>(path: P) -> io::Result<UnixStream> 47 where 48 P: AsRef<Path>, 49 { 50 let stream = mio::net::UnixStream::connect(path)?; 51 let stream = UnixStream::new(stream)?; 52 53 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 54 55 if let Some(e) = stream.io.take_error()? { 56 return Err(e); 57 } 58 59 Ok(stream) 60 } 61 62 /// Waits for any of the requested ready states. 63 /// 64 /// This function is usually paired with `try_read()` or `try_write()`. It 65 /// can be used to concurrently read / write to the same socket on a single 66 /// task without splitting the socket. 67 /// 68 /// # Cancel safety 69 /// 70 /// This method is cancel safe. Once a readiness event occurs, the method 71 /// will continue to return immediately until the readiness event is 72 /// consumed by an attempt to read or write that fails with `WouldBlock` or 73 /// `Poll::Pending`. 74 /// 75 /// # Examples 76 /// 77 /// Concurrently read and write to the stream on the same task without 78 /// splitting. 79 /// 80 /// ```no_run 81 /// use tokio::io::Interest; 82 /// use tokio::net::UnixStream; 83 /// use std::error::Error; 84 /// use std::io; 85 /// 86 /// #[tokio::main] 87 /// async fn main() -> Result<(), Box<dyn Error>> { 88 /// let dir = tempfile::tempdir().unwrap(); 89 /// let bind_path = dir.path().join("bind_path"); 90 /// let stream = UnixStream::connect(bind_path).await?; 91 /// 92 /// loop { 93 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; 94 /// 95 /// if ready.is_readable() { 96 /// let mut data = vec![0; 1024]; 97 /// // Try to read data, this may still fail with `WouldBlock` 98 /// // if the readiness event is a false positive. 
99 /// match stream.try_read(&mut data) { 100 /// Ok(n) => { 101 /// println!("read {} bytes", n); 102 /// } 103 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 104 /// continue; 105 /// } 106 /// Err(e) => { 107 /// return Err(e.into()); 108 /// } 109 /// } 110 /// 111 /// } 112 /// 113 /// if ready.is_writable() { 114 /// // Try to write data, this may still fail with `WouldBlock` 115 /// // if the readiness event is a false positive. 116 /// match stream.try_write(b"hello world") { 117 /// Ok(n) => { 118 /// println!("write {} bytes", n); 119 /// } 120 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 121 /// continue; 122 /// } 123 /// Err(e) => { 124 /// return Err(e.into()); 125 /// } 126 /// } 127 /// } 128 /// } 129 /// } 130 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>131 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 132 let event = self.io.registration().readiness(interest).await?; 133 Ok(event.ready) 134 } 135 136 /// Waits for the socket to become readable. 137 /// 138 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 139 /// paired with `try_read()`. 140 /// 141 /// # Cancel safety 142 /// 143 /// This method is cancel safe. Once a readiness event occurs, the method 144 /// will continue to return immediately until the readiness event is 145 /// consumed by an attempt to read that fails with `WouldBlock` or 146 /// `Poll::Pending`. 
147 /// 148 /// # Examples 149 /// 150 /// ```no_run 151 /// use tokio::net::UnixStream; 152 /// use std::error::Error; 153 /// use std::io; 154 /// 155 /// #[tokio::main] 156 /// async fn main() -> Result<(), Box<dyn Error>> { 157 /// // Connect to a peer 158 /// let dir = tempfile::tempdir().unwrap(); 159 /// let bind_path = dir.path().join("bind_path"); 160 /// let stream = UnixStream::connect(bind_path).await?; 161 /// 162 /// let mut msg = vec![0; 1024]; 163 /// 164 /// loop { 165 /// // Wait for the socket to be readable 166 /// stream.readable().await?; 167 /// 168 /// // Try to read data, this may still fail with `WouldBlock` 169 /// // if the readiness event is a false positive. 170 /// match stream.try_read(&mut msg) { 171 /// Ok(n) => { 172 /// msg.truncate(n); 173 /// break; 174 /// } 175 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 176 /// continue; 177 /// } 178 /// Err(e) => { 179 /// return Err(e.into()); 180 /// } 181 /// } 182 /// } 183 /// 184 /// println!("GOT = {:?}", msg); 185 /// Ok(()) 186 /// } 187 /// ``` readable(&self) -> io::Result<()>188 pub async fn readable(&self) -> io::Result<()> { 189 self.ready(Interest::READABLE).await?; 190 Ok(()) 191 } 192 193 /// Polls for read readiness. 194 /// 195 /// If the unix stream is not currently ready for reading, this method will 196 /// store a clone of the `Waker` from the provided `Context`. When the unix 197 /// stream becomes ready for reading, `Waker::wake` will be called on the 198 /// waker. 199 /// 200 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only 201 /// the `Waker` from the `Context` passed to the most recent call is 202 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a 203 /// second, independent waker.) 204 /// 205 /// This function is intended for cases where creating and pinning a future 206 /// via [`readable`] is not feasible. 
Where possible, using [`readable`] is 207 /// preferred, as this supports polling from multiple tasks at once. 208 /// 209 /// # Return value 210 /// 211 /// The function returns: 212 /// 213 /// * `Poll::Pending` if the unix stream is not ready for reading. 214 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading. 215 /// * `Poll::Ready(Err(e))` if an error is encountered. 216 /// 217 /// # Errors 218 /// 219 /// This function may encounter any standard I/O error except `WouldBlock`. 220 /// 221 /// [`readable`]: method@Self::readable poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>222 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 223 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 224 } 225 226 /// Try to read data from the stream into the provided buffer, returning how 227 /// many bytes were read. 228 /// 229 /// Receives any pending data from the socket but does not wait for new data 230 /// to arrive. On success, returns the number of bytes read. Because 231 /// `try_read()` is non-blocking, the buffer does not have to be stored by 232 /// the async task and can exist entirely on the stack. 233 /// 234 /// Usually, [`readable()`] or [`ready()`] is used with this function. 235 /// 236 /// [`readable()`]: UnixStream::readable() 237 /// [`ready()`]: UnixStream::ready() 238 /// 239 /// # Return 240 /// 241 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 242 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 243 /// and will no longer yield data. If the stream is not ready to read data 244 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
245 /// 246 /// # Examples 247 /// 248 /// ```no_run 249 /// use tokio::net::UnixStream; 250 /// use std::error::Error; 251 /// use std::io; 252 /// 253 /// #[tokio::main] 254 /// async fn main() -> Result<(), Box<dyn Error>> { 255 /// // Connect to a peer 256 /// let dir = tempfile::tempdir().unwrap(); 257 /// let bind_path = dir.path().join("bind_path"); 258 /// let stream = UnixStream::connect(bind_path).await?; 259 /// 260 /// loop { 261 /// // Wait for the socket to be readable 262 /// stream.readable().await?; 263 /// 264 /// // Creating the buffer **after** the `await` prevents it from 265 /// // being stored in the async task. 266 /// let mut buf = [0; 4096]; 267 /// 268 /// // Try to read data, this may still fail with `WouldBlock` 269 /// // if the readiness event is a false positive. 270 /// match stream.try_read(&mut buf) { 271 /// Ok(0) => break, 272 /// Ok(n) => { 273 /// println!("read {} bytes", n); 274 /// } 275 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 276 /// continue; 277 /// } 278 /// Err(e) => { 279 /// return Err(e.into()); 280 /// } 281 /// } 282 /// } 283 /// 284 /// Ok(()) 285 /// } 286 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>287 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 288 self.io 289 .registration() 290 .try_io(Interest::READABLE, || (&*self.io).read(buf)) 291 } 292 293 /// Tries to read data from the stream into the provided buffers, returning 294 /// how many bytes were read. 295 /// 296 /// Data is copied to fill each buffer in order, with the final buffer 297 /// written to possibly being only partially filled. This method behaves 298 /// equivalently to a single call to [`try_read()`] with concatenated 299 /// buffers. 300 /// 301 /// Receives any pending data from the socket but does not wait for new data 302 /// to arrive. On success, returns the number of bytes read. 
Because 303 /// `try_read_vectored()` is non-blocking, the buffer does not have to be 304 /// stored by the async task and can exist entirely on the stack. 305 /// 306 /// Usually, [`readable()`] or [`ready()`] is used with this function. 307 /// 308 /// [`try_read()`]: UnixStream::try_read() 309 /// [`readable()`]: UnixStream::readable() 310 /// [`ready()`]: UnixStream::ready() 311 /// 312 /// # Return 313 /// 314 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 315 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 316 /// and will no longer yield data. If the stream is not ready to read data 317 /// `Err(io::ErrorKind::WouldBlock)` is returned. 318 /// 319 /// # Examples 320 /// 321 /// ```no_run 322 /// use tokio::net::UnixStream; 323 /// use std::error::Error; 324 /// use std::io::{self, IoSliceMut}; 325 /// 326 /// #[tokio::main] 327 /// async fn main() -> Result<(), Box<dyn Error>> { 328 /// // Connect to a peer 329 /// let dir = tempfile::tempdir().unwrap(); 330 /// let bind_path = dir.path().join("bind_path"); 331 /// let stream = UnixStream::connect(bind_path).await?; 332 /// 333 /// loop { 334 /// // Wait for the socket to be readable 335 /// stream.readable().await?; 336 /// 337 /// // Creating the buffer **after** the `await` prevents it from 338 /// // being stored in the async task. 339 /// let mut buf_a = [0; 512]; 340 /// let mut buf_b = [0; 1024]; 341 /// let mut bufs = [ 342 /// IoSliceMut::new(&mut buf_a), 343 /// IoSliceMut::new(&mut buf_b), 344 /// ]; 345 /// 346 /// // Try to read data, this may still fail with `WouldBlock` 347 /// // if the readiness event is a false positive. 
348 /// match stream.try_read_vectored(&mut bufs) { 349 /// Ok(0) => break, 350 /// Ok(n) => { 351 /// println!("read {} bytes", n); 352 /// } 353 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 354 /// continue; 355 /// } 356 /// Err(e) => { 357 /// return Err(e.into()); 358 /// } 359 /// } 360 /// } 361 /// 362 /// Ok(()) 363 /// } 364 /// ``` try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>365 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { 366 self.io 367 .registration() 368 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) 369 } 370 371 cfg_io_util! { 372 /// Tries to read data from the stream into the provided buffer, advancing the 373 /// buffer's internal cursor, returning how many bytes were read. 374 /// 375 /// Receives any pending data from the socket but does not wait for new data 376 /// to arrive. On success, returns the number of bytes read. Because 377 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by 378 /// the async task and can exist entirely on the stack. 379 /// 380 /// Usually, [`readable()`] or [`ready()`] is used with this function. 381 /// 382 /// [`readable()`]: UnixStream::readable() 383 /// [`ready()`]: UnixStream::ready() 384 /// 385 /// # Return 386 /// 387 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 388 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 389 /// and will no longer yield data. If the stream is not ready to read data 390 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
391 /// 392 /// # Examples 393 /// 394 /// ```no_run 395 /// use tokio::net::UnixStream; 396 /// use std::error::Error; 397 /// use std::io; 398 /// 399 /// #[tokio::main] 400 /// async fn main() -> Result<(), Box<dyn Error>> { 401 /// // Connect to a peer 402 /// let dir = tempfile::tempdir().unwrap(); 403 /// let bind_path = dir.path().join("bind_path"); 404 /// let stream = UnixStream::connect(bind_path).await?; 405 /// 406 /// loop { 407 /// // Wait for the socket to be readable 408 /// stream.readable().await?; 409 /// 410 /// let mut buf = Vec::with_capacity(4096); 411 /// 412 /// // Try to read data, this may still fail with `WouldBlock` 413 /// // if the readiness event is a false positive. 414 /// match stream.try_read_buf(&mut buf) { 415 /// Ok(0) => break, 416 /// Ok(n) => { 417 /// println!("read {} bytes", n); 418 /// } 419 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 420 /// continue; 421 /// } 422 /// Err(e) => { 423 /// return Err(e.into()); 424 /// } 425 /// } 426 /// } 427 /// 428 /// Ok(()) 429 /// } 430 /// ``` 431 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 432 self.io.registration().try_io(Interest::READABLE, || { 433 use std::io::Read; 434 435 let dst = buf.chunk_mut(); 436 let dst = 437 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 438 439 // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the 440 // buffer. 441 let n = (&*self.io).read(dst)?; 442 443 unsafe { 444 buf.advance_mut(n); 445 } 446 447 Ok(n) 448 }) 449 } 450 } 451 452 /// Waits for the socket to become writable. 453 /// 454 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually 455 /// paired with `try_write()`. 456 /// 457 /// # Cancel safety 458 /// 459 /// This method is cancel safe. 
Once a readiness event occurs, the method 460 /// will continue to return immediately until the readiness event is 461 /// consumed by an attempt to write that fails with `WouldBlock` or 462 /// `Poll::Pending`. 463 /// 464 /// # Examples 465 /// 466 /// ```no_run 467 /// use tokio::net::UnixStream; 468 /// use std::error::Error; 469 /// use std::io; 470 /// 471 /// #[tokio::main] 472 /// async fn main() -> Result<(), Box<dyn Error>> { 473 /// // Connect to a peer 474 /// let dir = tempfile::tempdir().unwrap(); 475 /// let bind_path = dir.path().join("bind_path"); 476 /// let stream = UnixStream::connect(bind_path).await?; 477 /// 478 /// loop { 479 /// // Wait for the socket to be writable 480 /// stream.writable().await?; 481 /// 482 /// // Try to write data, this may still fail with `WouldBlock` 483 /// // if the readiness event is a false positive. 484 /// match stream.try_write(b"hello world") { 485 /// Ok(n) => { 486 /// break; 487 /// } 488 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 489 /// continue; 490 /// } 491 /// Err(e) => { 492 /// return Err(e.into()); 493 /// } 494 /// } 495 /// } 496 /// 497 /// Ok(()) 498 /// } 499 /// ``` writable(&self) -> io::Result<()>500 pub async fn writable(&self) -> io::Result<()> { 501 self.ready(Interest::WRITABLE).await?; 502 Ok(()) 503 } 504 505 /// Polls for write readiness. 506 /// 507 /// If the unix stream is not currently ready for writing, this method will 508 /// store a clone of the `Waker` from the provided `Context`. When the unix 509 /// stream becomes ready for writing, `Waker::wake` will be called on the 510 /// waker. 511 /// 512 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only 513 /// the `Waker` from the `Context` passed to the most recent call is 514 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a 515 /// second, independent waker.) 
516 /// 517 /// This function is intended for cases where creating and pinning a future 518 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 519 /// preferred, as this supports polling from multiple tasks at once. 520 /// 521 /// # Return value 522 /// 523 /// The function returns: 524 /// 525 /// * `Poll::Pending` if the unix stream is not ready for writing. 526 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing. 527 /// * `Poll::Ready(Err(e))` if an error is encountered. 528 /// 529 /// # Errors 530 /// 531 /// This function may encounter any standard I/O error except `WouldBlock`. 532 /// 533 /// [`writable`]: method@Self::writable poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>534 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 535 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 536 } 537 538 /// Tries to write a buffer to the stream, returning how many bytes were 539 /// written. 540 /// 541 /// The function will attempt to write the entire contents of `buf`, but 542 /// only part of the buffer may be written. 543 /// 544 /// This function is usually paired with `writable()`. 545 /// 546 /// # Return 547 /// 548 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 549 /// number of bytes written. If the stream is not ready to write data, 550 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
551 /// 552 /// # Examples 553 /// 554 /// ```no_run 555 /// use tokio::net::UnixStream; 556 /// use std::error::Error; 557 /// use std::io; 558 /// 559 /// #[tokio::main] 560 /// async fn main() -> Result<(), Box<dyn Error>> { 561 /// // Connect to a peer 562 /// let dir = tempfile::tempdir().unwrap(); 563 /// let bind_path = dir.path().join("bind_path"); 564 /// let stream = UnixStream::connect(bind_path).await?; 565 /// 566 /// loop { 567 /// // Wait for the socket to be writable 568 /// stream.writable().await?; 569 /// 570 /// // Try to write data, this may still fail with `WouldBlock` 571 /// // if the readiness event is a false positive. 572 /// match stream.try_write(b"hello world") { 573 /// Ok(n) => { 574 /// break; 575 /// } 576 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 577 /// continue; 578 /// } 579 /// Err(e) => { 580 /// return Err(e.into()); 581 /// } 582 /// } 583 /// } 584 /// 585 /// Ok(()) 586 /// } 587 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>588 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 589 self.io 590 .registration() 591 .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) 592 } 593 594 /// Tries to write several buffers to the stream, returning how many bytes 595 /// were written. 596 /// 597 /// Data is written from each buffer in order, with the final buffer read 598 /// from possible being only partially consumed. This method behaves 599 /// equivalently to a single call to [`try_write()`] with concatenated 600 /// buffers. 601 /// 602 /// This function is usually paired with `writable()`. 603 /// 604 /// [`try_write()`]: UnixStream::try_write() 605 /// 606 /// # Return 607 /// 608 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 609 /// number of bytes written. If the stream is not ready to write data, 610 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
611 /// 612 /// # Examples 613 /// 614 /// ```no_run 615 /// use tokio::net::UnixStream; 616 /// use std::error::Error; 617 /// use std::io; 618 /// 619 /// #[tokio::main] 620 /// async fn main() -> Result<(), Box<dyn Error>> { 621 /// // Connect to a peer 622 /// let dir = tempfile::tempdir().unwrap(); 623 /// let bind_path = dir.path().join("bind_path"); 624 /// let stream = UnixStream::connect(bind_path).await?; 625 /// 626 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; 627 /// 628 /// loop { 629 /// // Wait for the socket to be writable 630 /// stream.writable().await?; 631 /// 632 /// // Try to write data, this may still fail with `WouldBlock` 633 /// // if the readiness event is a false positive. 634 /// match stream.try_write_vectored(&bufs) { 635 /// Ok(n) => { 636 /// break; 637 /// } 638 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 639 /// continue; 640 /// } 641 /// Err(e) => { 642 /// return Err(e.into()); 643 /// } 644 /// } 645 /// } 646 /// 647 /// Ok(()) 648 /// } 649 /// ``` try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>650 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { 651 self.io 652 .registration() 653 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) 654 } 655 656 /// Tries to read or write from the socket using a user-provided IO operation. 657 /// 658 /// If the socket is ready, the provided closure is called. The closure 659 /// should attempt to perform IO operation from the socket by manually 660 /// calling the appropriate syscall. If the operation fails because the 661 /// socket is not actually ready, then the closure should return a 662 /// `WouldBlock` error and the readiness flag is cleared. The return value 663 /// of the closure is then returned by `try_io`. 664 /// 665 /// If the socket is not ready, then the closure is not called 666 /// and a `WouldBlock` error is returned. 
667 /// 668 /// The closure should only return a `WouldBlock` error if it has performed 669 /// an IO operation on the socket that failed due to the socket not being 670 /// ready. Returning a `WouldBlock` error in any other situation will 671 /// incorrectly clear the readiness flag, which can cause the socket to 672 /// behave incorrectly. 673 /// 674 /// The closure should not perform the IO operation using any of the methods 675 /// defined on the Tokio `UnixStream` type, as this will mess with the 676 /// readiness flag and can cause the socket to behave incorrectly. 677 /// 678 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 679 /// 680 /// [`readable()`]: UnixStream::readable() 681 /// [`writable()`]: UnixStream::writable() 682 /// [`ready()`]: UnixStream::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>683 pub fn try_io<R>( 684 &self, 685 interest: Interest, 686 f: impl FnOnce() -> io::Result<R>, 687 ) -> io::Result<R> { 688 self.io.registration().try_io(interest, f) 689 } 690 691 /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. 692 /// 693 /// This function is intended to be used to wrap a UnixStream from the 694 /// standard library in the Tokio equivalent. The conversion assumes 695 /// nothing about the underlying stream; it is left up to the user to set 696 /// it in non-blocking mode. 697 /// 698 /// # Panics 699 /// 700 /// This function panics if thread-local runtime is not set. 701 /// 702 /// The runtime is usually set implicitly when this function is called 703 /// from a future driven by a tokio runtime, otherwise runtime can be set 704 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 
from_std(stream: net::UnixStream) -> io::Result<UnixStream>705 pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { 706 let stream = mio::net::UnixStream::from_std(stream); 707 let io = PollEvented::new(stream)?; 708 709 Ok(UnixStream { io }) 710 } 711 712 /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. 713 /// 714 /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking 715 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking 716 /// mode if needed. 717 /// 718 /// # Examples 719 /// 720 /// ``` 721 /// use std::error::Error; 722 /// use std::io::Read; 723 /// use tokio::net::UnixListener; 724 /// # use tokio::net::UnixStream; 725 /// # use tokio::io::AsyncWriteExt; 726 /// 727 /// #[tokio::main] 728 /// async fn main() -> Result<(), Box<dyn Error>> { 729 /// let dir = tempfile::tempdir().unwrap(); 730 /// let bind_path = dir.path().join("bind_path"); 731 /// 732 /// let mut data = [0u8; 12]; 733 /// let listener = UnixListener::bind(&bind_path)?; 734 /// # let handle = tokio::spawn(async { 735 /// # let mut stream = UnixStream::connect(bind_path).await.unwrap(); 736 /// # stream.write(b"Hello world!").await.unwrap(); 737 /// # }); 738 /// let (tokio_unix_stream, _) = listener.accept().await?; 739 /// let mut std_unix_stream = tokio_unix_stream.into_std()?; 740 /// # handle.await.expect("The task being joined has panicked"); 741 /// std_unix_stream.set_nonblocking(false)?; 742 /// std_unix_stream.read_exact(&mut data)?; 743 /// # assert_eq!(b"Hello world!", &data); 744 /// Ok(()) 745 /// } 746 /// ``` 747 /// [`tokio::net::UnixStream`]: UnixStream 748 /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream 749 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking into_std(self) -> io::Result<std::os::unix::net::UnixStream>750 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> { 751 self.io 752 .into_inner() 753 .map(|io| 
io.into_raw_fd()) 754 .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) }) 755 } 756 757 /// Creates an unnamed pair of connected sockets. 758 /// 759 /// This function will create a pair of interconnected Unix sockets for 760 /// communicating back and forth between one another. Each socket will 761 /// be associated with the default event loop's handle. pair() -> io::Result<(UnixStream, UnixStream)>762 pub fn pair() -> io::Result<(UnixStream, UnixStream)> { 763 let (a, b) = mio::net::UnixStream::pair()?; 764 let a = UnixStream::new(a)?; 765 let b = UnixStream::new(b)?; 766 767 Ok((a, b)) 768 } 769 new(stream: mio::net::UnixStream) -> io::Result<UnixStream>770 pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> { 771 let io = PollEvented::new(stream)?; 772 Ok(UnixStream { io }) 773 } 774 775 /// Returns the socket address of the local half of this connection. 776 /// 777 /// # Examples 778 /// 779 /// ```no_run 780 /// use tokio::net::UnixStream; 781 /// 782 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 783 /// let dir = tempfile::tempdir().unwrap(); 784 /// let bind_path = dir.path().join("bind_path"); 785 /// let stream = UnixStream::connect(bind_path).await?; 786 /// 787 /// println!("{:?}", stream.local_addr()?); 788 /// # Ok(()) 789 /// # } 790 /// ``` local_addr(&self) -> io::Result<SocketAddr>791 pub fn local_addr(&self) -> io::Result<SocketAddr> { 792 self.io.local_addr().map(SocketAddr) 793 } 794 795 /// Returns the socket address of the remote half of this connection. 
796 /// 797 /// # Examples 798 /// 799 /// ```no_run 800 /// use tokio::net::UnixStream; 801 /// 802 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 803 /// let dir = tempfile::tempdir().unwrap(); 804 /// let bind_path = dir.path().join("bind_path"); 805 /// let stream = UnixStream::connect(bind_path).await?; 806 /// 807 /// println!("{:?}", stream.peer_addr()?); 808 /// # Ok(()) 809 /// # } 810 /// ``` peer_addr(&self) -> io::Result<SocketAddr>811 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 812 self.io.peer_addr().map(SocketAddr) 813 } 814 815 /// Returns effective credentials of the process which called `connect` or `pair`. peer_cred(&self) -> io::Result<UCred>816 pub fn peer_cred(&self) -> io::Result<UCred> { 817 ucred::get_peer_cred(self) 818 } 819 820 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>821 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 822 self.io.take_error() 823 } 824 825 /// Shuts down the read, write, or both halves of this connection. 826 /// 827 /// This function will cause all pending and future I/O calls on the 828 /// specified portions to immediately return with an appropriate value 829 /// (see the documentation of `Shutdown`). shutdown_std(&self, how: Shutdown) -> io::Result<()>830 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> { 831 self.io.shutdown(how) 832 } 833 834 // These lifetime markers also appear in the generated documentation, and make 835 // it more clear that this is a *borrowed* split. 836 #[allow(clippy::needless_lifetimes)] 837 /// Splits a `UnixStream` into a read half and a write half, which can be used 838 /// to read and write the stream concurrently. 839 /// 840 /// This method is more efficient than [`into_split`], but the halves cannot be 841 /// moved into independently spawned tasks. 
842 /// 843 /// [`into_split`]: Self::into_split() split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)844 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { 845 split(self) 846 } 847 848 /// Splits a `UnixStream` into a read half and a write half, which can be used 849 /// to read and write the stream concurrently. 850 /// 851 /// Unlike [`split`], the owned halves can be moved to separate tasks, however 852 /// this comes at the cost of a heap allocation. 853 /// 854 /// **Note:** Dropping the write half will shut down the write half of the 855 /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`. 856 /// 857 /// [`split`]: Self::split() 858 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)859 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { 860 split_owned(self) 861 } 862 } 863 864 impl TryFrom<net::UnixStream> for UnixStream { 865 type Error = io::Error; 866 867 /// Consumes stream, returning the tokio I/O object. 868 /// 869 /// This is equivalent to 870 /// [`UnixStream::from_std(stream)`](UnixStream::from_std). 
fn try_from(stream: net::UnixStream) -> io::Result<Self> {
    Self::from_std(stream)
}
}

impl AsyncRead for UnixStream {
    /// Delegates to the `&self`-based `poll_read_priv`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        self.poll_read_priv(cx, buf)
    }
}

impl AsyncWrite for UnixStream {
    /// Delegates to the `&self`-based `poll_write_priv`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.poll_write_priv(cx, buf)
    }

    /// Delegates to the `&self`-based `poll_write_vectored_priv`.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        self.poll_write_vectored_priv(cx, bufs)
    }

    /// Always reports vectored-write support.
    fn is_write_vectored(&self) -> bool {
        true
    }

    /// Completes immediately; this implementation performs no flushing work.
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Shuts down the write direction of the socket and completes immediately
    /// with the result of that call.
    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.shutdown_std(Shutdown::Write))
    }
}

impl UnixStream {
// == Poll IO functions that take `&self` ==
//
// To read or write without mutable access to the `UnixStream`, combine the
// `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
// `try_write` methods.
923 poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>924 pub(crate) fn poll_read_priv( 925 &self, 926 cx: &mut Context<'_>, 927 buf: &mut ReadBuf<'_>, 928 ) -> Poll<io::Result<()>> { 929 // Safety: `UnixStream::read` correctly handles reads into uninitialized memory 930 unsafe { self.io.poll_read(cx, buf) } 931 } 932 poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>933 pub(crate) fn poll_write_priv( 934 &self, 935 cx: &mut Context<'_>, 936 buf: &[u8], 937 ) -> Poll<io::Result<usize>> { 938 self.io.poll_write(cx, buf) 939 } 940 poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>941 pub(super) fn poll_write_vectored_priv( 942 &self, 943 cx: &mut Context<'_>, 944 bufs: &[io::IoSlice<'_>], 945 ) -> Poll<io::Result<usize>> { 946 self.io.poll_write_vectored(cx, bufs) 947 } 948 } 949 950 impl fmt::Debug for UnixStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result951 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 952 self.io.fmt(f) 953 } 954 } 955 956 impl AsRawFd for UnixStream { as_raw_fd(&self) -> RawFd957 fn as_raw_fd(&self) -> RawFd { 958 self.io.as_raw_fd() 959 } 960 } 961