1 //! Unix-specific networking extensions.
2 
3 use std::fmt;
4 use std::os::unix::net::UnixListener as StdUnixListener;
5 use std::os::unix::net::UnixStream as StdUnixStream;
6 use std::pin::Pin;
7 
8 use async_io::Async;
9 
10 use super::SocketAddr;
11 use super::UnixStream;
12 use crate::io;
13 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
14 use crate::path::Path;
15 use crate::stream::Stream;
16 use crate::sync::Arc;
17 use crate::task::{ready, Context, Poll};
18 
19 /// A Unix domain socket server, listening for connections.
20 ///
21 /// After creating a `UnixListener` by [`bind`]ing it to a socket address, it listens for incoming
22 /// connections. These can be accepted by awaiting elements from the async stream of [`incoming`]
23 /// connections.
24 ///
25 /// The socket will be closed when the value is dropped.
26 ///
27 /// This type is an async version of [`std::os::unix::net::UnixListener`].
28 ///
29 /// [`std::os::unix::net::UnixListener`]:
30 /// https://doc.rust-lang.org/std/os/unix/net/struct.UnixListener.html
31 /// [`bind`]: #method.bind
32 /// [`incoming`]: #method.incoming
33 ///
34 /// # Examples
35 ///
36 /// ```no_run
37 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
38 /// #
39 /// use async_std::os::unix::net::UnixListener;
40 /// use async_std::prelude::*;
41 ///
42 /// let listener = UnixListener::bind("/tmp/socket").await?;
43 /// let mut incoming = listener.incoming();
44 ///
45 /// while let Some(stream) = incoming.next().await {
46 ///     let mut stream = stream?;
47 ///     stream.write_all(b"hello world").await?;
48 /// }
49 /// #
50 /// # Ok(()) }) }
51 /// ```
52 pub struct UnixListener {
53     watcher: Async<StdUnixListener>,
54 }
55 
56 impl UnixListener {
57     /// Creates a Unix datagram listener bound to the given path.
58     ///
59     /// # Examples
60     ///
61     /// ```no_run
62     /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
63     /// #
64     /// use async_std::os::unix::net::UnixListener;
65     ///
66     /// let listener = UnixListener::bind("/tmp/socket").await?;
67     /// #
68     /// # Ok(()) }) }
69     /// ```
bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener>70     pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
71         let path = path.as_ref().to_owned();
72         let listener = Async::<StdUnixListener>::bind(path)?;
73 
74         Ok(UnixListener { watcher: listener })
75     }
76 
77     /// Accepts a new incoming connection to this listener.
78     ///
79     /// When a connection is established, the corresponding stream and address will be returned.
80     ///
81     /// # Examples
82     ///
83     /// ```no_run
84     /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
85     /// #
86     /// use async_std::os::unix::net::UnixListener;
87     ///
88     /// let listener = UnixListener::bind("/tmp/socket").await?;
89     /// let (socket, addr) = listener.accept().await?;
90     /// #
91     /// # Ok(()) }) }
92     /// ```
accept(&self) -> io::Result<(UnixStream, SocketAddr)>93     pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
94         let (stream, addr) = self.watcher.accept().await?;
95 
96         Ok((
97             UnixStream {
98                 watcher: Arc::new(stream),
99             },
100             addr,
101         ))
102     }
103 
104     /// Returns a stream of incoming connections.
105     ///
106     /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of
107     /// connections is infinite, i.e awaiting the next connection will never result in [`None`].
108     ///
109     /// [`accept`]: #method.accept
110     /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
111     ///
112     /// # Examples
113     ///
114     /// ```no_run
115     /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
116     /// #
117     /// use async_std::os::unix::net::UnixListener;
118     /// use async_std::prelude::*;
119     ///
120     /// let listener = UnixListener::bind("/tmp/socket").await?;
121     /// let mut incoming = listener.incoming();
122     ///
123     /// while let Some(stream) = incoming.next().await {
124     ///     let mut stream = stream?;
125     ///     stream.write_all(b"hello world").await?;
126     /// }
127     /// #
128     /// # Ok(()) }) }
129     /// ```
incoming(&self) -> Incoming<'_>130     pub fn incoming(&self) -> Incoming<'_> {
131         Incoming {
132             incoming: Box::pin(self.watcher.incoming()),
133         }
134     }
135 
136     /// Returns the local socket address of this listener.
137     ///
138     /// # Examples
139     ///
140     /// ```no_run
141     /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
142     /// #
143     /// use async_std::os::unix::net::UnixListener;
144     ///
145     /// let listener = UnixListener::bind("/tmp/socket").await?;
146     /// let addr = listener.local_addr()?;
147     /// #
148     /// # Ok(()) }) }
149     /// ```
local_addr(&self) -> io::Result<SocketAddr>150     pub fn local_addr(&self) -> io::Result<SocketAddr> {
151         self.watcher.get_ref().local_addr()
152     }
153 }
154 
155 impl fmt::Debug for UnixListener {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result156     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157         let mut builder = f.debug_struct("UnixListener");
158         builder.field("fd", &self.as_raw_fd());
159 
160         if let Ok(addr) = self.local_addr() {
161             builder.field("local", &addr);
162         }
163 
164         builder.finish()
165     }
166 }
167 
168 /// A stream of incoming Unix domain socket connections.
169 ///
170 /// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
171 /// created by the [`incoming`] method on [`UnixListener`].
172 ///
173 /// This type is an async version of [`std::os::unix::net::Incoming`].
174 ///
175 /// [`std::os::unix::net::Incoming`]: https://doc.rust-lang.org/std/os/unix/net/struct.Incoming.html
176 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
177 /// [`incoming`]: struct.UnixListener.html#method.incoming
178 /// [`UnixListener`]: struct.UnixListener.html
179 pub struct Incoming<'a> {
180     incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdUnixStream>>> + Send + Sync + 'a>>,
181 }
182 
183 impl Stream for Incoming<'_> {
184     type Item = io::Result<UnixStream>;
185 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>186     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
187         let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
188         Poll::Ready(res.map(|res| res.map(|stream| UnixStream { watcher: Arc::new(stream) })))
189     }
190 }
191 
192 impl fmt::Debug for Incoming<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result193     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194         f.debug_struct("Incoming")
195             .finish()
196     }
197 }
198 
199 impl From<StdUnixListener> for UnixListener {
200     /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent.
from(listener: StdUnixListener) -> UnixListener201     fn from(listener: StdUnixListener) -> UnixListener {
202         UnixListener {
203             watcher: Async::new(listener).expect("UnixListener is known to be good"),
204         }
205     }
206 }
207 
208 impl AsRawFd for UnixListener {
as_raw_fd(&self) -> RawFd209     fn as_raw_fd(&self) -> RawFd {
210         self.watcher.as_raw_fd()
211     }
212 }
213 
214 impl FromRawFd for UnixListener {
from_raw_fd(fd: RawFd) -> UnixListener215     unsafe fn from_raw_fd(fd: RawFd) -> UnixListener {
216         let listener = std::os::unix::net::UnixListener::from_raw_fd(fd);
217         listener.into()
218     }
219 }
220 
221 impl IntoRawFd for UnixListener {
into_raw_fd(self) -> RawFd222     fn into_raw_fd(self) -> RawFd {
223         self.watcher.into_inner().unwrap().into_raw_fd()
224     }
225 }
226