1 use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2 use crate::net::unix::SocketAddr;
3 
4 use std::convert::TryFrom;
5 use std::fmt;
6 use std::io;
7 use std::net::Shutdown;
8 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
9 use std::os::unix::net;
10 use std::path::Path;
11 use std::task::{Context, Poll};
12 
13 cfg_io_util! {
14     use bytes::BufMut;
15 }
16 
17 cfg_net_unix! {
18     /// An I/O object representing a Unix datagram socket.
19     ///
20     /// A socket can be either named (associated with a filesystem path) or
21     /// unnamed.
22     ///
23     /// This type does not provide a `split` method, because this functionality
24     /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
25     /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
26     /// is enough. This is because all of the methods take `&self` instead of
27     /// `&mut self`.
28     ///
29     /// **Note:** named sockets are persisted even after the object is dropped
30     /// and the program has exited, and cannot be reconnected. It is advised
31     /// that you either check for and unlink the existing socket if it exists,
32     /// or use a temporary file that is guaranteed to not already exist.
33     ///
34     /// [`Arc`]: std::sync::Arc
35     ///
36     /// # Examples
37     /// Using named sockets, associated with a filesystem path:
38     /// ```
39     /// # use std::error::Error;
40     /// # #[tokio::main]
41     /// # async fn main() -> Result<(), Box<dyn Error>> {
42     /// use tokio::net::UnixDatagram;
43     /// use tempfile::tempdir;
44     ///
45     /// // We use a temporary directory so that the socket
46     /// // files left by the bound sockets will get cleaned up.
47     /// let tmp = tempdir()?;
48     ///
49     /// // Bind each socket to a filesystem path
50     /// let tx_path = tmp.path().join("tx");
51     /// let tx = UnixDatagram::bind(&tx_path)?;
52     /// let rx_path = tmp.path().join("rx");
53     /// let rx = UnixDatagram::bind(&rx_path)?;
54     ///
55     /// let bytes = b"hello world";
56     /// tx.send_to(bytes, &rx_path).await?;
57     ///
58     /// let mut buf = vec![0u8; 24];
59     /// let (size, addr) = rx.recv_from(&mut buf).await?;
60     ///
61     /// let dgram = &buf[..size];
62     /// assert_eq!(dgram, bytes);
63     /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
64     ///
65     /// # Ok(())
66     /// # }
67     /// ```
68     ///
69     /// Using unnamed sockets, created as a pair
70     /// ```
71     /// # use std::error::Error;
72     /// # #[tokio::main]
73     /// # async fn main() -> Result<(), Box<dyn Error>> {
74     /// use tokio::net::UnixDatagram;
75     ///
76     /// // Create the pair of sockets
77     /// let (sock1, sock2) = UnixDatagram::pair()?;
78     ///
79     /// // Since the sockets are paired, the paired send/recv
80     /// // functions can be used
81     /// let bytes = b"hello world";
82     /// sock1.send(bytes).await?;
83     ///
84     /// let mut buff = vec![0u8; 24];
85     /// let size = sock2.recv(&mut buff).await?;
86     ///
87     /// let dgram = &buff[..size];
88     /// assert_eq!(dgram, bytes);
89     ///
90     /// # Ok(())
91     /// # }
92     /// ```
93     pub struct UnixDatagram {
94         io: PollEvented<mio::net::UnixDatagram>,
95     }
96 }
97 
98 impl UnixDatagram {
99     /// Waits for any of the requested ready states.
100     ///
101     /// This function is usually paired with `try_recv()` or `try_send()`. It
102     /// can be used to concurrently recv / send to the same socket on a single
103     /// task without splitting the socket.
104     ///
105     /// The function may complete without the socket being ready. This is a
106     /// false-positive and attempting an operation will return with
107     /// `io::ErrorKind::WouldBlock`.
108     ///
109     /// # Cancel safety
110     ///
111     /// This method is cancel safe. Once a readiness event occurs, the method
112     /// will continue to return immediately until the readiness event is
113     /// consumed by an attempt to read or write that fails with `WouldBlock` or
114     /// `Poll::Pending`.
115     ///
116     /// # Examples
117     ///
118     /// Concurrently receive from and send to the socket on the same task
119     /// without splitting.
120     ///
121     /// ```no_run
122     /// use tokio::io::Interest;
123     /// use tokio::net::UnixDatagram;
124     /// use std::io;
125     ///
126     /// #[tokio::main]
127     /// async fn main() -> io::Result<()> {
128     ///     let dir = tempfile::tempdir().unwrap();
129     ///     let client_path = dir.path().join("client.sock");
130     ///     let server_path = dir.path().join("server.sock");
131     ///     let socket = UnixDatagram::bind(&client_path)?;
132     ///     socket.connect(&server_path)?;
133     ///
134     ///     loop {
135     ///         let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
136     ///
137     ///         if ready.is_readable() {
138     ///             let mut data = [0; 1024];
139     ///             match socket.try_recv(&mut data[..]) {
140     ///                 Ok(n) => {
141     ///                     println!("received {:?}", &data[..n]);
142     ///                 }
143     ///                 // False-positive, continue
144     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
145     ///                 Err(e) => {
146     ///                     return Err(e);
147     ///                 }
148     ///             }
149     ///         }
150     ///
151     ///         if ready.is_writable() {
152     ///             // Write some data
153     ///             match socket.try_send(b"hello world") {
154     ///                 Ok(n) => {
155     ///                     println!("sent {} bytes", n);
156     ///                 }
157     ///                 // False-positive, continue
158     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
159     ///                 Err(e) => {
160     ///                     return Err(e);
161     ///                 }
162     ///             }
163     ///         }
164     ///     }
165     /// }
166     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>167     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
168         let event = self.io.registration().readiness(interest).await?;
169         Ok(event.ready)
170     }
171 
172     /// Waits for the socket to become writable.
173     ///
174     /// This function is equivalent to `ready(Interest::WRITABLE)` and is
175     /// usually paired with `try_send()` or `try_send_to()`.
176     ///
177     /// The function may complete without the socket being writable. This is a
178     /// false-positive and attempting a `try_send()` will return with
179     /// `io::ErrorKind::WouldBlock`.
180     ///
181     /// # Cancel safety
182     ///
183     /// This method is cancel safe. Once a readiness event occurs, the method
184     /// will continue to return immediately until the readiness event is
185     /// consumed by an attempt to write that fails with `WouldBlock` or
186     /// `Poll::Pending`.
187     ///
188     /// # Examples
189     ///
190     /// ```no_run
191     /// use tokio::net::UnixDatagram;
192     /// use std::io;
193     ///
194     /// #[tokio::main]
195     /// async fn main() -> io::Result<()> {
196     ///     let dir = tempfile::tempdir().unwrap();
197     ///     let client_path = dir.path().join("client.sock");
198     ///     let server_path = dir.path().join("server.sock");
199     ///     let socket = UnixDatagram::bind(&client_path)?;
200     ///     socket.connect(&server_path)?;
201     ///
202     ///     loop {
203     ///         // Wait for the socket to be writable
204     ///         socket.writable().await?;
205     ///
206     ///         // Try to send data, this may still fail with `WouldBlock`
207     ///         // if the readiness event is a false positive.
208     ///         match socket.try_send(b"hello world") {
209     ///             Ok(n) => {
210     ///                 break;
211     ///             }
212     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
213     ///                 continue;
214     ///             }
215     ///             Err(e) => {
216     ///                 return Err(e);
217     ///             }
218     ///         }
219     ///     }
220     ///
221     ///     Ok(())
222     /// }
223     /// ```
writable(&self) -> io::Result<()>224     pub async fn writable(&self) -> io::Result<()> {
225         self.ready(Interest::WRITABLE).await?;
226         Ok(())
227     }
228 
229     /// Polls for write/send readiness.
230     ///
231     /// If the socket is not currently ready for sending, this method will
232     /// store a clone of the `Waker` from the provided `Context`. When the socket
233     /// becomes ready for sending, `Waker::wake` will be called on the
234     /// waker.
235     ///
236     /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
237     /// the `Waker` from the `Context` passed to the most recent call is
238     /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
239     /// second, independent waker.)
240     ///
241     /// This function is intended for cases where creating and pinning a future
242     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
243     /// preferred, as this supports polling from multiple tasks at once.
244     ///
245     /// # Return value
246     ///
247     /// The function returns:
248     ///
249     /// * `Poll::Pending` if the socket is not ready for writing.
250     /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
251     /// * `Poll::Ready(Err(e))` if an error is encountered.
252     ///
253     /// # Errors
254     ///
255     /// This function may encounter any standard I/O error except `WouldBlock`.
256     ///
257     /// [`writable`]: method@Self::writable
poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>258     pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
259         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
260     }
261 
262     /// Waits for the socket to become readable.
263     ///
264     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
265     /// paired with `try_recv()`.
266     ///
267     /// The function may complete without the socket being readable. This is a
268     /// false-positive and attempting a `try_recv()` will return with
269     /// `io::ErrorKind::WouldBlock`.
270     ///
271     /// # Cancel safety
272     ///
273     /// This method is cancel safe. Once a readiness event occurs, the method
274     /// will continue to return immediately until the readiness event is
275     /// consumed by an attempt to read that fails with `WouldBlock` or
276     /// `Poll::Pending`.
277     ///
278     /// # Examples
279     ///
280     /// ```no_run
281     /// use tokio::net::UnixDatagram;
282     /// use std::io;
283     ///
284     /// #[tokio::main]
285     /// async fn main() -> io::Result<()> {
286     ///     // Connect to a peer
287     ///     let dir = tempfile::tempdir().unwrap();
288     ///     let client_path = dir.path().join("client.sock");
289     ///     let server_path = dir.path().join("server.sock");
290     ///     let socket = UnixDatagram::bind(&client_path)?;
291     ///     socket.connect(&server_path)?;
292     ///
293     ///     loop {
294     ///         // Wait for the socket to be readable
295     ///         socket.readable().await?;
296     ///
297     ///         // The buffer is **not** included in the async task and will
298     ///         // only exist on the stack.
299     ///         let mut buf = [0; 1024];
300     ///
301     ///         // Try to recv data, this may still fail with `WouldBlock`
302     ///         // if the readiness event is a false positive.
303     ///         match socket.try_recv(&mut buf) {
304     ///             Ok(n) => {
305     ///                 println!("GOT {:?}", &buf[..n]);
306     ///                 break;
307     ///             }
308     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
309     ///                 continue;
310     ///             }
311     ///             Err(e) => {
312     ///                 return Err(e);
313     ///             }
314     ///         }
315     ///     }
316     ///
317     ///     Ok(())
318     /// }
319     /// ```
readable(&self) -> io::Result<()>320     pub async fn readable(&self) -> io::Result<()> {
321         self.ready(Interest::READABLE).await?;
322         Ok(())
323     }
324 
325     /// Polls for read/receive readiness.
326     ///
327     /// If the socket is not currently ready for receiving, this method will
328     /// store a clone of the `Waker` from the provided `Context`. When the
329     /// socket becomes ready for reading, `Waker::wake` will be called on the
330     /// waker.
331     ///
332     /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
333     /// `poll_peek`, only the `Waker` from the `Context` passed to the most
334     /// recent call is scheduled to receive a wakeup. (However,
335     /// `poll_send_ready` retains a second, independent waker.)
336     ///
337     /// This function is intended for cases where creating and pinning a future
338     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
339     /// preferred, as this supports polling from multiple tasks at once.
340     ///
341     /// # Return value
342     ///
343     /// The function returns:
344     ///
345     /// * `Poll::Pending` if the socket is not ready for reading.
346     /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
347     /// * `Poll::Ready(Err(e))` if an error is encountered.
348     ///
349     /// # Errors
350     ///
351     /// This function may encounter any standard I/O error except `WouldBlock`.
352     ///
353     /// [`readable`]: method@Self::readable
poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>354     pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
355         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
356     }
357 
358     /// Creates a new `UnixDatagram` bound to the specified path.
359     ///
360     /// # Examples
361     /// ```
362     /// # use std::error::Error;
363     /// # #[tokio::main]
364     /// # async fn main() -> Result<(), Box<dyn Error>> {
365     /// use tokio::net::UnixDatagram;
366     /// use tempfile::tempdir;
367     ///
368     /// // We use a temporary directory so that the socket
369     /// // files left by the bound sockets will get cleaned up.
370     /// let tmp = tempdir()?;
371     ///
372     /// // Bind the socket to a filesystem path
373     /// let socket_path = tmp.path().join("socket");
374     /// let socket = UnixDatagram::bind(&socket_path)?;
375     ///
376     /// # Ok(())
377     /// # }
378     /// ```
bind<P>(path: P) -> io::Result<UnixDatagram> where P: AsRef<Path>,379     pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
380     where
381         P: AsRef<Path>,
382     {
383         let socket = mio::net::UnixDatagram::bind(path)?;
384         UnixDatagram::new(socket)
385     }
386 
387     /// Creates an unnamed pair of connected sockets.
388     ///
389     /// This function will create a pair of interconnected Unix sockets for
390     /// communicating back and forth between one another.
391     ///
392     /// # Examples
393     /// ```
394     /// # use std::error::Error;
395     /// # #[tokio::main]
396     /// # async fn main() -> Result<(), Box<dyn Error>> {
397     /// use tokio::net::UnixDatagram;
398     ///
399     /// // Create the pair of sockets
400     /// let (sock1, sock2) = UnixDatagram::pair()?;
401     ///
402     /// // Since the sockets are paired, the paired send/recv
403     /// // functions can be used
404     /// let bytes = b"hail eris";
405     /// sock1.send(bytes).await?;
406     ///
407     /// let mut buff = vec![0u8; 24];
408     /// let size = sock2.recv(&mut buff).await?;
409     ///
410     /// let dgram = &buff[..size];
411     /// assert_eq!(dgram, bytes);
412     ///
413     /// # Ok(())
414     /// # }
415     /// ```
pair() -> io::Result<(UnixDatagram, UnixDatagram)>416     pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
417         let (a, b) = mio::net::UnixDatagram::pair()?;
418         let a = UnixDatagram::new(a)?;
419         let b = UnixDatagram::new(b)?;
420 
421         Ok((a, b))
422     }
423 
    /// Creates a new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`.
425     ///
426     /// This function is intended to be used to wrap a UnixDatagram from the
427     /// standard library in the Tokio equivalent. The conversion assumes
428     /// nothing about the underlying datagram; it is left up to the user to set
429     /// it in non-blocking mode.
430     ///
431     /// # Panics
432     ///
    /// This function panics if the thread-local runtime is not set.
    ///
    /// The runtime is usually set implicitly when this function is called
    /// from a future driven by a Tokio runtime; otherwise the runtime can be set
    /// explicitly with the [`Runtime::enter`](crate::runtime::Runtime::enter) function.
    ///
    /// # Examples
439     /// ```
440     /// # use std::error::Error;
441     /// # #[tokio::main]
442     /// # async fn main() -> Result<(), Box<dyn Error>> {
443     /// use tokio::net::UnixDatagram;
444     /// use std::os::unix::net::UnixDatagram as StdUDS;
445     /// use tempfile::tempdir;
446     ///
447     /// // We use a temporary directory so that the socket
448     /// // files left by the bound sockets will get cleaned up.
449     /// let tmp = tempdir()?;
450     ///
451     /// // Bind the socket to a filesystem path
452     /// let socket_path = tmp.path().join("socket");
453     /// let std_socket = StdUDS::bind(&socket_path)?;
454     /// std_socket.set_nonblocking(true)?;
455     /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
456     ///
457     /// # Ok(())
458     /// # }
459     /// ```
from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram>460     pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
461         let socket = mio::net::UnixDatagram::from_std(datagram);
462         let io = PollEvented::new(socket)?;
463         Ok(UnixDatagram { io })
464     }
465 
466     /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
467     ///
468     /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
469     /// mode set as `true`.  Use [`set_nonblocking`] to change the blocking mode
470     /// if needed.
471     ///
472     /// # Examples
473     ///
474     /// ```rust,no_run
475     /// use std::error::Error;
476     ///
477     /// #[tokio::main]
478     /// async fn main() -> Result<(), Box<dyn Error>> {
    ///     let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
480     ///     let std_socket = tokio_socket.into_std()?;
481     ///     std_socket.set_nonblocking(false)?;
482     ///     Ok(())
483     /// }
484     /// ```
485     ///
486     /// [`tokio::net::UnixDatagram`]: UnixDatagram
487     /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
488     /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
into_std(self) -> io::Result<std::os::unix::net::UnixDatagram>489     pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
490         self.io
491             .into_inner()
492             .map(|io| io.into_raw_fd())
493             .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
494     }
495 
new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram>496     fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
497         let io = PollEvented::new(socket)?;
498         Ok(UnixDatagram { io })
499     }
500 
501     /// Creates a new `UnixDatagram` which is not bound to any address.
502     ///
503     /// # Examples
504     /// ```
505     /// # use std::error::Error;
506     /// # #[tokio::main]
507     /// # async fn main() -> Result<(), Box<dyn Error>> {
508     /// use tokio::net::UnixDatagram;
509     /// use tempfile::tempdir;
510     ///
511     /// // Create an unbound socket
512     /// let tx = UnixDatagram::unbound()?;
513     ///
514     /// // Create another, bound socket
515     /// let tmp = tempdir()?;
516     /// let rx_path = tmp.path().join("rx");
517     /// let rx = UnixDatagram::bind(&rx_path)?;
518     ///
519     /// // Send to the bound socket
520     /// let bytes = b"hello world";
521     /// tx.send_to(bytes, &rx_path).await?;
522     ///
523     /// let mut buf = vec![0u8; 24];
524     /// let (size, addr) = rx.recv_from(&mut buf).await?;
525     ///
526     /// let dgram = &buf[..size];
527     /// assert_eq!(dgram, bytes);
528     ///
529     /// # Ok(())
530     /// # }
531     /// ```
unbound() -> io::Result<UnixDatagram>532     pub fn unbound() -> io::Result<UnixDatagram> {
533         let socket = mio::net::UnixDatagram::unbound()?;
534         UnixDatagram::new(socket)
535     }
536 
537     /// Connects the socket to the specified address.
538     ///
539     /// The `send` method may be used to send data to the specified address.
540     /// `recv` and `recv_from` will only receive data from that address.
541     ///
542     /// # Examples
543     /// ```
544     /// # use std::error::Error;
545     /// # #[tokio::main]
546     /// # async fn main() -> Result<(), Box<dyn Error>> {
547     /// use tokio::net::UnixDatagram;
548     /// use tempfile::tempdir;
549     ///
550     /// // Create an unbound socket
551     /// let tx = UnixDatagram::unbound()?;
552     ///
553     /// // Create another, bound socket
554     /// let tmp = tempdir()?;
555     /// let rx_path = tmp.path().join("rx");
556     /// let rx = UnixDatagram::bind(&rx_path)?;
557     ///
558     /// // Connect to the bound socket
559     /// tx.connect(&rx_path)?;
560     ///
561     /// // Send to the bound socket
562     /// let bytes = b"hello world";
563     /// tx.send(bytes).await?;
564     ///
565     /// let mut buf = vec![0u8; 24];
566     /// let (size, addr) = rx.recv_from(&mut buf).await?;
567     ///
568     /// let dgram = &buf[..size];
569     /// assert_eq!(dgram, bytes);
570     ///
571     /// # Ok(())
572     /// # }
573     /// ```
connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()>574     pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
575         self.io.connect(path)
576     }
577 
578     /// Sends data on the socket to the socket's peer.
579     ///
580     /// # Cancel safety
581     ///
582     /// This method is cancel safe. If `send` is used as the event in a
583     /// [`tokio::select!`](crate::select) statement and some other branch
584     /// completes first, then it is guaranteed that the message was not sent.
585     ///
586     /// # Examples
587     /// ```
588     /// # use std::error::Error;
589     /// # #[tokio::main]
590     /// # async fn main() -> Result<(), Box<dyn Error>> {
591     /// use tokio::net::UnixDatagram;
592     ///
593     /// // Create the pair of sockets
594     /// let (sock1, sock2) = UnixDatagram::pair()?;
595     ///
596     /// // Since the sockets are paired, the paired send/recv
597     /// // functions can be used
598     /// let bytes = b"hello world";
599     /// sock1.send(bytes).await?;
600     ///
601     /// let mut buff = vec![0u8; 24];
602     /// let size = sock2.recv(&mut buff).await?;
603     ///
604     /// let dgram = &buff[..size];
605     /// assert_eq!(dgram, bytes);
606     ///
607     /// # Ok(())
608     /// # }
609     /// ```
send(&self, buf: &[u8]) -> io::Result<usize>610     pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
611         self.io
612             .registration()
613             .async_io(Interest::WRITABLE, || self.io.send(buf))
614             .await
615     }
616 
617     /// Tries to send a datagram to the peer without waiting.
618     ///
619     /// # Examples
620     ///
621     /// ```no_run
622     /// use tokio::net::UnixDatagram;
623     /// use std::io;
624     ///
625     /// #[tokio::main]
626     /// async fn main() -> io::Result<()> {
627     ///     let dir = tempfile::tempdir().unwrap();
628     ///     let client_path = dir.path().join("client.sock");
629     ///     let server_path = dir.path().join("server.sock");
630     ///     let socket = UnixDatagram::bind(&client_path)?;
631     ///     socket.connect(&server_path)?;
632     ///
633     ///     loop {
634     ///         // Wait for the socket to be writable
635     ///         socket.writable().await?;
636     ///
637     ///         // Try to send data, this may still fail with `WouldBlock`
638     ///         // if the readiness event is a false positive.
639     ///         match socket.try_send(b"hello world") {
640     ///             Ok(n) => {
641     ///                 break;
642     ///             }
643     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
644     ///                 continue;
645     ///             }
646     ///             Err(e) => {
647     ///                 return Err(e);
648     ///             }
649     ///         }
650     ///     }
651     ///
652     ///     Ok(())
653     /// }
654     /// ```
try_send(&self, buf: &[u8]) -> io::Result<usize>655     pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
656         self.io
657             .registration()
658             .try_io(Interest::WRITABLE, || self.io.send(buf))
659     }
660 
    /// Tries to send a datagram to the socket at the specified path without waiting.
662     ///
663     /// # Examples
664     ///
665     /// ```no_run
666     /// use tokio::net::UnixDatagram;
667     /// use std::io;
668     ///
669     /// #[tokio::main]
670     /// async fn main() -> io::Result<()> {
671     ///     let dir = tempfile::tempdir().unwrap();
672     ///     let client_path = dir.path().join("client.sock");
673     ///     let server_path = dir.path().join("server.sock");
674     ///     let socket = UnixDatagram::bind(&client_path)?;
675     ///
676     ///     loop {
677     ///         // Wait for the socket to be writable
678     ///         socket.writable().await?;
679     ///
680     ///         // Try to send data, this may still fail with `WouldBlock`
681     ///         // if the readiness event is a false positive.
682     ///         match socket.try_send_to(b"hello world", &server_path) {
683     ///             Ok(n) => {
684     ///                 break;
685     ///             }
686     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
687     ///                 continue;
688     ///             }
689     ///             Err(e) => {
690     ///                 return Err(e);
691     ///             }
692     ///         }
693     ///     }
694     ///
695     ///     Ok(())
696     /// }
697     /// ```
try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where P: AsRef<Path>,698     pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
699     where
700         P: AsRef<Path>,
701     {
702         self.io
703             .registration()
704             .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
705     }
706 
707     /// Receives data from the socket.
708     ///
709     /// # Cancel safety
710     ///
711     /// This method is cancel safe. If `recv` is used as the event in a
712     /// [`tokio::select!`](crate::select) statement and some other branch
713     /// completes first, it is guaranteed that no messages were received on this
714     /// socket.
715     ///
716     /// # Examples
717     /// ```
718     /// # use std::error::Error;
719     /// # #[tokio::main]
720     /// # async fn main() -> Result<(), Box<dyn Error>> {
721     /// use tokio::net::UnixDatagram;
722     ///
723     /// // Create the pair of sockets
724     /// let (sock1, sock2) = UnixDatagram::pair()?;
725     ///
726     /// // Since the sockets are paired, the paired send/recv
727     /// // functions can be used
728     /// let bytes = b"hello world";
729     /// sock1.send(bytes).await?;
730     ///
731     /// let mut buff = vec![0u8; 24];
732     /// let size = sock2.recv(&mut buff).await?;
733     ///
734     /// let dgram = &buff[..size];
735     /// assert_eq!(dgram, bytes);
736     ///
737     /// # Ok(())
738     /// # }
739     /// ```
recv(&self, buf: &mut [u8]) -> io::Result<usize>740     pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
741         self.io
742             .registration()
743             .async_io(Interest::READABLE, || self.io.recv(buf))
744             .await
745     }
746 
747     /// Tries to receive a datagram from the peer without waiting.
748     ///
749     /// # Examples
750     ///
751     /// ```no_run
752     /// use tokio::net::UnixDatagram;
753     /// use std::io;
754     ///
755     /// #[tokio::main]
756     /// async fn main() -> io::Result<()> {
757     ///     // Connect to a peer
758     ///     let dir = tempfile::tempdir().unwrap();
759     ///     let client_path = dir.path().join("client.sock");
760     ///     let server_path = dir.path().join("server.sock");
761     ///     let socket = UnixDatagram::bind(&client_path)?;
762     ///     socket.connect(&server_path)?;
763     ///
764     ///     loop {
765     ///         // Wait for the socket to be readable
766     ///         socket.readable().await?;
767     ///
768     ///         // The buffer is **not** included in the async task and will
769     ///         // only exist on the stack.
770     ///         let mut buf = [0; 1024];
771     ///
772     ///         // Try to recv data, this may still fail with `WouldBlock`
773     ///         // if the readiness event is a false positive.
774     ///         match socket.try_recv(&mut buf) {
775     ///             Ok(n) => {
776     ///                 println!("GOT {:?}", &buf[..n]);
777     ///                 break;
778     ///             }
779     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
780     ///                 continue;
781     ///             }
782     ///             Err(e) => {
783     ///                 return Err(e);
784     ///             }
785     ///         }
786     ///     }
787     ///
788     ///     Ok(())
789     /// }
790     /// ```
try_recv(&self, buf: &mut [u8]) -> io::Result<usize>791     pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
792         self.io
793             .registration()
794             .try_io(Interest::READABLE, || self.io.recv(buf))
795     }
796 
797     cfg_io_util! {
798         /// Tries to receive data from the socket without waiting.
799         ///
800         /// # Examples
801         ///
802         /// ```no_run
803         /// use tokio::net::UnixDatagram;
804         /// use std::io;
805         ///
806         /// #[tokio::main]
807         /// async fn main() -> io::Result<()> {
808         ///     // Connect to a peer
809         ///     let dir = tempfile::tempdir().unwrap();
810         ///     let client_path = dir.path().join("client.sock");
811         ///     let server_path = dir.path().join("server.sock");
812         ///     let socket = UnixDatagram::bind(&client_path)?;
813         ///
814         ///     loop {
815         ///         // Wait for the socket to be readable
816         ///         socket.readable().await?;
817         ///
818         ///         let mut buf = Vec::with_capacity(1024);
819         ///
820         ///         // Try to recv data, this may still fail with `WouldBlock`
821         ///         // if the readiness event is a false positive.
822         ///         match socket.try_recv_buf_from(&mut buf) {
823         ///             Ok((n, _addr)) => {
824         ///                 println!("GOT {:?}", &buf[..n]);
825         ///                 break;
826         ///             }
827         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
828         ///                 continue;
829         ///             }
830         ///             Err(e) => {
831         ///                 return Err(e);
832         ///             }
833         ///         }
834         ///     }
835         ///
836         ///     Ok(())
837         /// }
838         /// ```
839         pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
840             let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
841                 let dst = buf.chunk_mut();
842                 let dst =
843                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
844 
845                 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
846                 // buffer.
847                 let (n, addr) = (&*self.io).recv_from(dst)?;
848 
849                 unsafe {
850                     buf.advance_mut(n);
851                 }
852 
853                 Ok((n, addr))
854             })?;
855 
856             Ok((n, SocketAddr(addr)))
857         }
858 
        /// Tries to receive data from the socket into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
861         ///
862         /// # Examples
863         ///
864         /// ```no_run
865         /// use tokio::net::UnixDatagram;
866         /// use std::io;
867         ///
868         /// #[tokio::main]
869         /// async fn main() -> io::Result<()> {
870         ///     // Connect to a peer
871         ///     let dir = tempfile::tempdir().unwrap();
872         ///     let client_path = dir.path().join("client.sock");
873         ///     let server_path = dir.path().join("server.sock");
874         ///     let socket = UnixDatagram::bind(&client_path)?;
875         ///     socket.connect(&server_path)?;
876         ///
877         ///     loop {
878         ///         // Wait for the socket to be readable
879         ///         socket.readable().await?;
880         ///
881         ///         let mut buf = Vec::with_capacity(1024);
882         ///
883         ///         // Try to recv data, this may still fail with `WouldBlock`
884         ///         // if the readiness event is a false positive.
885         ///         match socket.try_recv_buf(&mut buf) {
886         ///             Ok(n) => {
887         ///                 println!("GOT {:?}", &buf[..n]);
888         ///                 break;
889         ///             }
890         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
891         ///                 continue;
892         ///             }
893         ///             Err(e) => {
894         ///                 return Err(e);
895         ///             }
896         ///         }
897         ///     }
898         ///
899         ///     Ok(())
900         /// }
901         /// ```
902         pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
903             self.io.registration().try_io(Interest::READABLE, || {
904                 let dst = buf.chunk_mut();
905                 let dst =
906                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
907 
908                 // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
909                 // buffer.
910                 let n = (&*self.io).recv(dst)?;
911 
912                 unsafe {
913                     buf.advance_mut(n);
914                 }
915 
916                 Ok(n)
917             })
918         }
919     }
920 
921     /// Sends data on the socket to the specified address.
922     ///
923     /// # Cancel safety
924     ///
925     /// This method is cancel safe. If `send_to` is used as the event in a
926     /// [`tokio::select!`](crate::select) statement and some other branch
927     /// completes first, then it is guaranteed that the message was not sent.
928     ///
929     /// # Examples
930     /// ```
931     /// # use std::error::Error;
932     /// # #[tokio::main]
933     /// # async fn main() -> Result<(), Box<dyn Error>> {
934     /// use tokio::net::UnixDatagram;
935     /// use tempfile::tempdir;
936     ///
937     /// // We use a temporary directory so that the socket
938     /// // files left by the bound sockets will get cleaned up.
939     /// let tmp = tempdir()?;
940     ///
941     /// // Bind each socket to a filesystem path
942     /// let tx_path = tmp.path().join("tx");
943     /// let tx = UnixDatagram::bind(&tx_path)?;
944     /// let rx_path = tmp.path().join("rx");
945     /// let rx = UnixDatagram::bind(&rx_path)?;
946     ///
947     /// let bytes = b"hello world";
948     /// tx.send_to(bytes, &rx_path).await?;
949     ///
950     /// let mut buf = vec![0u8; 24];
951     /// let (size, addr) = rx.recv_from(&mut buf).await?;
952     ///
953     /// let dgram = &buf[..size];
954     /// assert_eq!(dgram, bytes);
955     /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
956     ///
957     /// # Ok(())
958     /// # }
959     /// ```
send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where P: AsRef<Path>,960     pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
961     where
962         P: AsRef<Path>,
963     {
964         self.io
965             .registration()
966             .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
967             .await
968     }
969 
970     /// Receives data from the socket.
971     ///
972     /// # Cancel safety
973     ///
974     /// This method is cancel safe. If `recv_from` is used as the event in a
975     /// [`tokio::select!`](crate::select) statement and some other branch
976     /// completes first, it is guaranteed that no messages were received on this
977     /// socket.
978     ///
979     /// # Examples
980     /// ```
981     /// # use std::error::Error;
982     /// # #[tokio::main]
983     /// # async fn main() -> Result<(), Box<dyn Error>> {
984     /// use tokio::net::UnixDatagram;
985     /// use tempfile::tempdir;
986     ///
987     /// // We use a temporary directory so that the socket
988     /// // files left by the bound sockets will get cleaned up.
989     /// let tmp = tempdir()?;
990     ///
991     /// // Bind each socket to a filesystem path
992     /// let tx_path = tmp.path().join("tx");
993     /// let tx = UnixDatagram::bind(&tx_path)?;
994     /// let rx_path = tmp.path().join("rx");
995     /// let rx = UnixDatagram::bind(&rx_path)?;
996     ///
997     /// let bytes = b"hello world";
998     /// tx.send_to(bytes, &rx_path).await?;
999     ///
1000     /// let mut buf = vec![0u8; 24];
1001     /// let (size, addr) = rx.recv_from(&mut buf).await?;
1002     ///
1003     /// let dgram = &buf[..size];
1004     /// assert_eq!(dgram, bytes);
1005     /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1006     ///
1007     /// # Ok(())
1008     /// # }
1009     /// ```
recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1010     pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1011         let (n, addr) = self
1012             .io
1013             .registration()
1014             .async_io(Interest::READABLE, || self.io.recv_from(buf))
1015             .await?;
1016 
1017         Ok((n, SocketAddr(addr)))
1018     }
1019 
1020     /// Attempts to receive a single datagram on the specified address.
1021     ///
1022     /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1023     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1024     /// receive a wakeup.
1025     ///
1026     /// # Return value
1027     ///
1028     /// The function returns:
1029     ///
1030     /// * `Poll::Pending` if the socket is not ready to read
1031     /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1032     /// * `Poll::Ready(Err(e))` if an error is encountered.
1033     ///
1034     /// # Errors
1035     ///
1036     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1037     pub fn poll_recv_from(
1038         &self,
1039         cx: &mut Context<'_>,
1040         buf: &mut ReadBuf<'_>,
1041     ) -> Poll<io::Result<SocketAddr>> {
1042         let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1043             // Safety: will not read the maybe uninitialized bytes.
1044             let b = unsafe {
1045                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1046             };
1047 
1048             self.io.recv_from(b)
1049         }))?;
1050 
1051         // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1052         unsafe {
1053             buf.assume_init(n);
1054         }
1055         buf.advance(n);
1056         Poll::Ready(Ok(SocketAddr(addr)))
1057     }
1058 
1059     /// Attempts to send data to the specified address.
1060     ///
1061     /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1062     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1063     /// receive a wakeup.
1064     ///
1065     /// # Return value
1066     ///
1067     /// The function returns:
1068     ///
1069     /// * `Poll::Pending` if the socket is not ready to write
1070     /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1071     /// * `Poll::Ready(Err(e))` if an error is encountered.
1072     ///
1073     /// # Errors
1074     ///
1075     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_send_to<P>( &self, cx: &mut Context<'_>, buf: &[u8], target: P, ) -> Poll<io::Result<usize>> where P: AsRef<Path>,1076     pub fn poll_send_to<P>(
1077         &self,
1078         cx: &mut Context<'_>,
1079         buf: &[u8],
1080         target: P,
1081     ) -> Poll<io::Result<usize>>
1082     where
1083         P: AsRef<Path>,
1084     {
1085         self.io
1086             .registration()
1087             .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1088     }
1089 
1090     /// Attempts to send data on the socket to the remote address to which it
1091     /// was previously `connect`ed.
1092     ///
1093     /// The [`connect`] method will connect this socket to a remote address.
1094     /// This method will fail if the socket is not connected.
1095     ///
1096     /// Note that on multiple calls to a `poll_*` method in the send direction,
1097     /// only the `Waker` from the `Context` passed to the most recent call will
1098     /// be scheduled to receive a wakeup.
1099     ///
1100     /// # Return value
1101     ///
1102     /// The function returns:
1103     ///
1104     /// * `Poll::Pending` if the socket is not available to write
1105     /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1106     /// * `Poll::Ready(Err(e))` if an error is encountered.
1107     ///
1108     /// # Errors
1109     ///
1110     /// This function may encounter any standard I/O error except `WouldBlock`.
1111     ///
1112     /// [`connect`]: method@Self::connect
poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>1113     pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1114         self.io
1115             .registration()
1116             .poll_write_io(cx, || self.io.send(buf))
1117     }
1118 
1119     /// Attempts to receive a single datagram message on the socket from the remote
1120     /// address to which it is `connect`ed.
1121     ///
1122     /// The [`connect`] method will connect this socket to a remote address. This method
1123     /// resolves to an error if the socket is not connected.
1124     ///
1125     /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1126     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1127     /// receive a wakeup.
1128     ///
1129     /// # Return value
1130     ///
1131     /// The function returns:
1132     ///
1133     /// * `Poll::Pending` if the socket is not ready to read
    /// * `Poll::Ready(Ok(()))` reads data into `ReadBuf` if the socket is ready
1135     /// * `Poll::Ready(Err(e))` if an error is encountered.
1136     ///
1137     /// # Errors
1138     ///
1139     /// This function may encounter any standard I/O error except `WouldBlock`.
1140     ///
1141     /// [`connect`]: method@Self::connect
poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>>1142     pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1143         let n = ready!(self.io.registration().poll_read_io(cx, || {
1144             // Safety: will not read the maybe uninitialized bytes.
1145             let b = unsafe {
1146                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1147             };
1148 
1149             self.io.recv(b)
1150         }))?;
1151 
1152         // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1153         unsafe {
1154             buf.assume_init(n);
1155         }
1156         buf.advance(n);
1157         Poll::Ready(Ok(()))
1158     }
1159 
1160     /// Tries to receive data from the socket without waiting.
1161     ///
1162     /// # Examples
1163     ///
1164     /// ```no_run
1165     /// use tokio::net::UnixDatagram;
1166     /// use std::io;
1167     ///
1168     /// #[tokio::main]
1169     /// async fn main() -> io::Result<()> {
    ///     // Bind the socket to a local path
1171     ///     let dir = tempfile::tempdir().unwrap();
1172     ///     let client_path = dir.path().join("client.sock");
1173     ///     let server_path = dir.path().join("server.sock");
1174     ///     let socket = UnixDatagram::bind(&client_path)?;
1175     ///
1176     ///     loop {
1177     ///         // Wait for the socket to be readable
1178     ///         socket.readable().await?;
1179     ///
1180     ///         // The buffer is **not** included in the async task and will
1181     ///         // only exist on the stack.
1182     ///         let mut buf = [0; 1024];
1183     ///
1184     ///         // Try to recv data, this may still fail with `WouldBlock`
1185     ///         // if the readiness event is a false positive.
1186     ///         match socket.try_recv_from(&mut buf) {
1187     ///             Ok((n, _addr)) => {
1188     ///                 println!("GOT {:?}", &buf[..n]);
1189     ///                 break;
1190     ///             }
1191     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1192     ///                 continue;
1193     ///             }
1194     ///             Err(e) => {
1195     ///                 return Err(e);
1196     ///             }
1197     ///         }
1198     ///     }
1199     ///
1200     ///     Ok(())
1201     /// }
1202     /// ```
try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1203     pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1204         let (n, addr) = self
1205             .io
1206             .registration()
1207             .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1208 
1209         Ok((n, SocketAddr(addr)))
1210     }
1211 
1212     /// Tries to read or write from the socket using a user-provided IO operation.
1213     ///
1214     /// If the socket is ready, the provided closure is called. The closure
1215     /// should attempt to perform IO operation from the socket by manually
1216     /// calling the appropriate syscall. If the operation fails because the
1217     /// socket is not actually ready, then the closure should return a
1218     /// `WouldBlock` error and the readiness flag is cleared. The return value
1219     /// of the closure is then returned by `try_io`.
1220     ///
1221     /// If the socket is not ready, then the closure is not called
1222     /// and a `WouldBlock` error is returned.
1223     ///
1224     /// The closure should only return a `WouldBlock` error if it has performed
1225     /// an IO operation on the socket that failed due to the socket not being
1226     /// ready. Returning a `WouldBlock` error in any other situation will
1227     /// incorrectly clear the readiness flag, which can cause the socket to
1228     /// behave incorrectly.
1229     ///
1230     /// The closure should not perform the IO operation using any of the methods
1231     /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1232     /// readiness flag and can cause the socket to behave incorrectly.
1233     ///
1234     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1235     ///
1236     /// [`readable()`]: UnixDatagram::readable()
1237     /// [`writable()`]: UnixDatagram::writable()
1238     /// [`ready()`]: UnixDatagram::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1239     pub fn try_io<R>(
1240         &self,
1241         interest: Interest,
1242         f: impl FnOnce() -> io::Result<R>,
1243     ) -> io::Result<R> {
1244         self.io.registration().try_io(interest, f)
1245     }
1246 
1247     /// Returns the local address that this socket is bound to.
1248     ///
1249     /// # Examples
1250     /// For a socket bound to a local path
1251     /// ```
1252     /// # use std::error::Error;
1253     /// # #[tokio::main]
1254     /// # async fn main() -> Result<(), Box<dyn Error>> {
1255     /// use tokio::net::UnixDatagram;
1256     /// use tempfile::tempdir;
1257     ///
1258     /// // We use a temporary directory so that the socket
1259     /// // files left by the bound sockets will get cleaned up.
1260     /// let tmp = tempdir()?;
1261     ///
1262     /// // Bind socket to a filesystem path
1263     /// let socket_path = tmp.path().join("socket");
1264     /// let socket = UnixDatagram::bind(&socket_path)?;
1265     ///
1266     /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1267     ///
1268     /// # Ok(())
1269     /// # }
1270     /// ```
1271     ///
1272     /// For an unbound socket
1273     /// ```
1274     /// # use std::error::Error;
1275     /// # #[tokio::main]
1276     /// # async fn main() -> Result<(), Box<dyn Error>> {
1277     /// use tokio::net::UnixDatagram;
1278     ///
1279     /// // Create an unbound socket
1280     /// let socket = UnixDatagram::unbound()?;
1281     ///
1282     /// assert!(socket.local_addr()?.is_unnamed());
1283     ///
1284     /// # Ok(())
1285     /// # }
1286     /// ```
local_addr(&self) -> io::Result<SocketAddr>1287     pub fn local_addr(&self) -> io::Result<SocketAddr> {
1288         self.io.local_addr().map(SocketAddr)
1289     }
1290 
1291     /// Returns the address of this socket's peer.
1292     ///
1293     /// The `connect` method will connect the socket to a peer.
1294     ///
1295     /// # Examples
1296     /// For a peer with a local path
1297     /// ```
1298     /// # use std::error::Error;
1299     /// # #[tokio::main]
1300     /// # async fn main() -> Result<(), Box<dyn Error>> {
1301     /// use tokio::net::UnixDatagram;
1302     /// use tempfile::tempdir;
1303     ///
1304     /// // Create an unbound socket
1305     /// let tx = UnixDatagram::unbound()?;
1306     ///
1307     /// // Create another, bound socket
1308     /// let tmp = tempdir()?;
1309     /// let rx_path = tmp.path().join("rx");
1310     /// let rx = UnixDatagram::bind(&rx_path)?;
1311     ///
1312     /// // Connect to the bound socket
1313     /// tx.connect(&rx_path)?;
1314     ///
1315     /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1316     ///
1317     /// # Ok(())
1318     /// # }
1319     /// ```
1320     ///
1321     /// For an unbound peer
1322     /// ```
1323     /// # use std::error::Error;
1324     /// # #[tokio::main]
1325     /// # async fn main() -> Result<(), Box<dyn Error>> {
1326     /// use tokio::net::UnixDatagram;
1327     ///
1328     /// // Create the pair of sockets
1329     /// let (sock1, sock2) = UnixDatagram::pair()?;
1330     ///
1331     /// assert!(sock1.peer_addr()?.is_unnamed());
1332     ///
1333     /// # Ok(())
1334     /// # }
1335     /// ```
peer_addr(&self) -> io::Result<SocketAddr>1336     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1337         self.io.peer_addr().map(SocketAddr)
1338     }
1339 
1340     /// Returns the value of the `SO_ERROR` option.
1341     ///
1342     /// # Examples
1343     /// ```
1344     /// # use std::error::Error;
1345     /// # #[tokio::main]
1346     /// # async fn main() -> Result<(), Box<dyn Error>> {
1347     /// use tokio::net::UnixDatagram;
1348     ///
1349     /// // Create an unbound socket
1350     /// let socket = UnixDatagram::unbound()?;
1351     ///
1352     /// if let Ok(Some(err)) = socket.take_error() {
1353     ///     println!("Got error: {:?}", err);
1354     /// }
1355     ///
1356     /// # Ok(())
1357     /// # }
1358     /// ```
take_error(&self) -> io::Result<Option<io::Error>>1359     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1360         self.io.take_error()
1361     }
1362 
1363     /// Shuts down the read, write, or both halves of this connection.
1364     ///
1365     /// This function will cause all pending and future I/O calls on the
1366     /// specified portions to immediately return with an appropriate value
1367     /// (see the documentation of `Shutdown`).
1368     ///
1369     /// # Examples
1370     /// ```
1371     /// # use std::error::Error;
1372     /// # #[tokio::main]
1373     /// # async fn main() -> Result<(), Box<dyn Error>> {
1374     /// use tokio::net::UnixDatagram;
1375     /// use std::net::Shutdown;
1376     ///
    /// // Create a pair of connected sockets
1378     /// let (socket, other) = UnixDatagram::pair()?;
1379     ///
1380     /// socket.shutdown(Shutdown::Both)?;
1381     ///
1382     /// // NOTE: the following commented out code does NOT work as expected.
1383     /// // Due to an underlying issue, the recv call will block indefinitely.
1384     /// // See: https://github.com/tokio-rs/tokio/issues/1679
1385     /// //let mut buff = vec![0u8; 24];
1386     /// //let size = socket.recv(&mut buff).await?;
1387     /// //assert_eq!(size, 0);
1388     ///
1389     /// let send_result = socket.send(b"hello world").await;
1390     /// assert!(send_result.is_err());
1391     ///
1392     /// # Ok(())
1393     /// # }
1394     /// ```
shutdown(&self, how: Shutdown) -> io::Result<()>1395     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1396         self.io.shutdown(how)
1397     }
1398 }
1399 
1400 impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1401     type Error = io::Error;
1402 
1403     /// Consumes stream, returning the Tokio I/O object.
1404     ///
1405     /// This is equivalent to
1406     /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error>1407     fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1408         Self::from_std(stream)
1409     }
1410 }
1411 
1412 impl fmt::Debug for UnixDatagram {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1413     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1414         self.io.fmt(f)
1415     }
1416 }
1417 
1418 impl AsRawFd for UnixDatagram {
as_raw_fd(&self) -> RawFd1419     fn as_raw_fd(&self) -> RawFd {
1420         self.io.as_raw_fd()
1421     }
1422 }
1423