use crate::io::{Interest, PollEvented, ReadBuf, Ready};
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};

cfg_io_util! {
    use bytes::BufMut;
}

cfg_net! {
    /// A UDP socket.
    ///
    /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
    /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
    ///
    /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
    ///   and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
    /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
    ///   and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
    ///
    /// This type does not provide a `split` method, because this functionality
    /// can be achieved by instead wrapping the socket in an [`Arc`]. Note that
    /// you do not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>`
    /// is enough. This is because all of the methods take `&self` instead of
    /// `&mut self`. Once you have wrapped it in an `Arc`, you can call
    /// `.clone()` on the `Arc<UdpSocket>` to get multiple shared handles to the
    /// same socket. An example of such usage can be found further down.
    ///
    /// [`Arc`]: std::sync::Arc
    ///
    /// # Streams
    ///
    /// If you need to listen over UDP and produce a [`Stream`], you can look
    /// at [`UdpFramed`].
    ///
    /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html
    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
    ///
    /// # Example: one to many (bind)
    ///
    /// Using `bind` we can create a simple echo server that sends to and
    /// receives from many different clients:
    /// ```no_run
    /// use tokio::net::UdpSocket;
    /// use std::io;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let sock = UdpSocket::bind("0.0.0.0:8080").await?;
    ///     let mut buf = [0; 1024];
    ///     loop {
    ///         let (len, addr) = sock.recv_from(&mut buf).await?;
    ///         println!("{:?} bytes received from {:?}", len, addr);
    ///
    ///         let len = sock.send_to(&buf[..len], addr).await?;
    ///         println!("{:?} bytes sent", len);
    ///     }
    /// }
    /// ```
    ///
    /// # Example: one to one (connect)
    ///
    /// Or using `connect` we can echo with a single remote address using `send` and `recv`:
    /// ```no_run
    /// use tokio::net::UdpSocket;
    /// use std::io;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let sock = UdpSocket::bind("0.0.0.0:8080").await?;
    ///
    ///     let remote_addr = "127.0.0.1:59611";
    ///     sock.connect(remote_addr).await?;
    ///     let mut buf = [0; 1024];
    ///     loop {
    ///         let len = sock.recv(&mut buf).await?;
    ///         println!("{:?} bytes received from {:?}", len, remote_addr);
    ///
    ///         let len = sock.send(&buf[..len]).await?;
    ///         println!("{:?} bytes sent", len);
    ///     }
    /// }
    /// ```
    ///
    /// # Example: Splitting with `Arc`
    ///
    /// Because `send_to` and `recv_from` take `&self`, it's perfectly alright
    /// to use an `Arc<UdpSocket>` and share the socket between multiple tasks.
    /// Here is a similar "echo" example that supports concurrent
    /// sending/receiving:
    ///
    /// ```no_run
    /// use tokio::{net::UdpSocket, sync::mpsc};
    /// use std::{io, net::SocketAddr, sync::Arc};
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
    ///     let r = Arc::new(sock);
    ///     let s = r.clone();
    ///     let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);
    ///
    ///     tokio::spawn(async move {
    ///         while let Some((bytes, addr)) = rx.recv().await {
    ///             let len = s.send_to(&bytes, &addr).await.unwrap();
    ///             println!("{:?} bytes sent", len);
    ///         }
    ///     });
    ///
    ///     let mut buf = [0; 1024];
    ///     loop {
    ///         let (len, addr) = r.recv_from(&mut buf).await?;
    ///         println!("{:?} bytes received from {:?}", len, addr);
    ///         tx.send((buf[..len].to_vec(), addr)).await.unwrap();
    ///     }
    /// }
    /// ```
    ///
    pub struct UdpSocket {
        // The underlying mio socket wrapped in `PollEvented`; the methods of
        // this type reach the socket and its registration through this field.
        io: PollEvented<mio::net::UdpSocket>,
    }
}

impl UdpSocket {
    /// This function will create a new UDP socket and attempt to bind it to
    /// the `addr` provided.
    ///
    /// Binding with a port number of 0 will request that the OS assigns a port
    /// to this socket. The port allocated can be queried via the `local_addr`
    /// method.
134 /// 135 /// # Example 136 /// 137 /// ```no_run 138 /// use tokio::net::UdpSocket; 139 /// use std::io; 140 /// 141 /// #[tokio::main] 142 /// async fn main() -> io::Result<()> { 143 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; 144 /// // use `sock` 145 /// # let _ = sock; 146 /// Ok(()) 147 /// } 148 /// ``` bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket>149 pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> { 150 let addrs = to_socket_addrs(addr).await?; 151 let mut last_err = None; 152 153 for addr in addrs { 154 match UdpSocket::bind_addr(addr) { 155 Ok(socket) => return Ok(socket), 156 Err(e) => last_err = Some(e), 157 } 158 } 159 160 Err(last_err.unwrap_or_else(|| { 161 io::Error::new( 162 io::ErrorKind::InvalidInput, 163 "could not resolve to any address", 164 ) 165 })) 166 } 167 bind_addr(addr: SocketAddr) -> io::Result<UdpSocket>168 fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> { 169 let sys = mio::net::UdpSocket::bind(addr)?; 170 UdpSocket::new(sys) 171 } 172 new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket>173 fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> { 174 let io = PollEvented::new(socket)?; 175 Ok(UdpSocket { io }) 176 } 177 178 /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. 179 /// 180 /// This function is intended to be used to wrap a UDP socket from the 181 /// standard library in the Tokio equivalent. The conversion assumes nothing 182 /// about the underlying socket; it is left up to the user to set it in 183 /// non-blocking mode. 184 /// 185 /// This can be used in conjunction with socket2's `Socket` interface to 186 /// configure a socket before it's handed off, such as setting options like 187 /// `reuse_address` or binding to multiple addresses. 188 /// 189 /// # Panics 190 /// 191 /// This function panics if thread-local runtime is not set. 
192 /// 193 /// The runtime is usually set implicitly when this function is called 194 /// from a future driven by a tokio runtime, otherwise runtime can be set 195 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 196 /// 197 /// # Example 198 /// 199 /// ```no_run 200 /// use tokio::net::UdpSocket; 201 /// # use std::{io, net::SocketAddr}; 202 /// 203 /// # #[tokio::main] 204 /// # async fn main() -> io::Result<()> { 205 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); 206 /// let std_sock = std::net::UdpSocket::bind(addr)?; 207 /// std_sock.set_nonblocking(true)?; 208 /// let sock = UdpSocket::from_std(std_sock)?; 209 /// // use `sock` 210 /// # Ok(()) 211 /// # } 212 /// ``` from_std(socket: net::UdpSocket) -> io::Result<UdpSocket>213 pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { 214 let io = mio::net::UdpSocket::from_std(socket); 215 UdpSocket::new(io) 216 } 217 218 /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`]. 219 /// 220 /// The returned [`std::net::UdpSocket`] will have nonblocking mode set as 221 /// `true`. Use [`set_nonblocking`] to change the blocking mode if needed. 
222 /// 223 /// # Examples 224 /// 225 /// ```rust,no_run 226 /// use std::error::Error; 227 /// 228 /// #[tokio::main] 229 /// async fn main() -> Result<(), Box<dyn Error>> { 230 /// let tokio_socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?; 231 /// let std_socket = tokio_socket.into_std()?; 232 /// std_socket.set_nonblocking(false)?; 233 /// Ok(()) 234 /// } 235 /// ``` 236 /// 237 /// [`tokio::net::UdpSocket`]: UdpSocket 238 /// [`std::net::UdpSocket`]: std::net::UdpSocket 239 /// [`set_nonblocking`]: fn@std::net::UdpSocket::set_nonblocking into_std(self) -> io::Result<std::net::UdpSocket>240 pub fn into_std(self) -> io::Result<std::net::UdpSocket> { 241 #[cfg(unix)] 242 { 243 use std::os::unix::io::{FromRawFd, IntoRawFd}; 244 self.io 245 .into_inner() 246 .map(|io| io.into_raw_fd()) 247 .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) }) 248 } 249 250 #[cfg(windows)] 251 { 252 use std::os::windows::io::{FromRawSocket, IntoRawSocket}; 253 self.io 254 .into_inner() 255 .map(|io| io.into_raw_socket()) 256 .map(|raw_socket| unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) }) 257 } 258 } 259 260 /// Returns the local address that this socket is bound to. 261 /// 262 /// # Example 263 /// 264 /// ```no_run 265 /// use tokio::net::UdpSocket; 266 /// # use std::{io, net::SocketAddr}; 267 /// 268 /// # #[tokio::main] 269 /// # async fn main() -> io::Result<()> { 270 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); 271 /// let sock = UdpSocket::bind(addr).await?; 272 /// // the address the socket is bound to 273 /// let local_addr = sock.local_addr()?; 274 /// # Ok(()) 275 /// # } 276 /// ``` local_addr(&self) -> io::Result<SocketAddr>277 pub fn local_addr(&self) -> io::Result<SocketAddr> { 278 self.io.local_addr() 279 } 280 281 /// Connects the UDP socket setting the default destination for send() and 282 /// limiting packets that are read via recv from the address specified in 283 /// `addr`. 
284 /// 285 /// # Example 286 /// 287 /// ```no_run 288 /// use tokio::net::UdpSocket; 289 /// # use std::{io, net::SocketAddr}; 290 /// 291 /// # #[tokio::main] 292 /// # async fn main() -> io::Result<()> { 293 /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; 294 /// 295 /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap(); 296 /// sock.connect(remote_addr).await?; 297 /// let mut buf = [0u8; 32]; 298 /// // recv from remote_addr 299 /// let len = sock.recv(&mut buf).await?; 300 /// // send to remote_addr 301 /// let _len = sock.send(&buf[..len]).await?; 302 /// # Ok(()) 303 /// # } 304 /// ``` connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()>305 pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> { 306 let addrs = to_socket_addrs(addr).await?; 307 let mut last_err = None; 308 309 for addr in addrs { 310 match self.io.connect(addr) { 311 Ok(_) => return Ok(()), 312 Err(e) => last_err = Some(e), 313 } 314 } 315 316 Err(last_err.unwrap_or_else(|| { 317 io::Error::new( 318 io::ErrorKind::InvalidInput, 319 "could not resolve to any address", 320 ) 321 })) 322 } 323 324 /// Waits for any of the requested ready states. 325 /// 326 /// This function is usually paired with `try_recv()` or `try_send()`. It 327 /// can be used to concurrently recv / send to the same socket on a single 328 /// task without splitting the socket. 329 /// 330 /// The function may complete without the socket being ready. This is a 331 /// false-positive and attempting an operation will return with 332 /// `io::ErrorKind::WouldBlock`. 333 /// 334 /// # Cancel safety 335 /// 336 /// This method is cancel safe. Once a readiness event occurs, the method 337 /// will continue to return immediately until the readiness event is 338 /// consumed by an attempt to read or write that fails with `WouldBlock` or 339 /// `Poll::Pending`. 
340 /// 341 /// # Examples 342 /// 343 /// Concurrently receive from and send to the socket on the same task 344 /// without splitting. 345 /// 346 /// ```no_run 347 /// use tokio::io::{self, Interest}; 348 /// use tokio::net::UdpSocket; 349 /// 350 /// #[tokio::main] 351 /// async fn main() -> io::Result<()> { 352 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 353 /// socket.connect("127.0.0.1:8081").await?; 354 /// 355 /// loop { 356 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; 357 /// 358 /// if ready.is_readable() { 359 /// // The buffer is **not** included in the async task and will only exist 360 /// // on the stack. 361 /// let mut data = [0; 1024]; 362 /// match socket.try_recv(&mut data[..]) { 363 /// Ok(n) => { 364 /// println!("received {:?}", &data[..n]); 365 /// } 366 /// // False-positive, continue 367 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} 368 /// Err(e) => { 369 /// return Err(e); 370 /// } 371 /// } 372 /// } 373 /// 374 /// if ready.is_writable() { 375 /// // Write some data 376 /// match socket.try_send(b"hello world") { 377 /// Ok(n) => { 378 /// println!("sent {} bytes", n); 379 /// } 380 /// // False-positive, continue 381 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} 382 /// Err(e) => { 383 /// return Err(e); 384 /// } 385 /// } 386 /// } 387 /// } 388 /// } 389 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>390 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 391 let event = self.io.registration().readiness(interest).await?; 392 Ok(event.ready) 393 } 394 395 /// Waits for the socket to become writable. 396 /// 397 /// This function is equivalent to `ready(Interest::WRITABLE)` and is 398 /// usually paired with `try_send()` or `try_send_to()`. 399 /// 400 /// The function may complete without the socket being writable. 
This is a 401 /// false-positive and attempting a `try_send()` will return with 402 /// `io::ErrorKind::WouldBlock`. 403 /// 404 /// # Cancel safety 405 /// 406 /// This method is cancel safe. Once a readiness event occurs, the method 407 /// will continue to return immediately until the readiness event is 408 /// consumed by an attempt to write that fails with `WouldBlock` or 409 /// `Poll::Pending`. 410 /// 411 /// # Examples 412 /// 413 /// ```no_run 414 /// use tokio::net::UdpSocket; 415 /// use std::io; 416 /// 417 /// #[tokio::main] 418 /// async fn main() -> io::Result<()> { 419 /// // Bind socket 420 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 421 /// socket.connect("127.0.0.1:8081").await?; 422 /// 423 /// loop { 424 /// // Wait for the socket to be writable 425 /// socket.writable().await?; 426 /// 427 /// // Try to send data, this may still fail with `WouldBlock` 428 /// // if the readiness event is a false positive. 429 /// match socket.try_send(b"hello world") { 430 /// Ok(n) => { 431 /// break; 432 /// } 433 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 434 /// continue; 435 /// } 436 /// Err(e) => { 437 /// return Err(e); 438 /// } 439 /// } 440 /// } 441 /// 442 /// Ok(()) 443 /// } 444 /// ``` writable(&self) -> io::Result<()>445 pub async fn writable(&self) -> io::Result<()> { 446 self.ready(Interest::WRITABLE).await?; 447 Ok(()) 448 } 449 450 /// Polls for write/send readiness. 451 /// 452 /// If the udp stream is not currently ready for sending, this method will 453 /// store a clone of the `Waker` from the provided `Context`. When the udp 454 /// stream becomes ready for sending, `Waker::wake` will be called on the 455 /// waker. 456 /// 457 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only 458 /// the `Waker` from the `Context` passed to the most recent call is 459 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a 460 /// second, independent waker.) 
461 /// 462 /// This function is intended for cases where creating and pinning a future 463 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 464 /// preferred, as this supports polling from multiple tasks at once. 465 /// 466 /// # Return value 467 /// 468 /// The function returns: 469 /// 470 /// * `Poll::Pending` if the udp stream is not ready for writing. 471 /// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing. 472 /// * `Poll::Ready(Err(e))` if an error is encountered. 473 /// 474 /// # Errors 475 /// 476 /// This function may encounter any standard I/O error except `WouldBlock`. 477 /// 478 /// [`writable`]: method@Self::writable poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>479 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 480 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 481 } 482 483 /// Sends data on the socket to the remote address that the socket is 484 /// connected to. 485 /// 486 /// The [`connect`] method will connect this socket to a remote address. 487 /// This method will fail if the socket is not connected. 488 /// 489 /// [`connect`]: method@Self::connect 490 /// 491 /// # Return 492 /// 493 /// On success, the number of bytes sent is returned, otherwise, the 494 /// encountered error is returned. 495 /// 496 /// # Cancel safety 497 /// 498 /// This method is cancel safe. If `send` is used as the event in a 499 /// [`tokio::select!`](crate::select) statement and some other branch 500 /// completes first, then it is guaranteed that the message was not sent. 
501 /// 502 /// # Examples 503 /// 504 /// ```no_run 505 /// use tokio::io; 506 /// use tokio::net::UdpSocket; 507 /// 508 /// #[tokio::main] 509 /// async fn main() -> io::Result<()> { 510 /// // Bind socket 511 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 512 /// socket.connect("127.0.0.1:8081").await?; 513 /// 514 /// // Send a message 515 /// socket.send(b"hello world").await?; 516 /// 517 /// Ok(()) 518 /// } 519 /// ``` send(&self, buf: &[u8]) -> io::Result<usize>520 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { 521 self.io 522 .registration() 523 .async_io(Interest::WRITABLE, || self.io.send(buf)) 524 .await 525 } 526 527 /// Attempts to send data on the socket to the remote address to which it 528 /// was previously `connect`ed. 529 /// 530 /// The [`connect`] method will connect this socket to a remote address. 531 /// This method will fail if the socket is not connected. 532 /// 533 /// Note that on multiple calls to a `poll_*` method in the send direction, 534 /// only the `Waker` from the `Context` passed to the most recent call will 535 /// be scheduled to receive a wakeup. 536 /// 537 /// # Return value 538 /// 539 /// The function returns: 540 /// 541 /// * `Poll::Pending` if the socket is not available to write 542 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent 543 /// * `Poll::Ready(Err(e))` if an error is encountered. 544 /// 545 /// # Errors 546 /// 547 /// This function may encounter any standard I/O error except `WouldBlock`. 548 /// 549 /// [`connect`]: method@Self::connect poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>550 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { 551 self.io 552 .registration() 553 .poll_write_io(cx, || self.io.send(buf)) 554 } 555 556 /// Tries to send data on the socket to the remote address to which it is 557 /// connected. 
558 /// 559 /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is 560 /// returned. This function is usually paired with `writable()`. 561 /// 562 /// # Returns 563 /// 564 /// If successful, `Ok(n)` is returned, where `n` is the number of bytes 565 /// sent. If the socket is not ready to send data, 566 /// `Err(ErrorKind::WouldBlock)` is returned. 567 /// 568 /// # Examples 569 /// 570 /// ```no_run 571 /// use tokio::net::UdpSocket; 572 /// use std::io; 573 /// 574 /// #[tokio::main] 575 /// async fn main() -> io::Result<()> { 576 /// // Bind a UDP socket 577 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 578 /// 579 /// // Connect to a peer 580 /// socket.connect("127.0.0.1:8081").await?; 581 /// 582 /// loop { 583 /// // Wait for the socket to be writable 584 /// socket.writable().await?; 585 /// 586 /// // Try to send data, this may still fail with `WouldBlock` 587 /// // if the readiness event is a false positive. 588 /// match socket.try_send(b"hello world") { 589 /// Ok(n) => { 590 /// break; 591 /// } 592 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 593 /// continue; 594 /// } 595 /// Err(e) => { 596 /// return Err(e); 597 /// } 598 /// } 599 /// } 600 /// 601 /// Ok(()) 602 /// } 603 /// ``` try_send(&self, buf: &[u8]) -> io::Result<usize>604 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { 605 self.io 606 .registration() 607 .try_io(Interest::WRITABLE, || self.io.send(buf)) 608 } 609 610 /// Waits for the socket to become readable. 611 /// 612 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 613 /// paired with `try_recv()`. 614 /// 615 /// The function may complete without the socket being readable. This is a 616 /// false-positive and attempting a `try_recv()` will return with 617 /// `io::ErrorKind::WouldBlock`. 618 /// 619 /// # Cancel safety 620 /// 621 /// This method is cancel safe. 
Once a readiness event occurs, the method 622 /// will continue to return immediately until the readiness event is 623 /// consumed by an attempt to read that fails with `WouldBlock` or 624 /// `Poll::Pending`. 625 /// 626 /// # Examples 627 /// 628 /// ```no_run 629 /// use tokio::net::UdpSocket; 630 /// use std::io; 631 /// 632 /// #[tokio::main] 633 /// async fn main() -> io::Result<()> { 634 /// // Connect to a peer 635 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 636 /// socket.connect("127.0.0.1:8081").await?; 637 /// 638 /// loop { 639 /// // Wait for the socket to be readable 640 /// socket.readable().await?; 641 /// 642 /// // The buffer is **not** included in the async task and will 643 /// // only exist on the stack. 644 /// let mut buf = [0; 1024]; 645 /// 646 /// // Try to recv data, this may still fail with `WouldBlock` 647 /// // if the readiness event is a false positive. 648 /// match socket.try_recv(&mut buf) { 649 /// Ok(n) => { 650 /// println!("GOT {:?}", &buf[..n]); 651 /// break; 652 /// } 653 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 654 /// continue; 655 /// } 656 /// Err(e) => { 657 /// return Err(e); 658 /// } 659 /// } 660 /// } 661 /// 662 /// Ok(()) 663 /// } 664 /// ``` readable(&self) -> io::Result<()>665 pub async fn readable(&self) -> io::Result<()> { 666 self.ready(Interest::READABLE).await?; 667 Ok(()) 668 } 669 670 /// Polls for read/receive readiness. 671 /// 672 /// If the udp stream is not currently ready for receiving, this method will 673 /// store a clone of the `Waker` from the provided `Context`. When the udp 674 /// socket becomes ready for reading, `Waker::wake` will be called on the 675 /// waker. 676 /// 677 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or 678 /// `poll_peek`, only the `Waker` from the `Context` passed to the most 679 /// recent call is scheduled to receive a wakeup. (However, 680 /// `poll_send_ready` retains a second, independent waker.) 
681 /// 682 /// This function is intended for cases where creating and pinning a future 683 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 684 /// preferred, as this supports polling from multiple tasks at once. 685 /// 686 /// # Return value 687 /// 688 /// The function returns: 689 /// 690 /// * `Poll::Pending` if the udp stream is not ready for reading. 691 /// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading. 692 /// * `Poll::Ready(Err(e))` if an error is encountered. 693 /// 694 /// # Errors 695 /// 696 /// This function may encounter any standard I/O error except `WouldBlock`. 697 /// 698 /// [`readable`]: method@Self::readable poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>699 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 700 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 701 } 702 703 /// Receives a single datagram message on the socket from the remote address 704 /// to which it is connected. On success, returns the number of bytes read. 705 /// 706 /// The function must be called with valid byte array `buf` of sufficient 707 /// size to hold the message bytes. If a message is too long to fit in the 708 /// supplied buffer, excess bytes may be discarded. 709 /// 710 /// The [`connect`] method will connect this socket to a remote address. 711 /// This method will fail if the socket is not connected. 712 /// 713 /// # Cancel safety 714 /// 715 /// This method is cancel safe. If `recv_from` is used as the event in a 716 /// [`tokio::select!`](crate::select) statement and some other branch 717 /// completes first, it is guaranteed that no messages were received on this 718 /// socket. 
719 /// 720 /// [`connect`]: method@Self::connect 721 /// 722 /// ```no_run 723 /// use tokio::net::UdpSocket; 724 /// use std::io; 725 /// 726 /// #[tokio::main] 727 /// async fn main() -> io::Result<()> { 728 /// // Bind socket 729 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 730 /// socket.connect("127.0.0.1:8081").await?; 731 /// 732 /// let mut buf = vec![0; 10]; 733 /// let n = socket.recv(&mut buf).await?; 734 /// 735 /// println!("received {} bytes {:?}", n, &buf[..n]); 736 /// 737 /// Ok(()) 738 /// } 739 /// ``` recv(&self, buf: &mut [u8]) -> io::Result<usize>740 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { 741 self.io 742 .registration() 743 .async_io(Interest::READABLE, || self.io.recv(buf)) 744 .await 745 } 746 747 /// Attempts to receive a single datagram message on the socket from the remote 748 /// address to which it is `connect`ed. 749 /// 750 /// The [`connect`] method will connect this socket to a remote address. This method 751 /// resolves to an error if the socket is not connected. 752 /// 753 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 754 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 755 /// receive a wakeup. 756 /// 757 /// # Return value 758 /// 759 /// The function returns: 760 /// 761 /// * `Poll::Pending` if the socket is not ready to read 762 /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready 763 /// * `Poll::Ready(Err(e))` if an error is encountered. 764 /// 765 /// # Errors 766 /// 767 /// This function may encounter any standard I/O error except `WouldBlock`. 
///
/// [`connect`]: method@Self::connect
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
    // `n` is the byte count reported by `recv` for this attempt.
    let n = ready!(self.io.registration().poll_read_io(cx, || {
        // Safety: will not read the maybe uninitialized bytes.
        // The unfilled part of `buf` is reinterpreted as `[u8]` only so that it
        // can be passed to `recv` as the destination slice.
        let b = unsafe {
            &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
        };

        self.io.recv(b)
    }))?;

    // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
    unsafe {
        buf.assume_init(n);
    }
    // Mark the `n` bytes as filled only after they were declared initialized.
    buf.advance(n);
    Poll::Ready(Ok(()))
}

/// Tries to receive a single datagram message on the socket from the remote
/// address to which it is connected. On success, returns the number of
/// bytes read.
///
/// The function must be called with valid byte array buf of sufficient size
/// to hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
/// returned. This function is usually paired with `readable()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
///     // Connect to a peer
///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
///     socket.connect("127.0.0.1:8081").await?;
///
///     loop {
///         // Wait for the socket to be readable
///         socket.readable().await?;
///
///         // The buffer is **not** included in the async task and will
///         // only exist on the stack.
///         let mut buf = [0; 1024];
///
///         // Try to recv data, this may still fail with `WouldBlock`
///         // if the readiness event is a false positive.
821 /// match socket.try_recv(&mut buf) { 822 /// Ok(n) => { 823 /// println!("GOT {:?}", &buf[..n]); 824 /// break; 825 /// } 826 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 827 /// continue; 828 /// } 829 /// Err(e) => { 830 /// return Err(e); 831 /// } 832 /// } 833 /// } 834 /// 835 /// Ok(()) 836 /// } 837 /// ``` try_recv(&self, buf: &mut [u8]) -> io::Result<usize>838 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { 839 self.io 840 .registration() 841 .try_io(Interest::READABLE, || self.io.recv(buf)) 842 } 843 844 cfg_io_util! { 845 /// Tries to receive data from the stream into the provided buffer, advancing the 846 /// buffer's internal cursor, returning how many bytes were read. 847 /// 848 /// The function must be called with valid byte array buf of sufficient size 849 /// to hold the message bytes. If a message is too long to fit in the 850 /// supplied buffer, excess bytes may be discarded. 851 /// 852 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is 853 /// returned. This function is usually paired with `readable()`. 854 /// 855 /// # Examples 856 /// 857 /// ```no_run 858 /// use tokio::net::UdpSocket; 859 /// use std::io; 860 /// 861 /// #[tokio::main] 862 /// async fn main() -> io::Result<()> { 863 /// // Connect to a peer 864 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 865 /// socket.connect("127.0.0.1:8081").await?; 866 /// 867 /// loop { 868 /// // Wait for the socket to be readable 869 /// socket.readable().await?; 870 /// 871 /// let mut buf = Vec::with_capacity(1024); 872 /// 873 /// // Try to recv data, this may still fail with `WouldBlock` 874 /// // if the readiness event is a false positive. 
///         match socket.try_recv_buf(&mut buf) {
///             Ok(n) => {
///                 println!("GOT {:?}", &buf[..n]);
///                 break;
///             }
///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
///                 continue;
///             }
///             Err(e) => {
///                 return Err(e);
///             }
///         }
///     }
///
///     Ok(())
/// }
/// ```
pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
    self.io.registration().try_io(Interest::READABLE, || {
        let dst = buf.chunk_mut();
        // The spare chunk of `buf` is reinterpreted as `[u8]` only so that it
        // can be passed to `recv` as the destination slice.
        let dst =
            unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

        let n = (&*self.io).recv(dst)?;

        // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
        // buffer.
        unsafe {
            buf.advance_mut(n);
        }

        Ok(n)
    })
}

/// Tries to receive a single datagram message on the socket. On success,
/// returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array buf of sufficient size
/// to hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
/// returned. This function is usually paired with `readable()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
///     // Bind socket
///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
///
///     loop {
///         // Wait for the socket to be readable
///         socket.readable().await?;
///
///         let mut buf = Vec::with_capacity(1024);
///
///         // Try to recv data, this may still fail with `WouldBlock`
///         // if the readiness event is a false positive.
939 /// match socket.try_recv_buf_from(&mut buf) { 940 /// Ok((n, _addr)) => { 941 /// println!("GOT {:?}", &buf[..n]); 942 /// break; 943 /// } 944 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 945 /// continue; 946 /// } 947 /// Err(e) => { 948 /// return Err(e); 949 /// } 950 /// } 951 /// } 952 /// 953 /// Ok(()) 954 /// } 955 /// ``` 956 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { 957 self.io.registration().try_io(Interest::READABLE, || { 958 let dst = buf.chunk_mut(); 959 let dst = 960 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 961 962 // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the 963 // buffer. 964 let (n, addr) = (&*self.io).recv_from(dst)?; 965 966 unsafe { 967 buf.advance_mut(n); 968 } 969 970 Ok((n, addr)) 971 }) 972 } 973 } 974 975 /// Sends data on the socket to the given address. On success, returns the 976 /// number of bytes written. 977 /// 978 /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its 979 /// documentation for concrete examples. 980 /// 981 /// It is possible for `addr` to yield multiple addresses, but `send_to` 982 /// will only send data to the first address yielded by `addr`. 983 /// 984 /// This will return an error when the IP version of the local socket does 985 /// not match that returned from [`ToSocketAddrs`]. 986 /// 987 /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs 988 /// 989 /// # Cancel safety 990 /// 991 /// This method is cancel safe. If `send_to` is used as the event in a 992 /// [`tokio::select!`](crate::select) statement and some other branch 993 /// completes first, then it is guaranteed that the message was not sent. 
994 /// 995 /// # Example 996 /// 997 /// ```no_run 998 /// use tokio::net::UdpSocket; 999 /// use std::io; 1000 /// 1001 /// #[tokio::main] 1002 /// async fn main() -> io::Result<()> { 1003 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1004 /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?; 1005 /// 1006 /// println!("Sent {} bytes", len); 1007 /// 1008 /// Ok(()) 1009 /// } 1010 /// ``` send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize>1011 pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> { 1012 let mut addrs = to_socket_addrs(target).await?; 1013 1014 match addrs.next() { 1015 Some(target) => self.send_to_addr(buf, target).await, 1016 None => Err(io::Error::new( 1017 io::ErrorKind::InvalidInput, 1018 "no addresses to send data to", 1019 )), 1020 } 1021 } 1022 1023 /// Attempts to send data on the socket to a given address. 1024 /// 1025 /// Note that on multiple calls to a `poll_*` method in the send direction, only the 1026 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1027 /// receive a wakeup. 1028 /// 1029 /// # Return value 1030 /// 1031 /// The function returns: 1032 /// 1033 /// * `Poll::Pending` if the socket is not ready to write 1034 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. 1035 /// * `Poll::Ready(Err(e))` if an error is encountered. 1036 /// 1037 /// # Errors 1038 /// 1039 /// This function may encounter any standard I/O error except `WouldBlock`. 
poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr, ) -> Poll<io::Result<usize>>1040 pub fn poll_send_to( 1041 &self, 1042 cx: &mut Context<'_>, 1043 buf: &[u8], 1044 target: SocketAddr, 1045 ) -> Poll<io::Result<usize>> { 1046 self.io 1047 .registration() 1048 .poll_write_io(cx, || self.io.send_to(buf, target)) 1049 } 1050 1051 /// Tries to send data on the socket to the given address, but if the send is 1052 /// blocked this will return right away. 1053 /// 1054 /// This function is usually paired with `writable()`. 1055 /// 1056 /// # Returns 1057 /// 1058 /// If successful, returns the number of bytes sent 1059 /// 1060 /// Users should ensure that when the remote cannot receive, the 1061 /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur 1062 /// if the IP version of the socket does not match that of `target`. 1063 /// 1064 /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock 1065 /// 1066 /// # Example 1067 /// 1068 /// ```no_run 1069 /// use tokio::net::UdpSocket; 1070 /// use std::error::Error; 1071 /// use std::io; 1072 /// 1073 /// #[tokio::main] 1074 /// async fn main() -> Result<(), Box<dyn Error>> { 1075 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1076 /// 1077 /// let dst = "127.0.0.1:8081".parse()?; 1078 /// 1079 /// loop { 1080 /// socket.writable().await?; 1081 /// 1082 /// match socket.try_send_to(&b"hello world"[..], dst) { 1083 /// Ok(sent) => { 1084 /// println!("sent {} bytes", sent); 1085 /// break; 1086 /// } 1087 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1088 /// // Writable false positive. 
1089 /// continue; 1090 /// } 1091 /// Err(e) => return Err(e.into()), 1092 /// } 1093 /// } 1094 /// 1095 /// Ok(()) 1096 /// } 1097 /// ``` try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>1098 pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { 1099 self.io 1100 .registration() 1101 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) 1102 } 1103 send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>1104 async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { 1105 self.io 1106 .registration() 1107 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) 1108 .await 1109 } 1110 1111 /// Receives a single datagram message on the socket. On success, returns 1112 /// the number of bytes read and the origin. 1113 /// 1114 /// The function must be called with valid byte array `buf` of sufficient 1115 /// size to hold the message bytes. If a message is too long to fit in the 1116 /// supplied buffer, excess bytes may be discarded. 1117 /// 1118 /// # Cancel safety 1119 /// 1120 /// This method is cancel safe. If `recv_from` is used as the event in a 1121 /// [`tokio::select!`](crate::select) statement and some other branch 1122 /// completes first, it is guaranteed that no messages were received on this 1123 /// socket. 
1124 /// 1125 /// # Example 1126 /// 1127 /// ```no_run 1128 /// use tokio::net::UdpSocket; 1129 /// use std::io; 1130 /// 1131 /// #[tokio::main] 1132 /// async fn main() -> io::Result<()> { 1133 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1134 /// 1135 /// let mut buf = vec![0u8; 32]; 1136 /// let (len, addr) = socket.recv_from(&mut buf).await?; 1137 /// 1138 /// println!("received {:?} bytes from {:?}", len, addr); 1139 /// 1140 /// Ok(()) 1141 /// } 1142 /// ``` recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1143 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1144 self.io 1145 .registration() 1146 .async_io(Interest::READABLE, || self.io.recv_from(buf)) 1147 .await 1148 } 1149 1150 /// Attempts to receive a single datagram on the socket. 1151 /// 1152 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 1153 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1154 /// receive a wakeup. 1155 /// 1156 /// # Return value 1157 /// 1158 /// The function returns: 1159 /// 1160 /// * `Poll::Pending` if the socket is not ready to read 1161 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready 1162 /// * `Poll::Ready(Err(e))` if an error is encountered. 1163 /// 1164 /// # Errors 1165 /// 1166 /// This function may encounter any standard I/O error except `WouldBlock`. poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1167 pub fn poll_recv_from( 1168 &self, 1169 cx: &mut Context<'_>, 1170 buf: &mut ReadBuf<'_>, 1171 ) -> Poll<io::Result<SocketAddr>> { 1172 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { 1173 // Safety: will not read the maybe uninitialized bytes. 
1174 let b = unsafe { 1175 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 1176 }; 1177 1178 self.io.recv_from(b) 1179 }))?; 1180 1181 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 1182 unsafe { 1183 buf.assume_init(n); 1184 } 1185 buf.advance(n); 1186 Poll::Ready(Ok(addr)) 1187 } 1188 1189 /// Tries to receive a single datagram message on the socket. On success, 1190 /// returns the number of bytes read and the origin. 1191 /// 1192 /// The function must be called with valid byte array buf of sufficient size 1193 /// to hold the message bytes. If a message is too long to fit in the 1194 /// supplied buffer, excess bytes may be discarded. 1195 /// 1196 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is 1197 /// returned. This function is usually paired with `readable()`. 1198 /// 1199 /// # Examples 1200 /// 1201 /// ```no_run 1202 /// use tokio::net::UdpSocket; 1203 /// use std::io; 1204 /// 1205 /// #[tokio::main] 1206 /// async fn main() -> io::Result<()> { 1207 /// // Connect to a peer 1208 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1209 /// 1210 /// loop { 1211 /// // Wait for the socket to be readable 1212 /// socket.readable().await?; 1213 /// 1214 /// // The buffer is **not** included in the async task and will 1215 /// // only exist on the stack. 1216 /// let mut buf = [0; 1024]; 1217 /// 1218 /// // Try to recv data, this may still fail with `WouldBlock` 1219 /// // if the readiness event is a false positive. 
1220 /// match socket.try_recv_from(&mut buf) { 1221 /// Ok((n, _addr)) => { 1222 /// println!("GOT {:?}", &buf[..n]); 1223 /// break; 1224 /// } 1225 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1226 /// continue; 1227 /// } 1228 /// Err(e) => { 1229 /// return Err(e); 1230 /// } 1231 /// } 1232 /// } 1233 /// 1234 /// Ok(()) 1235 /// } 1236 /// ``` try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1237 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1238 self.io 1239 .registration() 1240 .try_io(Interest::READABLE, || self.io.recv_from(buf)) 1241 } 1242 1243 /// Tries to read or write from the socket using a user-provided IO operation. 1244 /// 1245 /// If the socket is ready, the provided closure is called. The closure 1246 /// should attempt to perform IO operation from the socket by manually 1247 /// calling the appropriate syscall. If the operation fails because the 1248 /// socket is not actually ready, then the closure should return a 1249 /// `WouldBlock` error and the readiness flag is cleared. The return value 1250 /// of the closure is then returned by `try_io`. 1251 /// 1252 /// If the socket is not ready, then the closure is not called 1253 /// and a `WouldBlock` error is returned. 1254 /// 1255 /// The closure should only return a `WouldBlock` error if it has performed 1256 /// an IO operation on the socket that failed due to the socket not being 1257 /// ready. Returning a `WouldBlock` error in any other situation will 1258 /// incorrectly clear the readiness flag, which can cause the socket to 1259 /// behave incorrectly. 1260 /// 1261 /// The closure should not perform the IO operation using any of the methods 1262 /// defined on the Tokio `UdpSocket` type, as this will mess with the 1263 /// readiness flag and can cause the socket to behave incorrectly. 1264 /// 1265 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 
1266 /// 1267 /// [`readable()`]: UdpSocket::readable() 1268 /// [`writable()`]: UdpSocket::writable() 1269 /// [`ready()`]: UdpSocket::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1270 pub fn try_io<R>( 1271 &self, 1272 interest: Interest, 1273 f: impl FnOnce() -> io::Result<R>, 1274 ) -> io::Result<R> { 1275 self.io.registration().try_io(interest, f) 1276 } 1277 1278 /// Receives data from the socket, without removing it from the input queue. 1279 /// On success, returns the number of bytes read and the address from whence 1280 /// the data came. 1281 /// 1282 /// # Notes 1283 /// 1284 /// On Windows, if the data is larger than the buffer specified, the buffer 1285 /// is filled with the first part of the data, and peek_from returns the error 1286 /// WSAEMSGSIZE(10040). The excess data is lost. 1287 /// Make sure to always use a sufficiently large buffer to hold the 1288 /// maximum UDP packet size, which can be up to 65536 bytes in size. 1289 /// 1290 /// # Examples 1291 /// 1292 /// ```no_run 1293 /// use tokio::net::UdpSocket; 1294 /// use std::io; 1295 /// 1296 /// #[tokio::main] 1297 /// async fn main() -> io::Result<()> { 1298 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1299 /// 1300 /// let mut buf = vec![0u8; 32]; 1301 /// let (len, addr) = socket.peek_from(&mut buf).await?; 1302 /// 1303 /// println!("peeked {:?} bytes from {:?}", len, addr); 1304 /// 1305 /// Ok(()) 1306 /// } 1307 /// ``` peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1308 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1309 self.io 1310 .registration() 1311 .async_io(Interest::READABLE, || self.io.peek_from(buf)) 1312 .await 1313 } 1314 1315 /// Receives data from the socket, without removing it from the input queue. 1316 /// On success, returns the number of bytes read. 
1317 /// 1318 /// # Notes 1319 /// 1320 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 1321 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1322 /// receive a wakeup 1323 /// 1324 /// On Windows, if the data is larger than the buffer specified, the buffer 1325 /// is filled with the first part of the data, and peek returns the error 1326 /// WSAEMSGSIZE(10040). The excess data is lost. 1327 /// Make sure to always use a sufficiently large buffer to hold the 1328 /// maximum UDP packet size, which can be up to 65536 bytes in size. 1329 /// 1330 /// # Return value 1331 /// 1332 /// The function returns: 1333 /// 1334 /// * `Poll::Pending` if the socket is not ready to read 1335 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready 1336 /// * `Poll::Ready(Err(e))` if an error is encountered. 1337 /// 1338 /// # Errors 1339 /// 1340 /// This function may encounter any standard I/O error except `WouldBlock`. poll_peek_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1341 pub fn poll_peek_from( 1342 &self, 1343 cx: &mut Context<'_>, 1344 buf: &mut ReadBuf<'_>, 1345 ) -> Poll<io::Result<SocketAddr>> { 1346 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { 1347 // Safety: will not read the maybe uninitialized bytes. 1348 let b = unsafe { 1349 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 1350 }; 1351 1352 self.io.peek_from(b) 1353 }))?; 1354 1355 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 1356 unsafe { 1357 buf.assume_init(n); 1358 } 1359 buf.advance(n); 1360 Poll::Ready(Ok(addr)) 1361 } 1362 1363 /// Gets the value of the `SO_BROADCAST` option for this socket. 1364 /// 1365 /// For more information about this option, see [`set_broadcast`]. 
1366 /// 1367 /// [`set_broadcast`]: method@Self::set_broadcast broadcast(&self) -> io::Result<bool>1368 pub fn broadcast(&self) -> io::Result<bool> { 1369 self.io.broadcast() 1370 } 1371 1372 /// Sets the value of the `SO_BROADCAST` option for this socket. 1373 /// 1374 /// When enabled, this socket is allowed to send packets to a broadcast 1375 /// address. set_broadcast(&self, on: bool) -> io::Result<()>1376 pub fn set_broadcast(&self, on: bool) -> io::Result<()> { 1377 self.io.set_broadcast(on) 1378 } 1379 1380 /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. 1381 /// 1382 /// For more information about this option, see [`set_multicast_loop_v4`]. 1383 /// 1384 /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 multicast_loop_v4(&self) -> io::Result<bool>1385 pub fn multicast_loop_v4(&self) -> io::Result<bool> { 1386 self.io.multicast_loop_v4() 1387 } 1388 1389 /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. 1390 /// 1391 /// If enabled, multicast packets will be looped back to the local socket. 1392 /// 1393 /// # Note 1394 /// 1395 /// This may not have any affect on IPv6 sockets. set_multicast_loop_v4(&self, on: bool) -> io::Result<()>1396 pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { 1397 self.io.set_multicast_loop_v4(on) 1398 } 1399 1400 /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. 1401 /// 1402 /// For more information about this option, see [`set_multicast_ttl_v4`]. 1403 /// 1404 /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 multicast_ttl_v4(&self) -> io::Result<u32>1405 pub fn multicast_ttl_v4(&self) -> io::Result<u32> { 1406 self.io.multicast_ttl_v4() 1407 } 1408 1409 /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. 1410 /// 1411 /// Indicates the time-to-live value of outgoing multicast packets for 1412 /// this socket. 
The default value is 1 which means that multicast packets 1413 /// don't leave the local network unless explicitly requested. 1414 /// 1415 /// # Note 1416 /// 1417 /// This may not have any affect on IPv6 sockets. set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()>1418 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { 1419 self.io.set_multicast_ttl_v4(ttl) 1420 } 1421 1422 /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. 1423 /// 1424 /// For more information about this option, see [`set_multicast_loop_v6`]. 1425 /// 1426 /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 multicast_loop_v6(&self) -> io::Result<bool>1427 pub fn multicast_loop_v6(&self) -> io::Result<bool> { 1428 self.io.multicast_loop_v6() 1429 } 1430 1431 /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. 1432 /// 1433 /// Controls whether this socket sees the multicast packets it sends itself. 1434 /// 1435 /// # Note 1436 /// 1437 /// This may not have any affect on IPv4 sockets. set_multicast_loop_v6(&self, on: bool) -> io::Result<()>1438 pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { 1439 self.io.set_multicast_loop_v6(on) 1440 } 1441 1442 /// Gets the value of the `IP_TTL` option for this socket. 1443 /// 1444 /// For more information about this option, see [`set_ttl`]. 1445 /// 1446 /// [`set_ttl`]: method@Self::set_ttl 1447 /// 1448 /// # Examples 1449 /// 1450 /// ```no_run 1451 /// use tokio::net::UdpSocket; 1452 /// # use std::io; 1453 /// 1454 /// # async fn dox() -> io::Result<()> { 1455 /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; 1456 /// 1457 /// println!("{:?}", sock.ttl()?); 1458 /// # Ok(()) 1459 /// # } 1460 /// ``` ttl(&self) -> io::Result<u32>1461 pub fn ttl(&self) -> io::Result<u32> { 1462 self.io.ttl() 1463 } 1464 1465 /// Sets the value for the `IP_TTL` option on this socket. 
1466 /// 1467 /// This value sets the time-to-live field that is used in every packet sent 1468 /// from this socket. 1469 /// 1470 /// # Examples 1471 /// 1472 /// ```no_run 1473 /// use tokio::net::UdpSocket; 1474 /// # use std::io; 1475 /// 1476 /// # async fn dox() -> io::Result<()> { 1477 /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; 1478 /// sock.set_ttl(60)?; 1479 /// 1480 /// # Ok(()) 1481 /// # } 1482 /// ``` set_ttl(&self, ttl: u32) -> io::Result<()>1483 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 1484 self.io.set_ttl(ttl) 1485 } 1486 1487 /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. 1488 /// 1489 /// This function specifies a new multicast group for this socket to join. 1490 /// The address must be a valid multicast address, and `interface` is the 1491 /// address of the local interface with which the system should join the 1492 /// multicast group. If it's equal to `INADDR_ANY` then an appropriate 1493 /// interface is chosen by the system. join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()>1494 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { 1495 self.io.join_multicast_v4(&multiaddr, &interface) 1496 } 1497 1498 /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. 1499 /// 1500 /// This function specifies a new multicast group for this socket to join. 1501 /// The address must be a valid multicast address, and `interface` is the 1502 /// index of the interface to join/leave (or 0 to indicate any interface). join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()>1503 pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { 1504 self.io.join_multicast_v6(multiaddr, interface) 1505 } 1506 1507 /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. 1508 /// 1509 /// For more information about this option, see [`join_multicast_v4`]. 
1510 /// 1511 /// [`join_multicast_v4`]: method@Self::join_multicast_v4 leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()>1512 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { 1513 self.io.leave_multicast_v4(&multiaddr, &interface) 1514 } 1515 1516 /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. 1517 /// 1518 /// For more information about this option, see [`join_multicast_v6`]. 1519 /// 1520 /// [`join_multicast_v6`]: method@Self::join_multicast_v6 leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()>1521 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { 1522 self.io.leave_multicast_v6(multiaddr, interface) 1523 } 1524 1525 /// Returns the value of the `SO_ERROR` option. 1526 /// 1527 /// # Examples 1528 /// ``` 1529 /// use tokio::net::UdpSocket; 1530 /// use std::io; 1531 /// 1532 /// #[tokio::main] 1533 /// async fn main() -> io::Result<()> { 1534 /// // Create a socket 1535 /// let socket = UdpSocket::bind("0.0.0.0:8080").await?; 1536 /// 1537 /// if let Ok(Some(err)) = socket.take_error() { 1538 /// println!("Got error: {:?}", err); 1539 /// } 1540 /// 1541 /// Ok(()) 1542 /// } 1543 /// ``` take_error(&self) -> io::Result<Option<io::Error>>1544 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 1545 self.io.take_error() 1546 } 1547 } 1548 1549 impl TryFrom<std::net::UdpSocket> for UdpSocket { 1550 type Error = io::Error; 1551 1552 /// Consumes stream, returning the tokio I/O object. 1553 /// 1554 /// This is equivalent to 1555 /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). 
try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error>1556 fn try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error> { 1557 Self::from_std(stream) 1558 } 1559 } 1560 1561 impl fmt::Debug for UdpSocket { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1562 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1563 self.io.fmt(f) 1564 } 1565 } 1566 1567 #[cfg(all(unix))] 1568 mod sys { 1569 use super::UdpSocket; 1570 use std::os::unix::prelude::*; 1571 1572 impl AsRawFd for UdpSocket { as_raw_fd(&self) -> RawFd1573 fn as_raw_fd(&self) -> RawFd { 1574 self.io.as_raw_fd() 1575 } 1576 } 1577 } 1578 1579 #[cfg(windows)] 1580 mod sys { 1581 use super::UdpSocket; 1582 use std::os::windows::prelude::*; 1583 1584 impl AsRawSocket for UdpSocket { as_raw_socket(&self) -> RawSocket1585 fn as_raw_socket(&self) -> RawSocket { 1586 self.io.as_raw_socket() 1587 } 1588 } 1589 } 1590