1 // Copied from tokio-uds/src/stream.rs revision 25e835c5b7e2cfeb9c22b1fd576844f6814a9477 (tokio-uds 0.2.5)
2 // License MIT per upstream: https://github.com/tokio-rs/tokio/blob/master/tokio-uds/LICENSE
3 // - Removed ucred for build simplicity
4 // - Added clear_{read,write}_ready per: https://github.com/tokio-rs/tokio/pull/1294
5 
6 use tokio_io::{AsyncRead, AsyncWrite};
7 use tokio_reactor::{Handle, PollEvented};
8 
9 use bytes::{Buf, BufMut};
10 use futures::{Async, Future, Poll};
11 use iovec::{self, IoVec};
12 use mio::Ready;
13 
14 use std::fmt;
15 use std::io::{self, Read, Write};
16 use std::net::Shutdown;
17 use std::os::unix::io::{AsRawFd, RawFd};
18 use std::os::unix::net::{self, SocketAddr};
19 use std::path::Path;
20 
21 /// A structure representing a connected Unix socket.
22 ///
23 /// This socket can be connected directly with `UnixStream::connect` or accepted
24 /// from a listener with `UnixListener::incoming`. Additionally, a pair of
25 /// anonymous Unix sockets can be created with `UnixStream::pair`.
26 pub struct UnixStream {
27     io: PollEvented<mio_uds::UnixStream>,
28 }
29 
30 /// Future returned by `UnixStream::connect` which will resolve to a
31 /// `UnixStream` when the stream is connected.
32 #[derive(Debug)]
33 pub struct ConnectFuture {
34     inner: State,
35 }
36 
37 #[derive(Debug)]
38 enum State {
39     Waiting(UnixStream),
40     Error(io::Error),
41     Empty,
42 }
43 
44 impl UnixStream {
45     /// Connects to the socket named by `path`.
46     ///
47     /// This function will create a new Unix socket and connect to the path
48     /// specified, associating the returned stream with the default event loop's
49     /// handle.
connect<P>(path: P) -> ConnectFuture where P: AsRef<Path>,50     pub fn connect<P>(path: P) -> ConnectFuture
51     where
52         P: AsRef<Path>,
53     {
54         let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
55 
56         let inner = match res {
57             Ok(stream) => State::Waiting(stream),
58             Err(e) => State::Error(e),
59         };
60 
61         ConnectFuture { inner }
62     }
63 
64     /// Consumes a `UnixStream` in the standard library and returns a
65     /// nonblocking `UnixStream` from this crate.
66     ///
67     /// The returned stream will be associated with the given event loop
68     /// specified by `handle` and is ready to perform I/O.
from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream>69     pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
70         let stream = mio_uds::UnixStream::from_stream(stream)?;
71         let io = PollEvented::new_with_handle(stream, handle)?;
72 
73         Ok(UnixStream { io })
74     }
75 
76     /// Creates an unnamed pair of connected sockets.
77     ///
78     /// This function will create a pair of interconnected Unix sockets for
79     /// communicating back and forth between one another. Each socket will
80     /// be associated with the default event loop's handle.
pair() -> io::Result<(UnixStream, UnixStream)>81     pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
82         let (a, b) = mio_uds::UnixStream::pair()?;
83         let a = UnixStream::new(a);
84         let b = UnixStream::new(b);
85 
86         Ok((a, b))
87     }
88 
new(stream: mio_uds::UnixStream) -> UnixStream89     pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
90         let io = PollEvented::new(stream);
91         UnixStream { io }
92     }
93 
94     /// Test whether this socket is ready to be read or not.
poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error>95     pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
96         self.io.poll_read_ready(ready)
97     }
98 
99     /// Clear read ready state.
clear_read_ready(&self, ready: mio::Ready) -> io::Result<()>100     pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
101         self.io.clear_read_ready(ready)
102     }
103 
104     /// Test whether this socket is ready to be written to or not.
poll_write_ready(&self) -> Poll<Ready, io::Error>105     pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
106         self.io.poll_write_ready()
107     }
108 
109     /// Clear write ready state.
clear_write_ready(&self) -> io::Result<()>110     pub fn clear_write_ready(&self) -> io::Result<()> {
111         self.io.clear_write_ready()
112     }
113 
114     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>115     pub fn local_addr(&self) -> io::Result<SocketAddr> {
116         self.io.get_ref().local_addr()
117     }
118 
119     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>120     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
121         self.io.get_ref().peer_addr()
122     }
123 
124     /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>125     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
126         self.io.get_ref().take_error()
127     }
128 
129     /// Shuts down the read, write, or both halves of this connection.
130     ///
131     /// This function will cause all pending and future I/O calls on the
132     /// specified portions to immediately return with an appropriate value
133     /// (see the documentation of `Shutdown`).
shutdown(&self, how: Shutdown) -> io::Result<()>134     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
135         self.io.get_ref().shutdown(how)
136     }
137 }
138 
139 impl Read for UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>140     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
141         self.io.read(buf)
142     }
143 }
144 
145 impl Write for UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>146     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
147         self.io.write(buf)
148     }
flush(&mut self) -> io::Result<()>149     fn flush(&mut self) -> io::Result<()> {
150         self.io.flush()
151     }
152 }
153 
154 impl AsyncRead for UnixStream {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool155     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
156         false
157     }
158 
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>159     fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
160         <&UnixStream>::read_buf(&mut &*self, buf)
161     }
162 }
163 
164 impl AsyncWrite for UnixStream {
shutdown(&mut self) -> Poll<(), io::Error>165     fn shutdown(&mut self) -> Poll<(), io::Error> {
166         <&UnixStream>::shutdown(&mut &*self)
167     }
168 
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>169     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
170         <&UnixStream>::write_buf(&mut &*self, buf)
171     }
172 }
173 
174 impl<'a> Read for &'a UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>175     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
176         (&self.io).read(buf)
177     }
178 }
179 
180 impl<'a> Write for &'a UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>181     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
182         (&self.io).write(buf)
183     }
184 
flush(&mut self) -> io::Result<()>185     fn flush(&mut self) -> io::Result<()> {
186         (&self.io).flush()
187     }
188 }
189 
190 impl<'a> AsyncRead for &'a UnixStream {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool191     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
192         false
193     }
194 
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>195     fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
196         if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
197             return Ok(Async::NotReady);
198         }
199         unsafe {
200             let r = read_ready(buf, self.as_raw_fd());
201             if r == -1 {
202                 let e = io::Error::last_os_error();
203                 if e.kind() == io::ErrorKind::WouldBlock {
204                     self.io.clear_read_ready(Ready::readable())?;
205                     Ok(Async::NotReady)
206                 } else {
207                     Err(e)
208                 }
209             } else {
210                 let r = r as usize;
211                 buf.advance_mut(r);
212                 Ok(r.into())
213             }
214         }
215     }
216 }
217 
218 impl<'a> AsyncWrite for &'a UnixStream {
shutdown(&mut self) -> Poll<(), io::Error>219     fn shutdown(&mut self) -> Poll<(), io::Error> {
220         Ok(().into())
221     }
222 
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>223     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
224         if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
225             return Ok(Async::NotReady);
226         }
227         unsafe {
228             let r = write_ready(buf, self.as_raw_fd());
229             if r == -1 {
230                 let e = io::Error::last_os_error();
231                 if e.kind() == io::ErrorKind::WouldBlock {
232                     self.io.clear_write_ready()?;
233                     Ok(Async::NotReady)
234                 } else {
235                     Err(e)
236                 }
237             } else {
238                 let r = r as usize;
239                 buf.advance(r);
240                 Ok(r.into())
241             }
242         }
243     }
244 }
245 
246 impl fmt::Debug for UnixStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248         self.io.get_ref().fmt(f)
249     }
250 }
251 
252 impl AsRawFd for UnixStream {
as_raw_fd(&self) -> RawFd253     fn as_raw_fd(&self) -> RawFd {
254         self.io.get_ref().as_raw_fd()
255     }
256 }
257 
258 impl Future for ConnectFuture {
259     type Item = UnixStream;
260     type Error = io::Error;
261 
poll(&mut self) -> Poll<UnixStream, io::Error>262     fn poll(&mut self) -> Poll<UnixStream, io::Error> {
263         use std::mem;
264 
265         match self.inner {
266             State::Waiting(ref mut stream) => {
267                 if let Async::NotReady = stream.io.poll_write_ready()? {
268                     return Ok(Async::NotReady);
269                 }
270 
271                 if let Some(e) = stream.io.get_ref().take_error()? {
272                     return Err(e);
273                 }
274             }
275             State::Error(_) => {
276                 let e = match mem::replace(&mut self.inner, State::Empty) {
277                     State::Error(e) => e,
278                     _ => unreachable!(),
279                 };
280 
281                 return Err(e);
282             }
283             State::Empty => panic!("can't poll stream twice"),
284         }
285 
286         match mem::replace(&mut self.inner, State::Empty) {
287             State::Waiting(stream) => Ok(Async::Ready(stream)),
288             _ => unreachable!(),
289         }
290     }
291 }
292 
read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize293 unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
294     // The `IoVec` type can't have a 0-length size, so we create a bunch
295     // of dummy versions on the stack with 1 length which we'll quickly
296     // overwrite.
297     let b1: &mut [u8] = &mut [0];
298     let b2: &mut [u8] = &mut [0];
299     let b3: &mut [u8] = &mut [0];
300     let b4: &mut [u8] = &mut [0];
301     let b5: &mut [u8] = &mut [0];
302     let b6: &mut [u8] = &mut [0];
303     let b7: &mut [u8] = &mut [0];
304     let b8: &mut [u8] = &mut [0];
305     let b9: &mut [u8] = &mut [0];
306     let b10: &mut [u8] = &mut [0];
307     let b11: &mut [u8] = &mut [0];
308     let b12: &mut [u8] = &mut [0];
309     let b13: &mut [u8] = &mut [0];
310     let b14: &mut [u8] = &mut [0];
311     let b15: &mut [u8] = &mut [0];
312     let b16: &mut [u8] = &mut [0];
313     let mut bufs: [&mut IoVec; 16] = [
314         b1.into(),
315         b2.into(),
316         b3.into(),
317         b4.into(),
318         b5.into(),
319         b6.into(),
320         b7.into(),
321         b8.into(),
322         b9.into(),
323         b10.into(),
324         b11.into(),
325         b12.into(),
326         b13.into(),
327         b14.into(),
328         b15.into(),
329         b16.into(),
330     ];
331 
332     let n = buf.bytes_vec_mut(&mut bufs);
333     read_ready_vecs(&mut bufs[..n], raw_fd)
334 }
335 
read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize336 unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
337     let iovecs = iovec::unix::as_os_slice_mut(bufs);
338 
339     libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
340 }
341 
write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize342 unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
343     // The `IoVec` type can't have a zero-length size, so create a dummy
344     // version from a 1-length slice which we'll overwrite with the
345     // `bytes_vec` method.
346     static DUMMY: &[u8] = &[0];
347     let iovec = <&IoVec>::from(DUMMY);
348     let mut bufs = [
349         iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
350         iovec, iovec, iovec,
351     ];
352 
353     let n = buf.bytes_vec(&mut bufs);
354     write_ready_vecs(&bufs[..n], raw_fd)
355 }
356 
write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize357 unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
358     let iovecs = iovec::unix::as_os_slice(bufs);
359 
360     libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
361 }
362