use crate::io::{Interest, PollEvented, ReadBuf, Ready};
use crate::net::unix::SocketAddr;

use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};

cfg_io_util! {
    use bytes::BufMut;
}

cfg_net_unix! {
    /// An I/O object representing a Unix datagram socket.
    ///
    /// A socket can be either named (associated with a filesystem path) or
    /// unnamed.
    ///
    /// This type does not provide a `split` method, because this functionality
    /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
    /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
    /// is enough. This is because all of the methods take `&self` instead of
    /// `&mut self`.
    ///
    /// **Note:** named sockets are persisted even after the object is dropped
    /// and the program has exited, and cannot be reconnected. It is advised
    /// that you either check for and unlink the existing socket if it exists,
    /// or use a temporary file that is guaranteed to not already exist.
    ///
    /// [`Arc`]: std::sync::Arc
    ///
    /// # Examples
    ///
    /// Using named sockets, associated with a filesystem path:
    /// ```
    /// # use std::error::Error;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn Error>> {
    /// use tokio::net::UnixDatagram;
    /// use tempfile::tempdir;
    ///
    /// // We use a temporary directory so that the socket
    /// // files left by the bound sockets will get cleaned up.
    /// let tmp = tempdir()?;
    ///
    /// // Bind each socket to a filesystem path
    /// let tx_path = tmp.path().join("tx");
    /// let tx = UnixDatagram::bind(&tx_path)?;
    /// let rx_path = tmp.path().join("rx");
    /// let rx = UnixDatagram::bind(&rx_path)?;
    ///
    /// let bytes = b"hello world";
    /// tx.send_to(bytes, &rx_path).await?;
    ///
    /// let mut buf = vec![0u8; 24];
    /// let (size, addr) = rx.recv_from(&mut buf).await?;
    ///
    /// let dgram = &buf[..size];
    /// assert_eq!(dgram, bytes);
    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
    ///
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Using unnamed sockets, created as a pair:
    /// ```
    /// # use std::error::Error;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn Error>> {
    /// use tokio::net::UnixDatagram;
    ///
    /// // Create the pair of sockets
    /// let (sock1, sock2) = UnixDatagram::pair()?;
    ///
    /// // Since the sockets are paired, the paired send/recv
    /// // functions can be used
    /// let bytes = b"hello world";
    /// sock1.send(bytes).await?;
    ///
    /// let mut buff = vec![0u8; 24];
    /// let size = sock2.recv(&mut buff).await?;
    ///
    /// let dgram = &buff[..size];
    /// assert_eq!(dgram, bytes);
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub struct UnixDatagram {
        // The underlying mio datagram socket wrapped in `PollEvented`; the
        // methods below query readiness through `self.io.registration()`.
        io: PollEvented<mio::net::UnixDatagram>,
    }
}

impl UnixDatagram {
    /// Waits for any of the requested ready states.
    ///
    /// This function is usually paired with `try_recv()` or `try_send()`. It
    /// can be used to concurrently recv / send to the same socket on a single
    /// task without splitting the socket.
    ///
    /// The function may complete without the socket being ready. This is a
    /// false-positive and attempting an operation will return with
    /// `io::ErrorKind::WouldBlock`.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe.
Once a readiness event occurs, the method 112 /// will continue to return immediately until the readiness event is 113 /// consumed by an attempt to read or write that fails with `WouldBlock` or 114 /// `Poll::Pending`. 115 /// 116 /// # Examples 117 /// 118 /// Concurrently receive from and send to the socket on the same task 119 /// without splitting. 120 /// 121 /// ```no_run 122 /// use tokio::io::Interest; 123 /// use tokio::net::UnixDatagram; 124 /// use std::io; 125 /// 126 /// #[tokio::main] 127 /// async fn main() -> io::Result<()> { 128 /// let dir = tempfile::tempdir().unwrap(); 129 /// let client_path = dir.path().join("client.sock"); 130 /// let server_path = dir.path().join("server.sock"); 131 /// let socket = UnixDatagram::bind(&client_path)?; 132 /// socket.connect(&server_path)?; 133 /// 134 /// loop { 135 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; 136 /// 137 /// if ready.is_readable() { 138 /// let mut data = [0; 1024]; 139 /// match socket.try_recv(&mut data[..]) { 140 /// Ok(n) => { 141 /// println!("received {:?}", &data[..n]); 142 /// } 143 /// // False-positive, continue 144 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} 145 /// Err(e) => { 146 /// return Err(e); 147 /// } 148 /// } 149 /// } 150 /// 151 /// if ready.is_writable() { 152 /// // Write some data 153 /// match socket.try_send(b"hello world") { 154 /// Ok(n) => { 155 /// println!("sent {} bytes", n); 156 /// } 157 /// // False-positive, continue 158 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} 159 /// Err(e) => { 160 /// return Err(e); 161 /// } 162 /// } 163 /// } 164 /// } 165 /// } 166 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>167 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 168 let event = self.io.registration().readiness(interest).await?; 169 Ok(event.ready) 170 } 171 172 /// Waits for the socket to become writable. 
173 /// 174 /// This function is equivalent to `ready(Interest::WRITABLE)` and is 175 /// usually paired with `try_send()` or `try_send_to()`. 176 /// 177 /// The function may complete without the socket being writable. This is a 178 /// false-positive and attempting a `try_send()` will return with 179 /// `io::ErrorKind::WouldBlock`. 180 /// 181 /// # Cancel safety 182 /// 183 /// This method is cancel safe. Once a readiness event occurs, the method 184 /// will continue to return immediately until the readiness event is 185 /// consumed by an attempt to write that fails with `WouldBlock` or 186 /// `Poll::Pending`. 187 /// 188 /// # Examples 189 /// 190 /// ```no_run 191 /// use tokio::net::UnixDatagram; 192 /// use std::io; 193 /// 194 /// #[tokio::main] 195 /// async fn main() -> io::Result<()> { 196 /// let dir = tempfile::tempdir().unwrap(); 197 /// let client_path = dir.path().join("client.sock"); 198 /// let server_path = dir.path().join("server.sock"); 199 /// let socket = UnixDatagram::bind(&client_path)?; 200 /// socket.connect(&server_path)?; 201 /// 202 /// loop { 203 /// // Wait for the socket to be writable 204 /// socket.writable().await?; 205 /// 206 /// // Try to send data, this may still fail with `WouldBlock` 207 /// // if the readiness event is a false positive. 208 /// match socket.try_send(b"hello world") { 209 /// Ok(n) => { 210 /// break; 211 /// } 212 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 213 /// continue; 214 /// } 215 /// Err(e) => { 216 /// return Err(e); 217 /// } 218 /// } 219 /// } 220 /// 221 /// Ok(()) 222 /// } 223 /// ``` writable(&self) -> io::Result<()>224 pub async fn writable(&self) -> io::Result<()> { 225 self.ready(Interest::WRITABLE).await?; 226 Ok(()) 227 } 228 229 /// Polls for write/send readiness. 230 /// 231 /// If the socket is not currently ready for sending, this method will 232 /// store a clone of the `Waker` from the provided `Context`. 
When the socket 233 /// becomes ready for sending, `Waker::wake` will be called on the 234 /// waker. 235 /// 236 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only 237 /// the `Waker` from the `Context` passed to the most recent call is 238 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a 239 /// second, independent waker.) 240 /// 241 /// This function is intended for cases where creating and pinning a future 242 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 243 /// preferred, as this supports polling from multiple tasks at once. 244 /// 245 /// # Return value 246 /// 247 /// The function returns: 248 /// 249 /// * `Poll::Pending` if the socket is not ready for writing. 250 /// * `Poll::Ready(Ok(()))` if the socket is ready for writing. 251 /// * `Poll::Ready(Err(e))` if an error is encountered. 252 /// 253 /// # Errors 254 /// 255 /// This function may encounter any standard I/O error except `WouldBlock`. 256 /// 257 /// [`writable`]: method@Self::writable poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>258 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 259 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 260 } 261 262 /// Waits for the socket to become readable. 263 /// 264 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 265 /// paired with `try_recv()`. 266 /// 267 /// The function may complete without the socket being readable. This is a 268 /// false-positive and attempting a `try_recv()` will return with 269 /// `io::ErrorKind::WouldBlock`. 270 /// 271 /// # Cancel safety 272 /// 273 /// This method is cancel safe. Once a readiness event occurs, the method 274 /// will continue to return immediately until the readiness event is 275 /// consumed by an attempt to read that fails with `WouldBlock` or 276 /// `Poll::Pending`. 
277 /// 278 /// # Examples 279 /// 280 /// ```no_run 281 /// use tokio::net::UnixDatagram; 282 /// use std::io; 283 /// 284 /// #[tokio::main] 285 /// async fn main() -> io::Result<()> { 286 /// // Connect to a peer 287 /// let dir = tempfile::tempdir().unwrap(); 288 /// let client_path = dir.path().join("client.sock"); 289 /// let server_path = dir.path().join("server.sock"); 290 /// let socket = UnixDatagram::bind(&client_path)?; 291 /// socket.connect(&server_path)?; 292 /// 293 /// loop { 294 /// // Wait for the socket to be readable 295 /// socket.readable().await?; 296 /// 297 /// // The buffer is **not** included in the async task and will 298 /// // only exist on the stack. 299 /// let mut buf = [0; 1024]; 300 /// 301 /// // Try to recv data, this may still fail with `WouldBlock` 302 /// // if the readiness event is a false positive. 303 /// match socket.try_recv(&mut buf) { 304 /// Ok(n) => { 305 /// println!("GOT {:?}", &buf[..n]); 306 /// break; 307 /// } 308 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 309 /// continue; 310 /// } 311 /// Err(e) => { 312 /// return Err(e); 313 /// } 314 /// } 315 /// } 316 /// 317 /// Ok(()) 318 /// } 319 /// ``` readable(&self) -> io::Result<()>320 pub async fn readable(&self) -> io::Result<()> { 321 self.ready(Interest::READABLE).await?; 322 Ok(()) 323 } 324 325 /// Polls for read/receive readiness. 326 /// 327 /// If the socket is not currently ready for receiving, this method will 328 /// store a clone of the `Waker` from the provided `Context`. When the 329 /// socket becomes ready for reading, `Waker::wake` will be called on the 330 /// waker. 331 /// 332 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or 333 /// `poll_peek`, only the `Waker` from the `Context` passed to the most 334 /// recent call is scheduled to receive a wakeup. (However, 335 /// `poll_send_ready` retains a second, independent waker.) 
336 /// 337 /// This function is intended for cases where creating and pinning a future 338 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 339 /// preferred, as this supports polling from multiple tasks at once. 340 /// 341 /// # Return value 342 /// 343 /// The function returns: 344 /// 345 /// * `Poll::Pending` if the socket is not ready for reading. 346 /// * `Poll::Ready(Ok(()))` if the socket is ready for reading. 347 /// * `Poll::Ready(Err(e))` if an error is encountered. 348 /// 349 /// # Errors 350 /// 351 /// This function may encounter any standard I/O error except `WouldBlock`. 352 /// 353 /// [`readable`]: method@Self::readable poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>354 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 355 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 356 } 357 358 /// Creates a new `UnixDatagram` bound to the specified path. 359 /// 360 /// # Examples 361 /// ``` 362 /// # use std::error::Error; 363 /// # #[tokio::main] 364 /// # async fn main() -> Result<(), Box<dyn Error>> { 365 /// use tokio::net::UnixDatagram; 366 /// use tempfile::tempdir; 367 /// 368 /// // We use a temporary directory so that the socket 369 /// // files left by the bound sockets will get cleaned up. 370 /// let tmp = tempdir()?; 371 /// 372 /// // Bind the socket to a filesystem path 373 /// let socket_path = tmp.path().join("socket"); 374 /// let socket = UnixDatagram::bind(&socket_path)?; 375 /// 376 /// # Ok(()) 377 /// # } 378 /// ``` bind<P>(path: P) -> io::Result<UnixDatagram> where P: AsRef<Path>,379 pub fn bind<P>(path: P) -> io::Result<UnixDatagram> 380 where 381 P: AsRef<Path>, 382 { 383 let socket = mio::net::UnixDatagram::bind(path)?; 384 UnixDatagram::new(socket) 385 } 386 387 /// Creates an unnamed pair of connected sockets. 
388 /// 389 /// This function will create a pair of interconnected Unix sockets for 390 /// communicating back and forth between one another. 391 /// 392 /// # Examples 393 /// ``` 394 /// # use std::error::Error; 395 /// # #[tokio::main] 396 /// # async fn main() -> Result<(), Box<dyn Error>> { 397 /// use tokio::net::UnixDatagram; 398 /// 399 /// // Create the pair of sockets 400 /// let (sock1, sock2) = UnixDatagram::pair()?; 401 /// 402 /// // Since the sockets are paired, the paired send/recv 403 /// // functions can be used 404 /// let bytes = b"hail eris"; 405 /// sock1.send(bytes).await?; 406 /// 407 /// let mut buff = vec![0u8; 24]; 408 /// let size = sock2.recv(&mut buff).await?; 409 /// 410 /// let dgram = &buff[..size]; 411 /// assert_eq!(dgram, bytes); 412 /// 413 /// # Ok(()) 414 /// # } 415 /// ``` pair() -> io::Result<(UnixDatagram, UnixDatagram)>416 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { 417 let (a, b) = mio::net::UnixDatagram::pair()?; 418 let a = UnixDatagram::new(a)?; 419 let b = UnixDatagram::new(b)?; 420 421 Ok((a, b)) 422 } 423 424 /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`. 425 /// 426 /// This function is intended to be used to wrap a UnixDatagram from the 427 /// standard library in the Tokio equivalent. The conversion assumes 428 /// nothing about the underlying datagram; it is left up to the user to set 429 /// it in non-blocking mode. 430 /// 431 /// # Panics 432 /// 433 /// This function panics if thread-local runtime is not set. 434 /// 435 /// The runtime is usually set implicitly when this function is called 436 /// from a future driven by a Tokio runtime, otherwise runtime can be set 437 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 
438 /// # Examples 439 /// ``` 440 /// # use std::error::Error; 441 /// # #[tokio::main] 442 /// # async fn main() -> Result<(), Box<dyn Error>> { 443 /// use tokio::net::UnixDatagram; 444 /// use std::os::unix::net::UnixDatagram as StdUDS; 445 /// use tempfile::tempdir; 446 /// 447 /// // We use a temporary directory so that the socket 448 /// // files left by the bound sockets will get cleaned up. 449 /// let tmp = tempdir()?; 450 /// 451 /// // Bind the socket to a filesystem path 452 /// let socket_path = tmp.path().join("socket"); 453 /// let std_socket = StdUDS::bind(&socket_path)?; 454 /// std_socket.set_nonblocking(true)?; 455 /// let tokio_socket = UnixDatagram::from_std(std_socket)?; 456 /// 457 /// # Ok(()) 458 /// # } 459 /// ``` from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram>460 pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> { 461 let socket = mio::net::UnixDatagram::from_std(datagram); 462 let io = PollEvented::new(socket)?; 463 Ok(UnixDatagram { io }) 464 } 465 466 /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. 467 /// 468 /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking 469 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode 470 /// if needed. 
471 /// 472 /// # Examples 473 /// 474 /// ```rust,no_run 475 /// use std::error::Error; 476 /// 477 /// #[tokio::main] 478 /// async fn main() -> Result<(), Box<dyn Error>> { 479 /// let tokio_socket = tokio::net::UnixDatagram::bind("127.0.0.1:0")?; 480 /// let std_socket = tokio_socket.into_std()?; 481 /// std_socket.set_nonblocking(false)?; 482 /// Ok(()) 483 /// } 484 /// ``` 485 /// 486 /// [`tokio::net::UnixDatagram`]: UnixDatagram 487 /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram 488 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking into_std(self) -> io::Result<std::os::unix::net::UnixDatagram>489 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> { 490 self.io 491 .into_inner() 492 .map(|io| io.into_raw_fd()) 493 .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) }) 494 } 495 new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram>496 fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> { 497 let io = PollEvented::new(socket)?; 498 Ok(UnixDatagram { io }) 499 } 500 501 /// Creates a new `UnixDatagram` which is not bound to any address. 
502 /// 503 /// # Examples 504 /// ``` 505 /// # use std::error::Error; 506 /// # #[tokio::main] 507 /// # async fn main() -> Result<(), Box<dyn Error>> { 508 /// use tokio::net::UnixDatagram; 509 /// use tempfile::tempdir; 510 /// 511 /// // Create an unbound socket 512 /// let tx = UnixDatagram::unbound()?; 513 /// 514 /// // Create another, bound socket 515 /// let tmp = tempdir()?; 516 /// let rx_path = tmp.path().join("rx"); 517 /// let rx = UnixDatagram::bind(&rx_path)?; 518 /// 519 /// // Send to the bound socket 520 /// let bytes = b"hello world"; 521 /// tx.send_to(bytes, &rx_path).await?; 522 /// 523 /// let mut buf = vec![0u8; 24]; 524 /// let (size, addr) = rx.recv_from(&mut buf).await?; 525 /// 526 /// let dgram = &buf[..size]; 527 /// assert_eq!(dgram, bytes); 528 /// 529 /// # Ok(()) 530 /// # } 531 /// ``` unbound() -> io::Result<UnixDatagram>532 pub fn unbound() -> io::Result<UnixDatagram> { 533 let socket = mio::net::UnixDatagram::unbound()?; 534 UnixDatagram::new(socket) 535 } 536 537 /// Connects the socket to the specified address. 538 /// 539 /// The `send` method may be used to send data to the specified address. 540 /// `recv` and `recv_from` will only receive data from that address. 
541 /// 542 /// # Examples 543 /// ``` 544 /// # use std::error::Error; 545 /// # #[tokio::main] 546 /// # async fn main() -> Result<(), Box<dyn Error>> { 547 /// use tokio::net::UnixDatagram; 548 /// use tempfile::tempdir; 549 /// 550 /// // Create an unbound socket 551 /// let tx = UnixDatagram::unbound()?; 552 /// 553 /// // Create another, bound socket 554 /// let tmp = tempdir()?; 555 /// let rx_path = tmp.path().join("rx"); 556 /// let rx = UnixDatagram::bind(&rx_path)?; 557 /// 558 /// // Connect to the bound socket 559 /// tx.connect(&rx_path)?; 560 /// 561 /// // Send to the bound socket 562 /// let bytes = b"hello world"; 563 /// tx.send(bytes).await?; 564 /// 565 /// let mut buf = vec![0u8; 24]; 566 /// let (size, addr) = rx.recv_from(&mut buf).await?; 567 /// 568 /// let dgram = &buf[..size]; 569 /// assert_eq!(dgram, bytes); 570 /// 571 /// # Ok(()) 572 /// # } 573 /// ``` connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()>574 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { 575 self.io.connect(path) 576 } 577 578 /// Sends data on the socket to the socket's peer. 579 /// 580 /// # Cancel safety 581 /// 582 /// This method is cancel safe. If `send` is used as the event in a 583 /// [`tokio::select!`](crate::select) statement and some other branch 584 /// completes first, then it is guaranteed that the message was not sent. 
585 /// 586 /// # Examples 587 /// ``` 588 /// # use std::error::Error; 589 /// # #[tokio::main] 590 /// # async fn main() -> Result<(), Box<dyn Error>> { 591 /// use tokio::net::UnixDatagram; 592 /// 593 /// // Create the pair of sockets 594 /// let (sock1, sock2) = UnixDatagram::pair()?; 595 /// 596 /// // Since the sockets are paired, the paired send/recv 597 /// // functions can be used 598 /// let bytes = b"hello world"; 599 /// sock1.send(bytes).await?; 600 /// 601 /// let mut buff = vec![0u8; 24]; 602 /// let size = sock2.recv(&mut buff).await?; 603 /// 604 /// let dgram = &buff[..size]; 605 /// assert_eq!(dgram, bytes); 606 /// 607 /// # Ok(()) 608 /// # } 609 /// ``` send(&self, buf: &[u8]) -> io::Result<usize>610 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { 611 self.io 612 .registration() 613 .async_io(Interest::WRITABLE, || self.io.send(buf)) 614 .await 615 } 616 617 /// Tries to send a datagram to the peer without waiting. 618 /// 619 /// # Examples 620 /// 621 /// ```no_run 622 /// use tokio::net::UnixDatagram; 623 /// use std::io; 624 /// 625 /// #[tokio::main] 626 /// async fn main() -> io::Result<()> { 627 /// let dir = tempfile::tempdir().unwrap(); 628 /// let client_path = dir.path().join("client.sock"); 629 /// let server_path = dir.path().join("server.sock"); 630 /// let socket = UnixDatagram::bind(&client_path)?; 631 /// socket.connect(&server_path)?; 632 /// 633 /// loop { 634 /// // Wait for the socket to be writable 635 /// socket.writable().await?; 636 /// 637 /// // Try to send data, this may still fail with `WouldBlock` 638 /// // if the readiness event is a false positive. 
639 /// match socket.try_send(b"hello world") { 640 /// Ok(n) => { 641 /// break; 642 /// } 643 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 644 /// continue; 645 /// } 646 /// Err(e) => { 647 /// return Err(e); 648 /// } 649 /// } 650 /// } 651 /// 652 /// Ok(()) 653 /// } 654 /// ``` try_send(&self, buf: &[u8]) -> io::Result<usize>655 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { 656 self.io 657 .registration() 658 .try_io(Interest::WRITABLE, || self.io.send(buf)) 659 } 660 661 /// Tries to send a datagram to the peer without waiting. 662 /// 663 /// # Examples 664 /// 665 /// ```no_run 666 /// use tokio::net::UnixDatagram; 667 /// use std::io; 668 /// 669 /// #[tokio::main] 670 /// async fn main() -> io::Result<()> { 671 /// let dir = tempfile::tempdir().unwrap(); 672 /// let client_path = dir.path().join("client.sock"); 673 /// let server_path = dir.path().join("server.sock"); 674 /// let socket = UnixDatagram::bind(&client_path)?; 675 /// 676 /// loop { 677 /// // Wait for the socket to be writable 678 /// socket.writable().await?; 679 /// 680 /// // Try to send data, this may still fail with `WouldBlock` 681 /// // if the readiness event is a false positive. 682 /// match socket.try_send_to(b"hello world", &server_path) { 683 /// Ok(n) => { 684 /// break; 685 /// } 686 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 687 /// continue; 688 /// } 689 /// Err(e) => { 690 /// return Err(e); 691 /// } 692 /// } 693 /// } 694 /// 695 /// Ok(()) 696 /// } 697 /// ``` try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where P: AsRef<Path>,698 pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> 699 where 700 P: AsRef<Path>, 701 { 702 self.io 703 .registration() 704 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) 705 } 706 707 /// Receives data from the socket. 708 /// 709 /// # Cancel safety 710 /// 711 /// This method is cancel safe. 
If `recv` is used as the event in a 712 /// [`tokio::select!`](crate::select) statement and some other branch 713 /// completes first, it is guaranteed that no messages were received on this 714 /// socket. 715 /// 716 /// # Examples 717 /// ``` 718 /// # use std::error::Error; 719 /// # #[tokio::main] 720 /// # async fn main() -> Result<(), Box<dyn Error>> { 721 /// use tokio::net::UnixDatagram; 722 /// 723 /// // Create the pair of sockets 724 /// let (sock1, sock2) = UnixDatagram::pair()?; 725 /// 726 /// // Since the sockets are paired, the paired send/recv 727 /// // functions can be used 728 /// let bytes = b"hello world"; 729 /// sock1.send(bytes).await?; 730 /// 731 /// let mut buff = vec![0u8; 24]; 732 /// let size = sock2.recv(&mut buff).await?; 733 /// 734 /// let dgram = &buff[..size]; 735 /// assert_eq!(dgram, bytes); 736 /// 737 /// # Ok(()) 738 /// # } 739 /// ``` recv(&self, buf: &mut [u8]) -> io::Result<usize>740 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { 741 self.io 742 .registration() 743 .async_io(Interest::READABLE, || self.io.recv(buf)) 744 .await 745 } 746 747 /// Tries to receive a datagram from the peer without waiting. 748 /// 749 /// # Examples 750 /// 751 /// ```no_run 752 /// use tokio::net::UnixDatagram; 753 /// use std::io; 754 /// 755 /// #[tokio::main] 756 /// async fn main() -> io::Result<()> { 757 /// // Connect to a peer 758 /// let dir = tempfile::tempdir().unwrap(); 759 /// let client_path = dir.path().join("client.sock"); 760 /// let server_path = dir.path().join("server.sock"); 761 /// let socket = UnixDatagram::bind(&client_path)?; 762 /// socket.connect(&server_path)?; 763 /// 764 /// loop { 765 /// // Wait for the socket to be readable 766 /// socket.readable().await?; 767 /// 768 /// // The buffer is **not** included in the async task and will 769 /// // only exist on the stack. 
770 /// let mut buf = [0; 1024]; 771 /// 772 /// // Try to recv data, this may still fail with `WouldBlock` 773 /// // if the readiness event is a false positive. 774 /// match socket.try_recv(&mut buf) { 775 /// Ok(n) => { 776 /// println!("GOT {:?}", &buf[..n]); 777 /// break; 778 /// } 779 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 780 /// continue; 781 /// } 782 /// Err(e) => { 783 /// return Err(e); 784 /// } 785 /// } 786 /// } 787 /// 788 /// Ok(()) 789 /// } 790 /// ``` try_recv(&self, buf: &mut [u8]) -> io::Result<usize>791 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { 792 self.io 793 .registration() 794 .try_io(Interest::READABLE, || self.io.recv(buf)) 795 } 796 797 cfg_io_util! { 798 /// Tries to receive data from the socket without waiting. 799 /// 800 /// # Examples 801 /// 802 /// ```no_run 803 /// use tokio::net::UnixDatagram; 804 /// use std::io; 805 /// 806 /// #[tokio::main] 807 /// async fn main() -> io::Result<()> { 808 /// // Connect to a peer 809 /// let dir = tempfile::tempdir().unwrap(); 810 /// let client_path = dir.path().join("client.sock"); 811 /// let server_path = dir.path().join("server.sock"); 812 /// let socket = UnixDatagram::bind(&client_path)?; 813 /// 814 /// loop { 815 /// // Wait for the socket to be readable 816 /// socket.readable().await?; 817 /// 818 /// let mut buf = Vec::with_capacity(1024); 819 /// 820 /// // Try to recv data, this may still fail with `WouldBlock` 821 /// // if the readiness event is a false positive. 
///         match socket.try_recv_buf_from(&mut buf) {
///             Ok((n, _addr)) => {
///                 println!("GOT {:?}", &buf[..n]);
///                 break;
///             }
///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
///                 continue;
///             }
///             Err(e) => {
///                 return Err(e);
///             }
///         }
///     }
///
///     Ok(())
/// }
/// ```
pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
    let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
        // Reinterpret the buffer's next writable chunk as `&mut [u8]` so it
        // can be handed to `recv_from`.
        // NOTE(review): the chunk may hold uninitialized bytes; this relies on
        // `recv_from` only writing into the slice — confirm.
        let dst = buf.chunk_mut();
        let dst =
            unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

        let (n, addr) = (&*self.io).recv_from(dst)?;

        // SAFETY: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
        // buffer.
        unsafe {
            buf.advance_mut(n);
        }

        Ok((n, addr))
    })?;

    // Wrap the address returned by the underlying socket in this crate's `SocketAddr`.
    Ok((n, SocketAddr(addr)))
}

/// Tries to receive data from the socket into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixDatagram;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
///     // Connect to a peer
///     let dir = tempfile::tempdir().unwrap();
///     let client_path = dir.path().join("client.sock");
///     let server_path = dir.path().join("server.sock");
///     let socket = UnixDatagram::bind(&client_path)?;
///     socket.connect(&server_path)?;
///
///     loop {
///         // Wait for the socket to be readable
///         socket.readable().await?;
///
///         let mut buf = Vec::with_capacity(1024);
///
///         // Try to recv data, this may still fail with `WouldBlock`
///         // if the readiness event is a false positive.
///         match socket.try_recv_buf(&mut buf) {
    ///             Ok(n) => {
    ///                 println!("GOT {:?}", &buf[..n]);
    ///                 break;
    ///             }
    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///                 continue;
    ///             }
    ///             Err(e) => {
    ///                 return Err(e);
    ///             }
    ///         }
    ///     }
    ///
    ///     Ok(())
    /// }
    /// ```
    pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
        self.io.registration().try_io(Interest::READABLE, || {
            // Reinterpret the buffer's next writable chunk as `&mut [u8]` so it
            // can be handed to `recv`.
            // NOTE(review): the chunk may hold uninitialized bytes; this relies on
            // `recv` only writing into the slice — confirm.
            let dst = buf.chunk_mut();
            let dst =
                unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

            let n = (&*self.io).recv(dst)?;

            // SAFETY: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
            // buffer.
            unsafe {
                buf.advance_mut(n);
            }

            Ok(n)
        })
    }
}

/// Sends data on the socket to the specified address.
///
/// # Cancel safety
///
/// This method is cancel safe. If `send_to` is used as the event in a
/// [`tokio::select!`](crate::select) statement and some other branch
/// completes first, then it is guaranteed that the message was not sent.
///
/// # Examples
/// ```
/// # use std::error::Error;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn Error>> {
/// use tokio::net::UnixDatagram;
/// use tempfile::tempdir;
///
/// // We use a temporary directory so that the socket
/// // files left by the bound sockets will get cleaned up.
939 /// let tmp = tempdir()?; 940 /// 941 /// // Bind each socket to a filesystem path 942 /// let tx_path = tmp.path().join("tx"); 943 /// let tx = UnixDatagram::bind(&tx_path)?; 944 /// let rx_path = tmp.path().join("rx"); 945 /// let rx = UnixDatagram::bind(&rx_path)?; 946 /// 947 /// let bytes = b"hello world"; 948 /// tx.send_to(bytes, &rx_path).await?; 949 /// 950 /// let mut buf = vec![0u8; 24]; 951 /// let (size, addr) = rx.recv_from(&mut buf).await?; 952 /// 953 /// let dgram = &buf[..size]; 954 /// assert_eq!(dgram, bytes); 955 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); 956 /// 957 /// # Ok(()) 958 /// # } 959 /// ``` send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where P: AsRef<Path>,960 pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> 961 where 962 P: AsRef<Path>, 963 { 964 self.io 965 .registration() 966 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref())) 967 .await 968 } 969 970 /// Receives data from the socket. 971 /// 972 /// # Cancel safety 973 /// 974 /// This method is cancel safe. If `recv_from` is used as the event in a 975 /// [`tokio::select!`](crate::select) statement and some other branch 976 /// completes first, it is guaranteed that no messages were received on this 977 /// socket. 978 /// 979 /// # Examples 980 /// ``` 981 /// # use std::error::Error; 982 /// # #[tokio::main] 983 /// # async fn main() -> Result<(), Box<dyn Error>> { 984 /// use tokio::net::UnixDatagram; 985 /// use tempfile::tempdir; 986 /// 987 /// // We use a temporary directory so that the socket 988 /// // files left by the bound sockets will get cleaned up. 
989 /// let tmp = tempdir()?; 990 /// 991 /// // Bind each socket to a filesystem path 992 /// let tx_path = tmp.path().join("tx"); 993 /// let tx = UnixDatagram::bind(&tx_path)?; 994 /// let rx_path = tmp.path().join("rx"); 995 /// let rx = UnixDatagram::bind(&rx_path)?; 996 /// 997 /// let bytes = b"hello world"; 998 /// tx.send_to(bytes, &rx_path).await?; 999 /// 1000 /// let mut buf = vec![0u8; 24]; 1001 /// let (size, addr) = rx.recv_from(&mut buf).await?; 1002 /// 1003 /// let dgram = &buf[..size]; 1004 /// assert_eq!(dgram, bytes); 1005 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); 1006 /// 1007 /// # Ok(()) 1008 /// # } 1009 /// ``` recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1010 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1011 let (n, addr) = self 1012 .io 1013 .registration() 1014 .async_io(Interest::READABLE, || self.io.recv_from(buf)) 1015 .await?; 1016 1017 Ok((n, SocketAddr(addr))) 1018 } 1019 1020 /// Attempts to receive a single datagram on the specified address. 1021 /// 1022 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 1023 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1024 /// receive a wakeup. 1025 /// 1026 /// # Return value 1027 /// 1028 /// The function returns: 1029 /// 1030 /// * `Poll::Pending` if the socket is not ready to read 1031 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready 1032 /// * `Poll::Ready(Err(e))` if an error is encountered. 1033 /// 1034 /// # Errors 1035 /// 1036 /// This function may encounter any standard I/O error except `WouldBlock`. 
poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1037 pub fn poll_recv_from( 1038 &self, 1039 cx: &mut Context<'_>, 1040 buf: &mut ReadBuf<'_>, 1041 ) -> Poll<io::Result<SocketAddr>> { 1042 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { 1043 // Safety: will not read the maybe uninitialized bytes. 1044 let b = unsafe { 1045 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 1046 }; 1047 1048 self.io.recv_from(b) 1049 }))?; 1050 1051 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 1052 unsafe { 1053 buf.assume_init(n); 1054 } 1055 buf.advance(n); 1056 Poll::Ready(Ok(SocketAddr(addr))) 1057 } 1058 1059 /// Attempts to send data to the specified address. 1060 /// 1061 /// Note that on multiple calls to a `poll_*` method in the send direction, only the 1062 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1063 /// receive a wakeup. 1064 /// 1065 /// # Return value 1066 /// 1067 /// The function returns: 1068 /// 1069 /// * `Poll::Pending` if the socket is not ready to write 1070 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. 1071 /// * `Poll::Ready(Err(e))` if an error is encountered. 1072 /// 1073 /// # Errors 1074 /// 1075 /// This function may encounter any standard I/O error except `WouldBlock`. poll_send_to<P>( &self, cx: &mut Context<'_>, buf: &[u8], target: P, ) -> Poll<io::Result<usize>> where P: AsRef<Path>,1076 pub fn poll_send_to<P>( 1077 &self, 1078 cx: &mut Context<'_>, 1079 buf: &[u8], 1080 target: P, 1081 ) -> Poll<io::Result<usize>> 1082 where 1083 P: AsRef<Path>, 1084 { 1085 self.io 1086 .registration() 1087 .poll_write_io(cx, || self.io.send_to(buf, target.as_ref())) 1088 } 1089 1090 /// Attempts to send data on the socket to the remote address to which it 1091 /// was previously `connect`ed. 1092 /// 1093 /// The [`connect`] method will connect this socket to a remote address. 
1094 /// This method will fail if the socket is not connected. 1095 /// 1096 /// Note that on multiple calls to a `poll_*` method in the send direction, 1097 /// only the `Waker` from the `Context` passed to the most recent call will 1098 /// be scheduled to receive a wakeup. 1099 /// 1100 /// # Return value 1101 /// 1102 /// The function returns: 1103 /// 1104 /// * `Poll::Pending` if the socket is not available to write 1105 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent 1106 /// * `Poll::Ready(Err(e))` if an error is encountered. 1107 /// 1108 /// # Errors 1109 /// 1110 /// This function may encounter any standard I/O error except `WouldBlock`. 1111 /// 1112 /// [`connect`]: method@Self::connect poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>1113 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { 1114 self.io 1115 .registration() 1116 .poll_write_io(cx, || self.io.send(buf)) 1117 } 1118 1119 /// Attempts to receive a single datagram message on the socket from the remote 1120 /// address to which it is `connect`ed. 1121 /// 1122 /// The [`connect`] method will connect this socket to a remote address. This method 1123 /// resolves to an error if the socket is not connected. 1124 /// 1125 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 1126 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1127 /// receive a wakeup. 1128 /// 1129 /// # Return value 1130 /// 1131 /// The function returns: 1132 /// 1133 /// * `Poll::Pending` if the socket is not ready to read 1134 /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready 1135 /// * `Poll::Ready(Err(e))` if an error is encountered. 1136 /// 1137 /// # Errors 1138 /// 1139 /// This function may encounter any standard I/O error except `WouldBlock`. 
1140 /// 1141 /// [`connect`]: method@Self::connect poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>>1142 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> { 1143 let n = ready!(self.io.registration().poll_read_io(cx, || { 1144 // Safety: will not read the maybe uninitialized bytes. 1145 let b = unsafe { 1146 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 1147 }; 1148 1149 self.io.recv(b) 1150 }))?; 1151 1152 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 1153 unsafe { 1154 buf.assume_init(n); 1155 } 1156 buf.advance(n); 1157 Poll::Ready(Ok(())) 1158 } 1159 1160 /// Tries to receive data from the socket without waiting. 1161 /// 1162 /// # Examples 1163 /// 1164 /// ```no_run 1165 /// use tokio::net::UnixDatagram; 1166 /// use std::io; 1167 /// 1168 /// #[tokio::main] 1169 /// async fn main() -> io::Result<()> { 1170 /// // Connect to a peer 1171 /// let dir = tempfile::tempdir().unwrap(); 1172 /// let client_path = dir.path().join("client.sock"); 1173 /// let server_path = dir.path().join("server.sock"); 1174 /// let socket = UnixDatagram::bind(&client_path)?; 1175 /// 1176 /// loop { 1177 /// // Wait for the socket to be readable 1178 /// socket.readable().await?; 1179 /// 1180 /// // The buffer is **not** included in the async task and will 1181 /// // only exist on the stack. 1182 /// let mut buf = [0; 1024]; 1183 /// 1184 /// // Try to recv data, this may still fail with `WouldBlock` 1185 /// // if the readiness event is a false positive. 
1186 /// match socket.try_recv_from(&mut buf) { 1187 /// Ok((n, _addr)) => { 1188 /// println!("GOT {:?}", &buf[..n]); 1189 /// break; 1190 /// } 1191 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1192 /// continue; 1193 /// } 1194 /// Err(e) => { 1195 /// return Err(e); 1196 /// } 1197 /// } 1198 /// } 1199 /// 1200 /// Ok(()) 1201 /// } 1202 /// ``` try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1203 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1204 let (n, addr) = self 1205 .io 1206 .registration() 1207 .try_io(Interest::READABLE, || self.io.recv_from(buf))?; 1208 1209 Ok((n, SocketAddr(addr))) 1210 } 1211 1212 /// Tries to read or write from the socket using a user-provided IO operation. 1213 /// 1214 /// If the socket is ready, the provided closure is called. The closure 1215 /// should attempt to perform IO operation from the socket by manually 1216 /// calling the appropriate syscall. If the operation fails because the 1217 /// socket is not actually ready, then the closure should return a 1218 /// `WouldBlock` error and the readiness flag is cleared. The return value 1219 /// of the closure is then returned by `try_io`. 1220 /// 1221 /// If the socket is not ready, then the closure is not called 1222 /// and a `WouldBlock` error is returned. 1223 /// 1224 /// The closure should only return a `WouldBlock` error if it has performed 1225 /// an IO operation on the socket that failed due to the socket not being 1226 /// ready. Returning a `WouldBlock` error in any other situation will 1227 /// incorrectly clear the readiness flag, which can cause the socket to 1228 /// behave incorrectly. 1229 /// 1230 /// The closure should not perform the IO operation using any of the methods 1231 /// defined on the Tokio `UnixDatagram` type, as this will mess with the 1232 /// readiness flag and can cause the socket to behave incorrectly. 
1233 /// 1234 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 1235 /// 1236 /// [`readable()`]: UnixDatagram::readable() 1237 /// [`writable()`]: UnixDatagram::writable() 1238 /// [`ready()`]: UnixDatagram::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1239 pub fn try_io<R>( 1240 &self, 1241 interest: Interest, 1242 f: impl FnOnce() -> io::Result<R>, 1243 ) -> io::Result<R> { 1244 self.io.registration().try_io(interest, f) 1245 } 1246 1247 /// Returns the local address that this socket is bound to. 1248 /// 1249 /// # Examples 1250 /// For a socket bound to a local path 1251 /// ``` 1252 /// # use std::error::Error; 1253 /// # #[tokio::main] 1254 /// # async fn main() -> Result<(), Box<dyn Error>> { 1255 /// use tokio::net::UnixDatagram; 1256 /// use tempfile::tempdir; 1257 /// 1258 /// // We use a temporary directory so that the socket 1259 /// // files left by the bound sockets will get cleaned up. 1260 /// let tmp = tempdir()?; 1261 /// 1262 /// // Bind socket to a filesystem path 1263 /// let socket_path = tmp.path().join("socket"); 1264 /// let socket = UnixDatagram::bind(&socket_path)?; 1265 /// 1266 /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path); 1267 /// 1268 /// # Ok(()) 1269 /// # } 1270 /// ``` 1271 /// 1272 /// For an unbound socket 1273 /// ``` 1274 /// # use std::error::Error; 1275 /// # #[tokio::main] 1276 /// # async fn main() -> Result<(), Box<dyn Error>> { 1277 /// use tokio::net::UnixDatagram; 1278 /// 1279 /// // Create an unbound socket 1280 /// let socket = UnixDatagram::unbound()?; 1281 /// 1282 /// assert!(socket.local_addr()?.is_unnamed()); 1283 /// 1284 /// # Ok(()) 1285 /// # } 1286 /// ``` local_addr(&self) -> io::Result<SocketAddr>1287 pub fn local_addr(&self) -> io::Result<SocketAddr> { 1288 self.io.local_addr().map(SocketAddr) 1289 } 1290 1291 /// Returns the address of this socket's peer. 
1292 /// 1293 /// The `connect` method will connect the socket to a peer. 1294 /// 1295 /// # Examples 1296 /// For a peer with a local path 1297 /// ``` 1298 /// # use std::error::Error; 1299 /// # #[tokio::main] 1300 /// # async fn main() -> Result<(), Box<dyn Error>> { 1301 /// use tokio::net::UnixDatagram; 1302 /// use tempfile::tempdir; 1303 /// 1304 /// // Create an unbound socket 1305 /// let tx = UnixDatagram::unbound()?; 1306 /// 1307 /// // Create another, bound socket 1308 /// let tmp = tempdir()?; 1309 /// let rx_path = tmp.path().join("rx"); 1310 /// let rx = UnixDatagram::bind(&rx_path)?; 1311 /// 1312 /// // Connect to the bound socket 1313 /// tx.connect(&rx_path)?; 1314 /// 1315 /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path); 1316 /// 1317 /// # Ok(()) 1318 /// # } 1319 /// ``` 1320 /// 1321 /// For an unbound peer 1322 /// ``` 1323 /// # use std::error::Error; 1324 /// # #[tokio::main] 1325 /// # async fn main() -> Result<(), Box<dyn Error>> { 1326 /// use tokio::net::UnixDatagram; 1327 /// 1328 /// // Create the pair of sockets 1329 /// let (sock1, sock2) = UnixDatagram::pair()?; 1330 /// 1331 /// assert!(sock1.peer_addr()?.is_unnamed()); 1332 /// 1333 /// # Ok(()) 1334 /// # } 1335 /// ``` peer_addr(&self) -> io::Result<SocketAddr>1336 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 1337 self.io.peer_addr().map(SocketAddr) 1338 } 1339 1340 /// Returns the value of the `SO_ERROR` option. 
1341 /// 1342 /// # Examples 1343 /// ``` 1344 /// # use std::error::Error; 1345 /// # #[tokio::main] 1346 /// # async fn main() -> Result<(), Box<dyn Error>> { 1347 /// use tokio::net::UnixDatagram; 1348 /// 1349 /// // Create an unbound socket 1350 /// let socket = UnixDatagram::unbound()?; 1351 /// 1352 /// if let Ok(Some(err)) = socket.take_error() { 1353 /// println!("Got error: {:?}", err); 1354 /// } 1355 /// 1356 /// # Ok(()) 1357 /// # } 1358 /// ``` take_error(&self) -> io::Result<Option<io::Error>>1359 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 1360 self.io.take_error() 1361 } 1362 1363 /// Shuts down the read, write, or both halves of this connection. 1364 /// 1365 /// This function will cause all pending and future I/O calls on the 1366 /// specified portions to immediately return with an appropriate value 1367 /// (see the documentation of `Shutdown`). 1368 /// 1369 /// # Examples 1370 /// ``` 1371 /// # use std::error::Error; 1372 /// # #[tokio::main] 1373 /// # async fn main() -> Result<(), Box<dyn Error>> { 1374 /// use tokio::net::UnixDatagram; 1375 /// use std::net::Shutdown; 1376 /// 1377 /// // Create an unbound socket 1378 /// let (socket, other) = UnixDatagram::pair()?; 1379 /// 1380 /// socket.shutdown(Shutdown::Both)?; 1381 /// 1382 /// // NOTE: the following commented out code does NOT work as expected. 1383 /// // Due to an underlying issue, the recv call will block indefinitely. 
1384 /// // See: https://github.com/tokio-rs/tokio/issues/1679 1385 /// //let mut buff = vec![0u8; 24]; 1386 /// //let size = socket.recv(&mut buff).await?; 1387 /// //assert_eq!(size, 0); 1388 /// 1389 /// let send_result = socket.send(b"hello world").await; 1390 /// assert!(send_result.is_err()); 1391 /// 1392 /// # Ok(()) 1393 /// # } 1394 /// ``` shutdown(&self, how: Shutdown) -> io::Result<()>1395 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 1396 self.io.shutdown(how) 1397 } 1398 } 1399 1400 impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram { 1401 type Error = io::Error; 1402 1403 /// Consumes stream, returning the Tokio I/O object. 1404 /// 1405 /// This is equivalent to 1406 /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error>1407 fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> { 1408 Self::from_std(stream) 1409 } 1410 } 1411 1412 impl fmt::Debug for UnixDatagram { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1413 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1414 self.io.fmt(f) 1415 } 1416 } 1417 1418 impl AsRawFd for UnixDatagram { as_raw_fd(&self) -> RawFd1419 fn as_raw_fd(&self) -> RawFd { 1420 self.io.as_raw_fd() 1421 } 1422 } 1423