1 //! Unix-specific networking extensions. 2 3 use std::fmt; 4 use std::os::unix::net::UnixListener as StdUnixListener; 5 use std::os::unix::net::UnixStream as StdUnixStream; 6 use std::pin::Pin; 7 8 use async_io::Async; 9 10 use super::SocketAddr; 11 use super::UnixStream; 12 use crate::io; 13 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 14 use crate::path::Path; 15 use crate::stream::Stream; 16 use crate::sync::Arc; 17 use crate::task::{ready, Context, Poll}; 18 19 /// A Unix domain socket server, listening for connections. 20 /// 21 /// After creating a `UnixListener` by [`bind`]ing it to a socket address, it listens for incoming 22 /// connections. These can be accepted by awaiting elements from the async stream of [`incoming`] 23 /// connections. 24 /// 25 /// The socket will be closed when the value is dropped. 26 /// 27 /// This type is an async version of [`std::os::unix::net::UnixListener`]. 28 /// 29 /// [`std::os::unix::net::UnixListener`]: 30 /// https://doc.rust-lang.org/std/os/unix/net/struct.UnixListener.html 31 /// [`bind`]: #method.bind 32 /// [`incoming`]: #method.incoming 33 /// 34 /// # Examples 35 /// 36 /// ```no_run 37 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 38 /// # 39 /// use async_std::os::unix::net::UnixListener; 40 /// use async_std::prelude::*; 41 /// 42 /// let listener = UnixListener::bind("/tmp/socket").await?; 43 /// let mut incoming = listener.incoming(); 44 /// 45 /// while let Some(stream) = incoming.next().await { 46 /// let mut stream = stream?; 47 /// stream.write_all(b"hello world").await?; 48 /// } 49 /// # 50 /// # Ok(()) }) } 51 /// ``` 52 pub struct UnixListener { 53 watcher: Async<StdUnixListener>, 54 } 55 56 impl UnixListener { 57 /// Creates a Unix datagram listener bound to the given path. 58 /// 59 /// # Examples 60 /// 61 /// ```no_run 62 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 63 /// # 64 /// use async_std::os::unix::net::UnixListener; 65 /// 66 /// let listener = UnixListener::bind("/tmp/socket").await?; 67 /// # 68 /// # Ok(()) }) } 69 /// ``` bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener>70 pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> { 71 let path = path.as_ref().to_owned(); 72 let listener = Async::<StdUnixListener>::bind(path)?; 73 74 Ok(UnixListener { watcher: listener }) 75 } 76 77 /// Accepts a new incoming connection to this listener. 78 /// 79 /// When a connection is established, the corresponding stream and address will be returned. 80 /// 81 /// # Examples 82 /// 83 /// ```no_run 84 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 85 /// # 86 /// use async_std::os::unix::net::UnixListener; 87 /// 88 /// let listener = UnixListener::bind("/tmp/socket").await?; 89 /// let (socket, addr) = listener.accept().await?; 90 /// # 91 /// # Ok(()) }) } 92 /// ``` accept(&self) -> io::Result<(UnixStream, SocketAddr)>93 pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { 94 let (stream, addr) = self.watcher.accept().await?; 95 96 Ok(( 97 UnixStream { 98 watcher: Arc::new(stream), 99 }, 100 addr, 101 )) 102 } 103 104 /// Returns a stream of incoming connections. 105 /// 106 /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of 107 /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. 108 /// 109 /// [`accept`]: #method.accept 110 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None 111 /// 112 /// # Examples 113 /// 114 /// ```no_run 115 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 116 /// # 117 /// use async_std::os::unix::net::UnixListener; 118 /// use async_std::prelude::*; 119 /// 120 /// let listener = UnixListener::bind("/tmp/socket").await?; 121 /// let mut incoming = listener.incoming(); 122 /// 123 /// while let Some(stream) = incoming.next().await { 124 /// let mut stream = stream?; 125 /// stream.write_all(b"hello world").await?; 126 /// } 127 /// # 128 /// # Ok(()) }) } 129 /// ``` incoming(&self) -> Incoming<'_>130 pub fn incoming(&self) -> Incoming<'_> { 131 Incoming { 132 incoming: Box::pin(self.watcher.incoming()), 133 } 134 } 135 136 /// Returns the local socket address of this listener. 137 /// 138 /// # Examples 139 /// 140 /// ```no_run 141 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { 142 /// # 143 /// use async_std::os::unix::net::UnixListener; 144 /// 145 /// let listener = UnixListener::bind("/tmp/socket").await?; 146 /// let addr = listener.local_addr()?; 147 /// # 148 /// # Ok(()) }) } 149 /// ``` local_addr(&self) -> io::Result<SocketAddr>150 pub fn local_addr(&self) -> io::Result<SocketAddr> { 151 self.watcher.get_ref().local_addr() 152 } 153 } 154 155 impl fmt::Debug for UnixListener { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 157 let mut builder = f.debug_struct("UnixListener"); 158 builder.field("fd", &self.as_raw_fd()); 159 160 if let Ok(addr) = self.local_addr() { 161 builder.field("local", &addr); 162 } 163 164 builder.finish() 165 } 166 } 167 168 /// A stream of incoming Unix domain socket connections. 169 /// 170 /// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is 171 /// created by the [`incoming`] method on [`UnixListener`]. 172 /// 173 /// This type is an async version of [`std::os::unix::net::Incoming`]. 174 /// 175 /// [`std::os::unix::net::Incoming`]: https://doc.rust-lang.org/std/os/unix/net/struct.Incoming.html 176 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None 177 /// [`incoming`]: struct.UnixListener.html#method.incoming 178 /// [`UnixListener`]: struct.UnixListener.html 179 pub struct Incoming<'a> { 180 incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdUnixStream>>> + Send + Sync + 'a>>, 181 } 182 183 impl Stream for Incoming<'_> { 184 type Item = io::Result<UnixStream>; 185 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>186 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 187 let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); 188 Poll::Ready(res.map(|res| res.map(|stream| UnixStream { watcher: Arc::new(stream) }))) 189 } 190 } 191 192 impl fmt::Debug for Incoming<'_> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 194 f.debug_struct("Incoming") 195 .finish() 196 } 197 } 198 199 impl From<StdUnixListener> for UnixListener { 200 /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. from(listener: StdUnixListener) -> UnixListener201 fn from(listener: StdUnixListener) -> UnixListener { 202 UnixListener { 203 watcher: Async::new(listener).expect("UnixListener is known to be good"), 204 } 205 } 206 } 207 208 impl AsRawFd for UnixListener { as_raw_fd(&self) -> RawFd209 fn as_raw_fd(&self) -> RawFd { 210 self.watcher.as_raw_fd() 211 } 212 } 213 214 impl FromRawFd for UnixListener { from_raw_fd(fd: RawFd) -> UnixListener215 unsafe fn from_raw_fd(fd: RawFd) -> UnixListener { 216 let listener = std::os::unix::net::UnixListener::from_raw_fd(fd); 217 listener.into() 218 } 219 } 220 221 impl IntoRawFd for UnixListener { into_raw_fd(self) -> RawFd222 fn into_raw_fd(self) -> RawFd { 223 self.watcher.into_inner().unwrap().into_raw_fd() 224 } 225 } 226