1 // Copied from tokio-uds/src/stream.rs revision 25e835c5b7e2cfeb9c22b1fd576844f6814a9477 (tokio-uds 0.2.5)
2 // License MIT per upstream: https://github.com/tokio-rs/tokio/blob/master/tokio-uds/LICENSE
3 // - Removed ucred for build simplicity
4 // - Added clear_{read,write}_ready per: https://github.com/tokio-rs/tokio/pull/1294
5
6 use tokio_io::{AsyncRead, AsyncWrite};
7 use tokio_reactor::{Handle, PollEvented};
8
9 use bytes::{Buf, BufMut};
10 use futures::{Async, Future, Poll};
11 use iovec::{self, IoVec};
12 use mio::Ready;
13
14 use std::fmt;
15 use std::io::{self, Read, Write};
16 use std::net::Shutdown;
17 use std::os::unix::io::{AsRawFd, RawFd};
18 use std::os::unix::net::{self, SocketAddr};
19 use std::path::Path;
20
21 /// A structure representing a connected Unix socket.
22 ///
23 /// This socket can be connected directly with `UnixStream::connect` or accepted
24 /// from a listener with `UnixListener::incoming`. Additionally, a pair of
25 /// anonymous Unix sockets can be created with `UnixStream::pair`.
26 pub struct UnixStream {
27 io: PollEvented<mio_uds::UnixStream>,
28 }
29
30 /// Future returned by `UnixStream::connect` which will resolve to a
31 /// `UnixStream` when the stream is connected.
32 #[derive(Debug)]
33 pub struct ConnectFuture {
34 inner: State,
35 }
36
37 #[derive(Debug)]
38 enum State {
39 Waiting(UnixStream),
40 Error(io::Error),
41 Empty,
42 }
43
44 impl UnixStream {
45 /// Connects to the socket named by `path`.
46 ///
47 /// This function will create a new Unix socket and connect to the path
48 /// specified, associating the returned stream with the default event loop's
49 /// handle.
connect<P>(path: P) -> ConnectFuture where P: AsRef<Path>,50 pub fn connect<P>(path: P) -> ConnectFuture
51 where
52 P: AsRef<Path>,
53 {
54 let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
55
56 let inner = match res {
57 Ok(stream) => State::Waiting(stream),
58 Err(e) => State::Error(e),
59 };
60
61 ConnectFuture { inner }
62 }
63
64 /// Consumes a `UnixStream` in the standard library and returns a
65 /// nonblocking `UnixStream` from this crate.
66 ///
67 /// The returned stream will be associated with the given event loop
68 /// specified by `handle` and is ready to perform I/O.
from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream>69 pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
70 let stream = mio_uds::UnixStream::from_stream(stream)?;
71 let io = PollEvented::new_with_handle(stream, handle)?;
72
73 Ok(UnixStream { io })
74 }
75
76 /// Creates an unnamed pair of connected sockets.
77 ///
78 /// This function will create a pair of interconnected Unix sockets for
79 /// communicating back and forth between one another. Each socket will
80 /// be associated with the default event loop's handle.
pair() -> io::Result<(UnixStream, UnixStream)>81 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
82 let (a, b) = mio_uds::UnixStream::pair()?;
83 let a = UnixStream::new(a);
84 let b = UnixStream::new(b);
85
86 Ok((a, b))
87 }
88
new(stream: mio_uds::UnixStream) -> UnixStream89 pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
90 let io = PollEvented::new(stream);
91 UnixStream { io }
92 }
93
94 /// Test whether this socket is ready to be read or not.
poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error>95 pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
96 self.io.poll_read_ready(ready)
97 }
98
99 /// Clear read ready state.
clear_read_ready(&self, ready: mio::Ready) -> io::Result<()>100 pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
101 self.io.clear_read_ready(ready)
102 }
103
104 /// Test whether this socket is ready to be written to or not.
poll_write_ready(&self) -> Poll<Ready, io::Error>105 pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
106 self.io.poll_write_ready()
107 }
108
109 /// Clear write ready state.
clear_write_ready(&self) -> io::Result<()>110 pub fn clear_write_ready(&self) -> io::Result<()> {
111 self.io.clear_write_ready()
112 }
113
114 /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>115 pub fn local_addr(&self) -> io::Result<SocketAddr> {
116 self.io.get_ref().local_addr()
117 }
118
119 /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>120 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
121 self.io.get_ref().peer_addr()
122 }
123
124 /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>125 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
126 self.io.get_ref().take_error()
127 }
128
129 /// Shuts down the read, write, or both halves of this connection.
130 ///
131 /// This function will cause all pending and future I/O calls on the
132 /// specified portions to immediately return with an appropriate value
133 /// (see the documentation of `Shutdown`).
shutdown(&self, how: Shutdown) -> io::Result<()>134 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
135 self.io.get_ref().shutdown(how)
136 }
137 }
138
139 impl Read for UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>140 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
141 self.io.read(buf)
142 }
143 }
144
145 impl Write for UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>146 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
147 self.io.write(buf)
148 }
flush(&mut self) -> io::Result<()>149 fn flush(&mut self) -> io::Result<()> {
150 self.io.flush()
151 }
152 }
153
154 impl AsyncRead for UnixStream {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool155 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
156 false
157 }
158
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>159 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
160 <&UnixStream>::read_buf(&mut &*self, buf)
161 }
162 }
163
164 impl AsyncWrite for UnixStream {
shutdown(&mut self) -> Poll<(), io::Error>165 fn shutdown(&mut self) -> Poll<(), io::Error> {
166 <&UnixStream>::shutdown(&mut &*self)
167 }
168
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>169 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
170 <&UnixStream>::write_buf(&mut &*self, buf)
171 }
172 }
173
174 impl<'a> Read for &'a UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>175 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
176 (&self.io).read(buf)
177 }
178 }
179
180 impl<'a> Write for &'a UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>181 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
182 (&self.io).write(buf)
183 }
184
flush(&mut self) -> io::Result<()>185 fn flush(&mut self) -> io::Result<()> {
186 (&self.io).flush()
187 }
188 }
189
190 impl<'a> AsyncRead for &'a UnixStream {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool191 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
192 false
193 }
194
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>195 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
196 if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
197 return Ok(Async::NotReady);
198 }
199 unsafe {
200 let r = read_ready(buf, self.as_raw_fd());
201 if r == -1 {
202 let e = io::Error::last_os_error();
203 if e.kind() == io::ErrorKind::WouldBlock {
204 self.io.clear_read_ready(Ready::readable())?;
205 Ok(Async::NotReady)
206 } else {
207 Err(e)
208 }
209 } else {
210 let r = r as usize;
211 buf.advance_mut(r);
212 Ok(r.into())
213 }
214 }
215 }
216 }
217
218 impl<'a> AsyncWrite for &'a UnixStream {
shutdown(&mut self) -> Poll<(), io::Error>219 fn shutdown(&mut self) -> Poll<(), io::Error> {
220 Ok(().into())
221 }
222
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>223 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
224 if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
225 return Ok(Async::NotReady);
226 }
227 unsafe {
228 let r = write_ready(buf, self.as_raw_fd());
229 if r == -1 {
230 let e = io::Error::last_os_error();
231 if e.kind() == io::ErrorKind::WouldBlock {
232 self.io.clear_write_ready()?;
233 Ok(Async::NotReady)
234 } else {
235 Err(e)
236 }
237 } else {
238 let r = r as usize;
239 buf.advance(r);
240 Ok(r.into())
241 }
242 }
243 }
244 }
245
246 impl fmt::Debug for UnixStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 self.io.get_ref().fmt(f)
249 }
250 }
251
252 impl AsRawFd for UnixStream {
as_raw_fd(&self) -> RawFd253 fn as_raw_fd(&self) -> RawFd {
254 self.io.get_ref().as_raw_fd()
255 }
256 }
257
258 impl Future for ConnectFuture {
259 type Item = UnixStream;
260 type Error = io::Error;
261
poll(&mut self) -> Poll<UnixStream, io::Error>262 fn poll(&mut self) -> Poll<UnixStream, io::Error> {
263 use std::mem;
264
265 match self.inner {
266 State::Waiting(ref mut stream) => {
267 if let Async::NotReady = stream.io.poll_write_ready()? {
268 return Ok(Async::NotReady);
269 }
270
271 if let Some(e) = stream.io.get_ref().take_error()? {
272 return Err(e);
273 }
274 }
275 State::Error(_) => {
276 let e = match mem::replace(&mut self.inner, State::Empty) {
277 State::Error(e) => e,
278 _ => unreachable!(),
279 };
280
281 return Err(e);
282 }
283 State::Empty => panic!("can't poll stream twice"),
284 }
285
286 match mem::replace(&mut self.inner, State::Empty) {
287 State::Waiting(stream) => Ok(Async::Ready(stream)),
288 _ => unreachable!(),
289 }
290 }
291 }
292
read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize293 unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
294 // The `IoVec` type can't have a 0-length size, so we create a bunch
295 // of dummy versions on the stack with 1 length which we'll quickly
296 // overwrite.
297 let b1: &mut [u8] = &mut [0];
298 let b2: &mut [u8] = &mut [0];
299 let b3: &mut [u8] = &mut [0];
300 let b4: &mut [u8] = &mut [0];
301 let b5: &mut [u8] = &mut [0];
302 let b6: &mut [u8] = &mut [0];
303 let b7: &mut [u8] = &mut [0];
304 let b8: &mut [u8] = &mut [0];
305 let b9: &mut [u8] = &mut [0];
306 let b10: &mut [u8] = &mut [0];
307 let b11: &mut [u8] = &mut [0];
308 let b12: &mut [u8] = &mut [0];
309 let b13: &mut [u8] = &mut [0];
310 let b14: &mut [u8] = &mut [0];
311 let b15: &mut [u8] = &mut [0];
312 let b16: &mut [u8] = &mut [0];
313 let mut bufs: [&mut IoVec; 16] = [
314 b1.into(),
315 b2.into(),
316 b3.into(),
317 b4.into(),
318 b5.into(),
319 b6.into(),
320 b7.into(),
321 b8.into(),
322 b9.into(),
323 b10.into(),
324 b11.into(),
325 b12.into(),
326 b13.into(),
327 b14.into(),
328 b15.into(),
329 b16.into(),
330 ];
331
332 let n = buf.bytes_vec_mut(&mut bufs);
333 read_ready_vecs(&mut bufs[..n], raw_fd)
334 }
335
read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize336 unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
337 let iovecs = iovec::unix::as_os_slice_mut(bufs);
338
339 libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
340 }
341
write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize342 unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
343 // The `IoVec` type can't have a zero-length size, so create a dummy
344 // version from a 1-length slice which we'll overwrite with the
345 // `bytes_vec` method.
346 static DUMMY: &[u8] = &[0];
347 let iovec = <&IoVec>::from(DUMMY);
348 let mut bufs = [
349 iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
350 iovec, iovec, iovec,
351 ];
352
353 let n = buf.bytes_vec(&mut bufs);
354 write_ready_vecs(&bufs[..n], raw_fd)
355 }
356
write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize357 unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
358 let iovecs = iovec::unix::as_os_slice(bufs);
359
360 libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
361 }
362