1 use std::fmt; 2 use std::io::{self, Read, Write}; 3 use std::mem; 4 use std::net::{self, SocketAddr, Shutdown}; 5 use std::time::Duration; 6 7 use bytes::{Buf, BufMut}; 8 use futures::{Future, Poll, Async}; 9 use iovec::IoVec; 10 use mio; 11 use tokio_io::{AsyncRead, AsyncWrite}; 12 use tokio_reactor::{Handle, PollEvented}; 13 14 #[cfg(feature = "unstable-futures")] 15 use futures2; 16 17 /// An I/O object representing a TCP stream connected to a remote endpoint. 18 /// 19 /// A TCP stream can either be created by connecting to an endpoint, via the 20 /// [`connect`] method, or by [accepting] a connection from a [listener]. 21 /// 22 /// [`connect`]: struct.TcpStream.html#method.connect 23 /// [accepting]: struct.TcpListener.html#method.accept 24 /// [listener]: struct.TcpListener.html 25 pub struct TcpStream { 26 io: PollEvented<mio::net::TcpStream>, 27 } 28 29 /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` 30 /// when the stream is connected. 31 #[must_use = "futures do nothing unless polled"] 32 #[derive(Debug)] 33 pub struct ConnectFuture { 34 inner: ConnectFutureState, 35 } 36 37 #[must_use = "futures do nothing unless polled"] 38 #[derive(Debug)] 39 enum ConnectFutureState { 40 Waiting(TcpStream), 41 Error(io::Error), 42 Empty, 43 } 44 45 impl TcpStream { 46 /// Create a new TCP stream connected to the specified address. 47 /// 48 /// This function will create a new TCP socket and attempt to connect it to 49 /// the `addr` provided. The returned future will be resolved once the 50 /// stream has successfully connected, or it wil return an error if one 51 /// occurs. connect(addr: &SocketAddr) -> ConnectFuture52 pub fn connect(addr: &SocketAddr) -> ConnectFuture { 53 use self::ConnectFutureState::*; 54 55 let inner = match mio::net::TcpStream::connect(addr) { 56 Ok(tcp) => Waiting(TcpStream::new(tcp)), 57 Err(e) => Error(e), 58 }; 59 60 ConnectFuture { inner } 61 } 62 new(connected: mio::net::TcpStream) -> TcpStream63 pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream { 64 let io = PollEvented::new(connected); 65 TcpStream { io } 66 } 67 68 /// Create a new `TcpStream` from a `net::TcpStream`. 69 /// 70 /// This function will convert a TCP stream created by the standard library 71 /// to a TCP stream ready to be used with the provided event loop handle. 72 /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does. from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream>73 pub fn from_std(stream: net::TcpStream, handle: &Handle) 74 -> io::Result<TcpStream> 75 { 76 let io = mio::net::TcpStream::from_stream(stream)?; 77 let io = PollEvented::new_with_handle(io, handle)?; 78 79 Ok(TcpStream { io }) 80 } 81 82 /// Creates a new `TcpStream` from the pending socket inside the given 83 /// `std::net::TcpStream`, connecting it to the address specified. 84 /// 85 /// This constructor allows configuring the socket before it's actually 86 /// connected, and this function will transfer ownership to the returned 87 /// `TcpStream` if successful. An unconnected `TcpStream` can be created 88 /// with the `net2::TcpBuilder` type (and also configured via that route). 89 /// 90 /// The platform specific behavior of this function looks like: 91 /// 92 /// * On Unix, the socket is placed into nonblocking mode and then a 93 /// `connect` call is issued. 94 /// 95 /// * On Windows, the address is stored internally and the connect operation 96 /// is issued when the returned `TcpStream` is registered with an event 97 /// loop. Note that on Windows you must `bind` a socket before it can be 98 /// connected, so if a custom `TcpBuilder` is used it should be bound 99 /// (perhaps to `INADDR_ANY`) before this method is called. connect_std(stream: net::TcpStream, addr: &SocketAddr, handle: &Handle) -> ConnectFuture100 pub fn connect_std(stream: net::TcpStream, 101 addr: &SocketAddr, 102 handle: &Handle) 103 -> ConnectFuture 104 { 105 use self::ConnectFutureState::*; 106 107 let io = mio::net::TcpStream::connect_stream(stream, addr) 108 .and_then(|io| PollEvented::new_with_handle(io, handle)); 109 110 let inner = match io { 111 Ok(io) => Waiting(TcpStream { io }), 112 Err(e) => Error(e), 113 }; 114 115 ConnectFuture { inner: inner } 116 } 117 118 /// Check the TCP stream's read readiness state. 119 /// 120 /// The mask argument allows specifying what readiness to notify on. This 121 /// can be any value, including platform specific readiness, **except** 122 /// `writable`. HUP is always implicitly included on platforms that support 123 /// it. 124 /// 125 /// If the resource is not ready for a read then `Async::NotReady` is 126 /// returned and the current task is notified once a new event is received. 127 /// 128 /// The stream will remain in a read-ready state until calls to `poll_read` 129 /// return `NotReady`. 130 /// 131 /// # Panics 132 /// 133 /// This function panics if: 134 /// 135 /// * `ready` includes writable. 136 /// * called from outside of a task context. poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error>137 pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> { 138 self.io.poll_read_ready(mask) 139 } 140 141 /// Like `poll_read_ready`, but compatible with futures 0.2 142 #[cfg(feature = "unstable-futures")] poll_read_ready2(&self, cx: &mut futures2::task::Context, mask: mio::Ready) -> futures2::Poll<mio::Ready, io::Error>143 pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context, mask: mio::Ready) 144 -> futures2::Poll<mio::Ready, io::Error> 145 { 146 self.io.poll_read_ready2(cx, mask) 147 } 148 149 /// Check the TCP stream's write readiness state. 150 /// 151 /// This always checks for writable readiness and also checks for HUP 152 /// readiness on platforms that support it. 153 /// 154 /// If the resource is not ready for a write then `Async::NotReady` is 155 /// returned and the current task is notified once a new event is received. 156 /// 157 /// The I/O resource will remain in a write-ready state until calls to 158 /// `poll_write` return `NotReady`. 159 /// 160 /// # Panics 161 /// 162 /// This function panics if: 163 /// 164 /// * `ready` contains bits besides `writable` and `hup`. 165 /// * called from outside of a task context. poll_write_ready(&self) -> Poll<mio::Ready, io::Error>166 pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { 167 self.io.poll_write_ready() 168 } 169 170 /// Like `poll_write_ready`, but compatible with futures 0.2. 171 #[cfg(feature = "unstable-futures")] poll_write_ready2(&self, cx: &mut futures2::task::Context) -> futures2::Poll<mio::Ready, io::Error>172 pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context) 173 -> futures2::Poll<mio::Ready, io::Error> 174 { 175 self.io.poll_write_ready2(cx) 176 } 177 178 /// Returns the local address that this stream is bound to. local_addr(&self) -> io::Result<SocketAddr>179 pub fn local_addr(&self) -> io::Result<SocketAddr> { 180 self.io.get_ref().local_addr() 181 } 182 183 /// Returns the remote address that this stream is connected to. peer_addr(&self) -> io::Result<SocketAddr>184 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 185 self.io.get_ref().peer_addr() 186 } 187 188 #[deprecated(since = "0.1.2", note = "use poll_peek instead")] 189 #[doc(hidden)] peek(&mut self, buf: &mut [u8]) -> io::Result<usize>190 pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { 191 match self.poll_peek(buf)? { 192 Async::Ready(n) => Ok(n), 193 Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), 194 } 195 } 196 197 /// Receives data on the socket from the remote address to which it is 198 /// connected, without removing that data from the queue. On success, 199 /// returns the number of bytes peeked. 200 /// 201 /// Successive calls return the same data. This is accomplished by passing 202 /// `MSG_PEEK` as a flag to the underlying recv system call. 203 /// 204 /// # Return 205 /// 206 /// On success, returns `Ok(Async::Ready(num_bytes_read))`. 207 /// 208 /// If no data is available for reading, the method returns 209 /// `Ok(Async::NotReady)` and arranges for the current task to receive a 210 /// notification when the socket becomes readable or is closed. 211 /// 212 /// # Panics 213 /// 214 /// This function will panic if called from outside of a task context. poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error>215 pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { 216 try_ready!(self.io.poll_read_ready(mio::Ready::readable())); 217 218 match self.io.get_ref().peek(buf) { 219 Ok(ret) => Ok(ret.into()), 220 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 221 self.io.clear_read_ready(mio::Ready::readable())?; 222 Ok(Async::NotReady) 223 } 224 Err(e) => Err(e), 225 } 226 } 227 228 /// Like `poll_peek` but compatible with futures 0.2 229 #[cfg(feature = "unstable-futures")] poll_peek2(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) -> futures2::Poll<usize, io::Error>230 pub fn poll_peek2(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) 231 -> futures2::Poll<usize, io::Error> 232 { 233 if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { 234 return Ok(futures2::Async::Pending); 235 } 236 237 match self.io.get_ref().peek(buf) { 238 Ok(ret) => Ok(ret.into()), 239 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 240 self.io.clear_read_ready2(cx, mio::Ready::readable())?; 241 Ok(futures2::Async::Pending) 242 } 243 Err(e) => Err(e), 244 } 245 } 246 247 /// Shuts down the read, write, or both halves of this connection. 248 /// 249 /// This function will cause all pending and future I/O on the specified 250 /// portions to return immediately with an appropriate value (see the 251 /// documentation of `Shutdown`). shutdown(&self, how: Shutdown) -> io::Result<()>252 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 253 self.io.get_ref().shutdown(how) 254 } 255 256 /// Gets the value of the `TCP_NODELAY` option on this socket. 257 /// 258 /// For more information about this option, see [`set_nodelay`]. 259 /// 260 /// [`set_nodelay`]: #method.set_nodelay nodelay(&self) -> io::Result<bool>261 pub fn nodelay(&self) -> io::Result<bool> { 262 self.io.get_ref().nodelay() 263 } 264 265 /// Sets the value of the `TCP_NODELAY` option on this socket. 266 /// 267 /// If set, this option disables the Nagle algorithm. This means that 268 /// segments are always sent as soon as possible, even if there is only a 269 /// small amount of data. When not set, data is buffered until there is a 270 /// sufficient amount to send out, thereby avoiding the frequent sending of 271 /// small packets. set_nodelay(&self, nodelay: bool) -> io::Result<()>272 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { 273 self.io.get_ref().set_nodelay(nodelay) 274 } 275 276 /// Gets the value of the `SO_RCVBUF` option on this socket. 277 /// 278 /// For more information about this option, see [`set_recv_buffer_size`]. 279 /// 280 /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size recv_buffer_size(&self) -> io::Result<usize>281 pub fn recv_buffer_size(&self) -> io::Result<usize> { 282 self.io.get_ref().recv_buffer_size() 283 } 284 285 /// Sets the value of the `SO_RCVBUF` option on this socket. 286 /// 287 /// Changes the size of the operating system's receive buffer associated 288 /// with the socket. set_recv_buffer_size(&self, size: usize) -> io::Result<()>289 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { 290 self.io.get_ref().set_recv_buffer_size(size) 291 } 292 293 /// Gets the value of the `SO_SNDBUF` option on this socket. 294 /// 295 /// For more information about this option, see [`set_send_buffer`]. 296 /// 297 /// [`set_send_buffer`]: #tymethod.set_send_buffer send_buffer_size(&self) -> io::Result<usize>298 pub fn send_buffer_size(&self) -> io::Result<usize> { 299 self.io.get_ref().send_buffer_size() 300 } 301 302 /// Sets the value of the `SO_SNDBUF` option on this socket. 303 /// 304 /// Changes the size of the operating system's send buffer associated with 305 /// the socket. set_send_buffer_size(&self, size: usize) -> io::Result<()>306 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { 307 self.io.get_ref().set_send_buffer_size(size) 308 } 309 310 /// Returns whether keepalive messages are enabled on this socket, and if so 311 /// the duration of time between them. 312 /// 313 /// For more information about this option, see [`set_keepalive`]. 314 /// 315 /// [`set_keepalive`]: #tymethod.set_keepalive keepalive(&self) -> io::Result<Option<Duration>>316 pub fn keepalive(&self) -> io::Result<Option<Duration>> { 317 self.io.get_ref().keepalive() 318 } 319 320 /// Sets whether keepalive messages are enabled to be sent on this socket. 321 /// 322 /// On Unix, this option will set the `SO_KEEPALIVE` as well as the 323 /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). 324 /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. 325 /// 326 /// If `None` is specified then keepalive messages are disabled, otherwise 327 /// the duration specified will be the time to remain idle before sending a 328 /// TCP keepalive probe. 329 /// 330 /// Some platforms specify this value in seconds, so sub-second 331 /// specifications may be omitted. set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()>332 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { 333 self.io.get_ref().set_keepalive(keepalive) 334 } 335 336 /// Gets the value of the `IP_TTL` option for this socket. 337 /// 338 /// For more information about this option, see [`set_ttl`]. 339 /// 340 /// [`set_ttl`]: #tymethod.set_ttl ttl(&self) -> io::Result<u32>341 pub fn ttl(&self) -> io::Result<u32> { 342 self.io.get_ref().ttl() 343 } 344 345 /// Sets the value for the `IP_TTL` option on this socket. 346 /// 347 /// This value sets the time-to-live field that is used in every packet sent 348 /// from this socket. set_ttl(&self, ttl: u32) -> io::Result<()>349 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 350 self.io.get_ref().set_ttl(ttl) 351 } 352 353 /// Reads the linger duration for this socket by getting the `SO_LINGER` 354 /// option. 355 /// 356 /// For more information about this option, see [`set_linger`]. 357 /// 358 /// [`set_linger`]: #tymethod.set_linger linger(&self) -> io::Result<Option<Duration>>359 pub fn linger(&self) -> io::Result<Option<Duration>> { 360 self.io.get_ref().linger() 361 } 362 363 /// Sets the linger duration of this socket by setting the `SO_LINGER` 364 /// option. 365 /// 366 /// This option controls the action taken when a stream has unsent messages 367 /// and the stream is closed. If `SO_LINGER` is set, the system 368 /// shall block the process until it can transmit the data or until the 369 /// time expires. 370 /// 371 /// If `SO_LINGER` is not specified, and the stream is closed, the system 372 /// handles the call in a way that allows the process to continue as quickly 373 /// as possible. set_linger(&self, dur: Option<Duration>) -> io::Result<()>374 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { 375 self.io.get_ref().set_linger(dur) 376 } 377 378 /// Creates a new independently owned handle to the underlying socket. 379 /// 380 /// The returned `TcpStream` is a reference to the same stream that this 381 /// object references. Both handles will read and write the same stream of 382 /// data, and options set on one stream will be propagated to the other 383 /// stream. try_clone(&self) -> io::Result<TcpStream>384 pub fn try_clone(&self) -> io::Result<TcpStream> { 385 let io = self.io.get_ref().try_clone()?; 386 Ok(TcpStream::new(io)) 387 } 388 } 389 390 // ===== impl Read / Write ===== 391 392 impl Read for TcpStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>393 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 394 self.io.read(buf) 395 } 396 } 397 398 impl Write for TcpStream { write(&mut self, buf: &[u8]) -> io::Result<usize>399 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 400 self.io.write(buf) 401 } flush(&mut self) -> io::Result<()>402 fn flush(&mut self) -> io::Result<()> { 403 Ok(()) 404 } 405 } 406 407 impl AsyncRead for TcpStream { prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool408 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { 409 false 410 } 411 read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>412 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 413 <&TcpStream>::read_buf(&mut &*self, buf) 414 } 415 } 416 417 #[cfg(feature = "unstable-futures")] 418 impl futures2::io::AsyncRead for TcpStream { poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) -> futures2::Poll<usize, io::Error>419 fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) 420 -> futures2::Poll<usize, io::Error> 421 { 422 futures2::io::AsyncRead::poll_read(&mut self.io, cx, buf) 423 } 424 poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec]) -> futures2::Poll<usize, io::Error>425 fn poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec]) 426 -> futures2::Poll<usize, io::Error> 427 { 428 futures2::io::AsyncRead::poll_vectored_read(&mut &*self, cx, vec) 429 } 430 initializer(&self) -> futures2::io::Initializer431 unsafe fn initializer(&self) -> futures2::io::Initializer { 432 futures2::io::Initializer::nop() 433 } 434 } 435 436 impl AsyncWrite for TcpStream { shutdown(&mut self) -> Poll<(), io::Error>437 fn shutdown(&mut self) -> Poll<(), io::Error> { 438 <&TcpStream>::shutdown(&mut &*self) 439 } 440 write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>441 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 442 <&TcpStream>::write_buf(&mut &*self, buf) 443 } 444 } 445 446 #[cfg(feature = "unstable-futures")] 447 impl futures2::io::AsyncWrite for TcpStream { poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) -> futures2::Poll<usize, io::Error>448 fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) 449 -> futures2::Poll<usize, io::Error> 450 { 451 futures2::io::AsyncWrite::poll_write(&mut self.io, cx, buf) 452 } 453 poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec]) -> futures2::Poll<usize, io::Error>454 fn poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec]) 455 -> futures2::Poll<usize, io::Error> 456 { 457 futures2::io::AsyncWrite::poll_vectored_write(&mut &*self, cx, vec) 458 } 459 poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error>460 fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { 461 futures2::io::AsyncWrite::poll_flush(&mut self.io, cx) 462 } 463 poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error>464 fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { 465 futures2::io::AsyncWrite::poll_close(&mut self.io, cx) 466 } 467 } 468 469 // ===== impl Read / Write for &'a ===== 470 471 impl<'a> Read for &'a TcpStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>472 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 473 (&self.io).read(buf) 474 } 475 } 476 477 impl<'a> Write for &'a TcpStream { write(&mut self, buf: &[u8]) -> io::Result<usize>478 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 479 (&self.io).write(buf) 480 } 481 flush(&mut self) -> io::Result<()>482 fn flush(&mut self) -> io::Result<()> { 483 (&self.io).flush() 484 } 485 } 486 487 impl<'a> AsyncRead for &'a TcpStream { prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool488 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { 489 false 490 } 491 read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>492 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 493 if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? { 494 return Ok(Async::NotReady) 495 } 496 497 let r = unsafe { 498 // The `IoVec` type can't have a 0-length size, so we create a bunch 499 // of dummy versions on the stack with 1 length which we'll quickly 500 // overwrite. 501 let b1: &mut [u8] = &mut [0]; 502 let b2: &mut [u8] = &mut [0]; 503 let b3: &mut [u8] = &mut [0]; 504 let b4: &mut [u8] = &mut [0]; 505 let b5: &mut [u8] = &mut [0]; 506 let b6: &mut [u8] = &mut [0]; 507 let b7: &mut [u8] = &mut [0]; 508 let b8: &mut [u8] = &mut [0]; 509 let b9: &mut [u8] = &mut [0]; 510 let b10: &mut [u8] = &mut [0]; 511 let b11: &mut [u8] = &mut [0]; 512 let b12: &mut [u8] = &mut [0]; 513 let b13: &mut [u8] = &mut [0]; 514 let b14: &mut [u8] = &mut [0]; 515 let b15: &mut [u8] = &mut [0]; 516 let b16: &mut [u8] = &mut [0]; 517 let mut bufs: [&mut IoVec; 16] = [ 518 b1.into(), b2.into(), b3.into(), b4.into(), 519 b5.into(), b6.into(), b7.into(), b8.into(), 520 b9.into(), b10.into(), b11.into(), b12.into(), 521 b13.into(), b14.into(), b15.into(), b16.into(), 522 ]; 523 let n = buf.bytes_vec_mut(&mut bufs); 524 self.io.get_ref().read_bufs(&mut bufs[..n]) 525 }; 526 527 match r { 528 Ok(n) => { 529 unsafe { buf.advance_mut(n); } 530 Ok(Async::Ready(n)) 531 } 532 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 533 self.io.clear_read_ready(mio::Ready::readable())?; 534 Ok(Async::NotReady) 535 } 536 Err(e) => Err(e), 537 } 538 } 539 } 540 541 #[cfg(feature = "unstable-futures")] 542 impl<'a> futures2::io::AsyncRead for &'a TcpStream { poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) -> futures2::Poll<usize, io::Error>543 fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) 544 -> futures2::Poll<usize, io::Error> 545 { 546 futures2::io::AsyncRead::poll_read(&mut &self.io, cx, buf) 547 } 548 poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec]) -> futures2::Poll<usize, io::Error>549 fn poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec]) 550 -> futures2::Poll<usize, io::Error> 551 { 552 if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { 553 return Ok(futures2::Async::Pending) 554 } 555 556 let r = self.io.get_ref().read_bufs(vec); 557 558 match r { 559 Ok(n) => { 560 Ok(futures2::Async::Ready(n)) 561 } 562 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 563 self.io.clear_read_ready2(cx, mio::Ready::readable())?; 564 Ok(futures2::Async::Pending) 565 } 566 Err(e) => Err(e), 567 } 568 } 569 initializer(&self) -> futures2::io::Initializer570 unsafe fn initializer(&self) -> futures2::io::Initializer { 571 futures2::io::Initializer::nop() 572 } 573 } 574 575 impl<'a> AsyncWrite for &'a TcpStream { shutdown(&mut self) -> Poll<(), io::Error>576 fn shutdown(&mut self) -> Poll<(), io::Error> { 577 Ok(().into()) 578 } 579 write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>580 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 581 if let Async::NotReady = self.io.poll_write_ready()? { 582 return Ok(Async::NotReady) 583 } 584 585 let r = { 586 // The `IoVec` type can't have a zero-length size, so create a dummy 587 // version from a 1-length slice which we'll overwrite with the 588 // `bytes_vec` method. 589 static DUMMY: &[u8] = &[0]; 590 let iovec = <&IoVec>::from(DUMMY); 591 let mut bufs = [iovec; 64]; 592 let n = buf.bytes_vec(&mut bufs); 593 self.io.get_ref().write_bufs(&bufs[..n]) 594 }; 595 match r { 596 Ok(n) => { 597 buf.advance(n); 598 Ok(Async::Ready(n)) 599 } 600 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 601 self.io.clear_write_ready()?; 602 Ok(Async::NotReady) 603 } 604 Err(e) => Err(e), 605 } 606 } 607 } 608 609 #[cfg(feature = "unstable-futures")] 610 impl<'a> futures2::io::AsyncWrite for &'a TcpStream { poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) -> futures2::Poll<usize, io::Error>611 fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) 612 -> futures2::Poll<usize, io::Error> 613 { 614 futures2::io::AsyncWrite::poll_write(&mut &self.io, cx, buf) 615 } 616 poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec]) -> futures2::Poll<usize, io::Error>617 fn poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec]) 618 -> futures2::Poll<usize, io::Error> 619 { 620 if let futures2::Async::Pending = self.io.poll_write_ready2(cx)? { 621 return Ok(futures2::Async::Pending) 622 } 623 624 let r = self.io.get_ref().write_bufs(vec); 625 626 match r { 627 Ok(n) => { 628 Ok(futures2::Async::Ready(n)) 629 } 630 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 631 self.io.clear_write_ready2(cx)?; 632 Ok(futures2::Async::Pending) 633 } 634 Err(e) => Err(e), 635 } 636 } 637 poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error>638 fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { 639 futures2::io::AsyncWrite::poll_flush(&mut &self.io, cx) 640 } 641 poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error>642 fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { 643 futures2::io::AsyncWrite::poll_close(&mut &self.io, cx) 644 } 645 } 646 647 impl fmt::Debug for TcpStream { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result648 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 649 self.io.get_ref().fmt(f) 650 } 651 } 652 653 impl Future for ConnectFuture { 654 type Item = TcpStream; 655 type Error = io::Error; 656 poll(&mut self) -> Poll<TcpStream, io::Error>657 fn poll(&mut self) -> Poll<TcpStream, io::Error> { 658 self.inner.poll() 659 } 660 } 661 662 #[cfg(feature = "unstable-futures")] 663 impl futures2::Future for ConnectFuture { 664 type Item = TcpStream; 665 type Error = io::Error; 666 poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error>667 fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> { 668 futures2::Future::poll(&mut self.inner, cx) 669 } 670 } 671 672 impl ConnectFutureState { poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> where F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>673 fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> 674 where F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error> 675 { 676 { 677 let stream = match *self { 678 ConnectFutureState::Waiting(ref mut s) => s, 679 ConnectFutureState::Error(_) => { 680 let e = match mem::replace(self, ConnectFutureState::Empty) { 681 ConnectFutureState::Error(e) => e, 682 _ => panic!(), 683 }; 684 return Err(e) 685 } 686 ConnectFutureState::Empty => panic!("can't poll TCP stream twice"), 687 }; 688 689 // Once we've connected, wait for the stream to be writable as 690 // that's when the actual connection has been initiated. Once we're 691 // writable we check for `take_socket_error` to see if the connect 692 // actually hit an error or not. 693 // 694 // If all that succeeded then we ship everything on up. 695 if let Async::NotReady = f(&mut stream.io)? { 696 return Ok(Async::NotReady) 697 } 698 699 if let Some(e) = try!(stream.io.get_ref().take_error()) { 700 return Err(e) 701 } 702 } 703 704 match mem::replace(self, ConnectFutureState::Empty) { 705 ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)), 706 _ => panic!(), 707 } 708 } 709 } 710 711 impl Future for ConnectFutureState { 712 type Item = TcpStream; 713 type Error = io::Error; 714 poll(&mut self) -> Poll<TcpStream, io::Error>715 fn poll(&mut self) -> Poll<TcpStream, io::Error> { 716 self.poll_inner(|io| io.poll_write_ready()) 717 } 718 } 719 720 #[cfg(feature = "unstable-futures")] 721 impl futures2::Future for ConnectFutureState { 722 type Item = TcpStream; 723 type Error = io::Error; 724 poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error>725 fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> { 726 self.poll_inner(|io| io.poll_write_ready2(cx).map(::lower_async)) 727 .map(::lift_async) 728 } 729 } 730 731 #[cfg(unix)] 732 mod sys { 733 use std::os::unix::prelude::*; 734 use super::TcpStream; 735 736 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd737 fn as_raw_fd(&self) -> RawFd { 738 self.io.get_ref().as_raw_fd() 739 } 740 } 741 } 742 743 #[cfg(windows)] 744 mod sys { 745 // TODO: let's land these upstream with mio and then we can add them here. 746 // 747 // use std::os::windows::prelude::*; 748 // use super::TcpStream; 749 // 750 // impl AsRawHandle for TcpStream { 751 // fn as_raw_handle(&self) -> RawHandle { 752 // self.io.get_ref().as_raw_handle() 753 // } 754 // } 755 } 756