1 use std::fmt; 2 use std::io::{self, Read, Write}; 3 use std::mem; 4 use std::net::{self, Shutdown, SocketAddr}; 5 use std::time::Duration; 6 7 use bytes::{Buf, BufMut}; 8 use futures::{Async, Future, Poll}; 9 use iovec::IoVec; 10 use mio; 11 use tokio_io::{AsyncRead, AsyncWrite}; 12 use tokio_reactor::{Handle, PollEvented}; 13 14 /// An I/O object representing a TCP stream connected to a remote endpoint. 15 /// 16 /// A TCP stream can either be created by connecting to an endpoint, via the 17 /// [`connect`] method, or by [accepting] a connection from a [listener]. 18 /// 19 /// [`connect`]: struct.TcpStream.html#method.connect 20 /// [accepting]: struct.TcpListener.html#method.accept 21 /// [listener]: struct.TcpListener.html 22 /// 23 /// # Examples 24 /// 25 /// ``` 26 /// # extern crate tokio; 27 /// # extern crate futures; 28 /// use futures::Future; 29 /// use tokio::io::AsyncWrite; 30 /// use tokio::net::TcpStream; 31 /// use std::net::SocketAddr; 32 /// 33 /// # fn main() -> Result<(), Box<std::error::Error>> { 34 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?; 35 /// let stream = TcpStream::connect(&addr); 36 /// stream.map(|mut stream| { 37 /// // Attempt to write bytes asynchronously to the stream 38 /// stream.poll_write(&[1]); 39 /// }); 40 /// # Ok(()) 41 /// # } 42 /// ``` 43 pub struct TcpStream { 44 io: PollEvented<mio::net::TcpStream>, 45 } 46 47 /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` 48 /// when the stream is connected. 49 #[must_use = "futures do nothing unless polled"] 50 #[derive(Debug)] 51 pub struct ConnectFuture { 52 inner: ConnectFutureState, 53 } 54 55 #[must_use = "futures do nothing unless polled"] 56 #[derive(Debug)] 57 enum ConnectFutureState { 58 Waiting(TcpStream), 59 Error(io::Error), 60 Empty, 61 } 62 63 impl TcpStream { 64 /// Create a new TCP stream connected to the specified address. 65 /// 66 /// This function will create a new TCP socket and attempt to connect it to 67 /// the `addr` provided. The returned future will be resolved once the 68 /// stream has successfully connected, or it will return an error if one 69 /// occurs. 70 /// 71 /// # Examples 72 /// 73 /// ``` 74 /// # extern crate tokio; 75 /// # extern crate futures; 76 /// use futures::Future; 77 /// use tokio::net::TcpStream; 78 /// use std::net::SocketAddr; 79 /// 80 /// # fn main() -> Result<(), Box<std::error::Error>> { 81 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?; 82 /// let stream = TcpStream::connect(&addr) 83 /// .map(|stream| 84 /// println!("successfully connected to {}", stream.local_addr().unwrap())); 85 /// # Ok(()) 86 /// # } 87 /// ``` connect(addr: &SocketAddr) -> ConnectFuture88 pub fn connect(addr: &SocketAddr) -> ConnectFuture { 89 use self::ConnectFutureState::*; 90 91 let inner = match mio::net::TcpStream::connect(addr) { 92 Ok(tcp) => Waiting(TcpStream::new(tcp)), 93 Err(e) => Error(e), 94 }; 95 96 ConnectFuture { inner } 97 } 98 new(connected: mio::net::TcpStream) -> TcpStream99 pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream { 100 let io = PollEvented::new(connected); 101 TcpStream { io } 102 } 103 104 /// Create a new `TcpStream` from a `net::TcpStream`. 105 /// 106 /// This function will convert a TCP stream created by the standard library 107 /// to a TCP stream ready to be used with the provided event loop handle. 108 /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does. 109 /// 110 /// # Examples 111 /// 112 /// ```no_run 113 /// # extern crate tokio; 114 /// # extern crate tokio_reactor; 115 /// use tokio::net::TcpStream; 116 /// use std::net::TcpStream as StdTcpStream; 117 /// use tokio_reactor::Handle; 118 /// 119 /// # fn main() -> Result<(), Box<std::error::Error>> { 120 /// let std_stream = StdTcpStream::connect("127.0.0.1:34254")?; 121 /// let stream = TcpStream::from_std(std_stream, &Handle::default())?; 122 /// # Ok(()) 123 /// # } 124 /// ``` from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream>125 pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream> { 126 let io = mio::net::TcpStream::from_stream(stream)?; 127 let io = PollEvented::new_with_handle(io, handle)?; 128 129 Ok(TcpStream { io }) 130 } 131 132 /// Creates a new `TcpStream` from the pending socket inside the given 133 /// `std::net::TcpStream`, connecting it to the address specified. 134 /// 135 /// This constructor allows configuring the socket before it's actually 136 /// connected, and this function will transfer ownership to the returned 137 /// `TcpStream` if successful. An unconnected `TcpStream` can be created 138 /// with the `net2::TcpBuilder` type (and also configured via that route). 139 /// 140 /// The platform specific behavior of this function looks like: 141 /// 142 /// * On Unix, the socket is placed into nonblocking mode and then a 143 /// `connect` call is issued. 144 /// 145 /// * On Windows, the address is stored internally and the connect operation 146 /// is issued when the returned `TcpStream` is registered with an event 147 /// loop. Note that on Windows you must `bind` a socket before it can be 148 /// connected, so if a custom `TcpBuilder` is used it should be bound 149 /// (perhaps to `INADDR_ANY`) before this method is called. connect_std( stream: net::TcpStream, addr: &SocketAddr, handle: &Handle, ) -> ConnectFuture150 pub fn connect_std( 151 stream: net::TcpStream, 152 addr: &SocketAddr, 153 handle: &Handle, 154 ) -> ConnectFuture { 155 use self::ConnectFutureState::*; 156 157 let io = mio::net::TcpStream::connect_stream(stream, addr) 158 .and_then(|io| PollEvented::new_with_handle(io, handle)); 159 160 let inner = match io { 161 Ok(io) => Waiting(TcpStream { io }), 162 Err(e) => Error(e), 163 }; 164 165 ConnectFuture { inner: inner } 166 } 167 168 /// Check the TCP stream's read readiness state. 169 /// 170 /// The mask argument allows specifying what readiness to notify on. This 171 /// can be any value, including platform specific readiness, **except** 172 /// `writable`. HUP is always implicitly included on platforms that support 173 /// it. 174 /// 175 /// If the resource is not ready for a read then `Async::NotReady` is 176 /// returned and the current task is notified once a new event is received. 177 /// 178 /// The stream will remain in a read-ready state until calls to `poll_read` 179 /// return `NotReady`. 180 /// 181 /// # Panics 182 /// 183 /// This function panics if: 184 /// 185 /// * `ready` includes writable. 186 /// * called from outside of a task context. 187 /// 188 /// # Examples 189 /// 190 /// ``` 191 /// # extern crate mio; 192 /// # extern crate tokio; 193 /// # extern crate futures; 194 /// use mio::Ready; 195 /// use futures::Async; 196 /// use futures::Future; 197 /// use tokio::net::TcpStream; 198 /// use std::net::SocketAddr; 199 /// 200 /// # fn main() -> Result<(), Box<std::error::Error>> { 201 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?; 202 /// let stream = TcpStream::connect(&addr); 203 /// stream.map(|stream| { 204 /// match stream.poll_read_ready(Ready::readable()) { 205 /// Ok(Async::Ready(_)) => println!("read ready"), 206 /// Ok(Async::NotReady) => println!("not read ready"), 207 /// Err(e) => eprintln!("got error: {}", e), 208 /// } 209 /// }); 210 /// # Ok(()) 211 /// # } 212 /// ``` poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error>213 pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> { 214 self.io.poll_read_ready(mask) 215 } 216 217 /// Check the TCP stream's write readiness state. 218 /// 219 /// This always checks for writable readiness and also checks for HUP 220 /// readiness on platforms that support it. 221 /// 222 /// If the resource is not ready for a write then `Async::NotReady` is 223 /// returned and the current task is notified once a new event is received. 224 /// 225 /// The I/O resource will remain in a write-ready state until calls to 226 /// `poll_write` return `NotReady`. 227 /// 228 /// # Panics 229 /// 230 /// This function panics if called from outside of a task context. 231 /// 232 /// # Examples 233 /// 234 /// ``` 235 /// # extern crate tokio; 236 /// # extern crate futures; 237 /// use futures::Async; 238 /// use futures::Future; 239 /// use tokio::net::TcpStream; 240 /// use std::net::SocketAddr; 241 /// 242 /// # fn main() -> Result<(), Box<std::error::Error>> { 243 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?; 244 /// let stream = TcpStream::connect(&addr); 245 /// stream.map(|stream| { 246 /// match stream.poll_write_ready() { 247 /// Ok(Async::Ready(_)) => println!("write ready"), 248 /// Ok(Async::NotReady) => println!("not write ready"), 249 /// Err(e) => eprintln!("got error: {}", e), 250 /// } 251 /// }); 252 /// # Ok(()) 253 /// # } 254 /// ``` poll_write_ready(&self) -> Poll<mio::Ready, io::Error>255 pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { 256 self.io.poll_write_ready() 257 } 258 259 /// Returns the local address that this stream is bound to. 260 /// 261 /// # Examples 262 /// 263 /// ``` 264 /// # extern crate tokio; 265 /// # extern crate futures; 266 /// use tokio::net::TcpStream; 267 /// use futures::Future; 268 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; 269 /// 270 /// # fn main() -> Result<(), Box<std::error::Error>> { 271 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 272 /// let stream = TcpStream::connect(&addr); 273 /// stream.map(|stream| { 274 /// assert_eq!(stream.local_addr().unwrap(), 275 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); 276 /// }); 277 /// # Ok(()) 278 /// # } 279 /// ``` local_addr(&self) -> io::Result<SocketAddr>280 pub fn local_addr(&self) -> io::Result<SocketAddr> { 281 self.io.get_ref().local_addr() 282 } 283 284 /// Returns the remote address that this stream is connected to. 285 /// # Examples 286 /// 287 /// ``` 288 /// # extern crate tokio; 289 /// # extern crate futures; 290 /// use tokio::net::TcpStream; 291 /// use futures::Future; 292 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; 293 /// 294 /// # fn main() -> Result<(), Box<std::error::Error>> { 295 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 296 /// let stream = TcpStream::connect(&addr); 297 /// stream.map(|stream| { 298 /// assert_eq!(stream.peer_addr().unwrap(), 299 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); 300 /// }); 301 /// # Ok(()) 302 /// # } 303 /// ``` peer_addr(&self) -> io::Result<SocketAddr>304 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 305 self.io.get_ref().peer_addr() 306 } 307 308 #[deprecated(since = "0.1.2", note = "use poll_peek instead")] 309 #[doc(hidden)] peek(&mut self, buf: &mut [u8]) -> io::Result<usize>310 pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { 311 match self.poll_peek(buf)? { 312 Async::Ready(n) => Ok(n), 313 Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), 314 } 315 } 316 317 /// Receives data on the socket from the remote address to which it is 318 /// connected, without removing that data from the queue. On success, 319 /// returns the number of bytes peeked. 320 /// 321 /// Successive calls return the same data. This is accomplished by passing 322 /// `MSG_PEEK` as a flag to the underlying recv system call. 323 /// 324 /// # Return 325 /// 326 /// On success, returns `Ok(Async::Ready(num_bytes_read))`. 327 /// 328 /// If no data is available for reading, the method returns 329 /// `Ok(Async::NotReady)` and arranges for the current task to receive a 330 /// notification when the socket becomes readable or is closed. 331 /// 332 /// # Panics 333 /// 334 /// This function will panic if called from outside of a task context. 335 /// 336 /// # Examples 337 /// 338 /// ``` 339 /// # extern crate tokio; 340 /// # extern crate futures; 341 /// use tokio::net::TcpStream; 342 /// use futures::Async; 343 /// use futures::Future; 344 /// use std::net::SocketAddr; 345 /// 346 /// # fn main() -> Result<(), Box<std::error::Error>> { 347 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 348 /// let stream = TcpStream::connect(&addr); 349 /// stream.map(|mut stream| { 350 /// let mut buf = [0; 10]; 351 /// match stream.poll_peek(&mut buf) { 352 /// Ok(Async::Ready(len)) => println!("read {} bytes", len), 353 /// Ok(Async::NotReady) => println!("no data available"), 354 /// Err(e) => eprintln!("got error: {}", e), 355 /// } 356 /// }); 357 /// # Ok(()) 358 /// # } 359 /// ``` poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error>360 pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { 361 try_ready!(self.io.poll_read_ready(mio::Ready::readable())); 362 363 match self.io.get_ref().peek(buf) { 364 Ok(ret) => Ok(ret.into()), 365 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 366 self.io.clear_read_ready(mio::Ready::readable())?; 367 Ok(Async::NotReady) 368 } 369 Err(e) => Err(e), 370 } 371 } 372 373 /// Shuts down the read, write, or both halves of this connection. 374 /// 375 /// This function will cause all pending and future I/O on the specified 376 /// portions to return immediately with an appropriate value (see the 377 /// documentation of `Shutdown`). 378 /// 379 /// # Examples 380 /// 381 /// ``` 382 /// # extern crate tokio; 383 /// # extern crate futures; 384 /// use tokio::net::TcpStream; 385 /// use futures::Future; 386 /// use std::net::{Shutdown, SocketAddr}; 387 /// 388 /// # fn main() -> Result<(), Box<std::error::Error>> { 389 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 390 /// let stream = TcpStream::connect(&addr); 391 /// stream.map(|stream| { 392 /// stream.shutdown(Shutdown::Both) 393 /// }); 394 /// # Ok(()) 395 /// # } 396 /// ``` shutdown(&self, how: Shutdown) -> io::Result<()>397 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 398 self.io.get_ref().shutdown(how) 399 } 400 401 /// Gets the value of the `TCP_NODELAY` option on this socket. 402 /// 403 /// For more information about this option, see [`set_nodelay`]. 404 /// 405 /// [`set_nodelay`]: #method.set_nodelay 406 /// 407 /// # Examples 408 /// 409 /// ``` 410 /// # extern crate tokio; 411 /// # extern crate futures; 412 /// use tokio::net::TcpStream; 413 /// use futures::Future; 414 /// use std::net::SocketAddr; 415 /// 416 /// # fn main() -> Result<(), Box<std::error::Error>> { 417 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 418 /// let stream = TcpStream::connect(&addr); 419 /// stream.map(|stream| { 420 /// stream.set_nodelay(true).expect("set_nodelay call failed");; 421 /// assert_eq!(stream.nodelay().unwrap_or(false), true); 422 /// }); 423 /// # Ok(()) 424 /// # } 425 /// ``` nodelay(&self) -> io::Result<bool>426 pub fn nodelay(&self) -> io::Result<bool> { 427 self.io.get_ref().nodelay() 428 } 429 430 /// Sets the value of the `TCP_NODELAY` option on this socket. 431 /// 432 /// If set, this option disables the Nagle algorithm. This means that 433 /// segments are always sent as soon as possible, even if there is only a 434 /// small amount of data. When not set, data is buffered until there is a 435 /// sufficient amount to send out, thereby avoiding the frequent sending of 436 /// small packets. 437 /// 438 /// # Examples 439 /// 440 /// ``` 441 /// # extern crate tokio; 442 /// # extern crate futures; 443 /// use tokio::net::TcpStream; 444 /// use futures::Future; 445 /// use std::net::SocketAddr; 446 /// 447 /// # fn main() -> Result<(), Box<std::error::Error>> { 448 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 449 /// let stream = TcpStream::connect(&addr); 450 /// stream.map(|stream| { 451 /// stream.set_nodelay(true).expect("set_nodelay call failed"); 452 /// }); 453 /// # Ok(()) 454 /// # } 455 /// ``` set_nodelay(&self, nodelay: bool) -> io::Result<()>456 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { 457 self.io.get_ref().set_nodelay(nodelay) 458 } 459 460 /// Gets the value of the `SO_RCVBUF` option on this socket. 461 /// 462 /// For more information about this option, see [`set_recv_buffer_size`]. 463 /// 464 /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size 465 /// 466 /// # Examples 467 /// 468 /// ``` 469 /// # extern crate tokio; 470 /// # extern crate futures; 471 /// use tokio::net::TcpStream; 472 /// use futures::Future; 473 /// use std::net::SocketAddr; 474 /// 475 /// # fn main() -> Result<(), Box<std::error::Error>> { 476 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 477 /// let stream = TcpStream::connect(&addr); 478 /// stream.map(|stream| { 479 /// stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed"); 480 /// assert_eq!(stream.recv_buffer_size().unwrap_or(0), 100); 481 /// }); 482 /// # Ok(()) 483 /// # } 484 /// ``` recv_buffer_size(&self) -> io::Result<usize>485 pub fn recv_buffer_size(&self) -> io::Result<usize> { 486 self.io.get_ref().recv_buffer_size() 487 } 488 489 /// Sets the value of the `SO_RCVBUF` option on this socket. 490 /// 491 /// Changes the size of the operating system's receive buffer associated 492 /// with the socket. 493 /// 494 /// # Examples 495 /// 496 /// ``` 497 /// # extern crate tokio; 498 /// # extern crate futures; 499 /// use tokio::net::TcpStream; 500 /// use futures::Future; 501 /// use std::net::SocketAddr; 502 /// 503 /// # fn main() -> Result<(), Box<std::error::Error>> { 504 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 505 /// let stream = TcpStream::connect(&addr); 506 /// stream.map(|stream| { 507 /// stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed"); 508 /// }); 509 /// # Ok(()) 510 /// # } 511 /// ``` set_recv_buffer_size(&self, size: usize) -> io::Result<()>512 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { 513 self.io.get_ref().set_recv_buffer_size(size) 514 } 515 516 /// Gets the value of the `SO_SNDBUF` option on this socket. 517 /// 518 /// For more information about this option, see [`set_send_buffer`]. 519 /// 520 /// [`set_send_buffer`]: #tymethod.set_send_buffer 521 /// 522 /// # Examples 523 /// 524 /// ``` 525 /// # extern crate tokio; 526 /// # extern crate futures; 527 /// use tokio::net::TcpStream; 528 /// use futures::Future; 529 /// use std::net::SocketAddr; 530 /// 531 /// # fn main() -> Result<(), Box<std::error::Error>> { 532 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 533 /// let stream = TcpStream::connect(&addr); 534 /// stream.map(|stream| { 535 /// stream.set_send_buffer_size(100).expect("set_send_buffer_size failed"); 536 /// assert_eq!(stream.send_buffer_size().unwrap_or(0), 100); 537 /// }); 538 /// # Ok(()) 539 /// # } 540 /// ``` send_buffer_size(&self) -> io::Result<usize>541 pub fn send_buffer_size(&self) -> io::Result<usize> { 542 self.io.get_ref().send_buffer_size() 543 } 544 545 /// Sets the value of the `SO_SNDBUF` option on this socket. 546 /// 547 /// Changes the size of the operating system's send buffer associated with 548 /// the socket. 549 /// 550 /// # Examples 551 /// 552 /// ``` 553 /// # extern crate tokio; 554 /// # extern crate futures; 555 /// use tokio::net::TcpStream; 556 /// use futures::Future; 557 /// use std::net::SocketAddr; 558 /// 559 /// # fn main() -> Result<(), Box<std::error::Error>> { 560 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 561 /// let stream = TcpStream::connect(&addr); 562 /// stream.map(|stream| { 563 /// stream.set_send_buffer_size(100).expect("set_send_buffer_size failed"); 564 /// }); 565 /// # Ok(()) 566 /// # } 567 /// ``` set_send_buffer_size(&self, size: usize) -> io::Result<()>568 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { 569 self.io.get_ref().set_send_buffer_size(size) 570 } 571 572 /// Returns whether keepalive messages are enabled on this socket, and if so 573 /// the duration of time between them. 574 /// 575 /// For more information about this option, see [`set_keepalive`]. 576 /// 577 /// [`set_keepalive`]: #tymethod.set_keepalive 578 /// 579 /// # Examples 580 /// 581 /// ``` 582 /// # extern crate tokio; 583 /// # extern crate futures; 584 /// use tokio::net::TcpStream; 585 /// use futures::Future; 586 /// use std::net::SocketAddr; 587 /// 588 /// # fn main() -> Result<(), Box<std::error::Error>> { 589 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 590 /// let stream = TcpStream::connect(&addr); 591 /// stream.map(|stream| { 592 /// stream.set_keepalive(None).expect("set_keepalive failed"); 593 /// assert_eq!(stream.keepalive().unwrap(), None); 594 /// }); 595 /// # Ok(()) 596 /// # } 597 /// ``` keepalive(&self) -> io::Result<Option<Duration>>598 pub fn keepalive(&self) -> io::Result<Option<Duration>> { 599 self.io.get_ref().keepalive() 600 } 601 602 /// Sets whether keepalive messages are enabled to be sent on this socket. 603 /// 604 /// On Unix, this option will set the `SO_KEEPALIVE` as well as the 605 /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). 606 /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. 607 /// 608 /// If `None` is specified then keepalive messages are disabled, otherwise 609 /// the duration specified will be the time to remain idle before sending a 610 /// TCP keepalive probe. 611 /// 612 /// Some platforms specify this value in seconds, so sub-second 613 /// specifications may be omitted. 614 /// 615 /// # Examples 616 /// 617 /// ``` 618 /// # extern crate tokio; 619 /// # extern crate futures; 620 /// use tokio::net::TcpStream; 621 /// use futures::Future; 622 /// use std::net::SocketAddr; 623 /// 624 /// # fn main() -> Result<(), Box<std::error::Error>> { 625 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 626 /// let stream = TcpStream::connect(&addr); 627 /// stream.map(|stream| { 628 /// stream.set_keepalive(None).expect("set_keepalive failed"); 629 /// }); 630 /// # Ok(()) 631 /// # } 632 /// ``` set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()>633 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { 634 self.io.get_ref().set_keepalive(keepalive) 635 } 636 637 /// Gets the value of the `IP_TTL` option for this socket. 638 /// 639 /// For more information about this option, see [`set_ttl`]. 640 /// 641 /// [`set_ttl`]: #tymethod.set_ttl 642 /// 643 /// # Examples 644 /// 645 /// ``` 646 /// # extern crate tokio; 647 /// # extern crate futures; 648 /// use tokio::net::TcpStream; 649 /// use futures::Future; 650 /// use std::net::SocketAddr; 651 /// 652 /// # fn main() -> Result<(), Box<std::error::Error>> { 653 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 654 /// let stream = TcpStream::connect(&addr); 655 /// stream.map(|stream| { 656 /// stream.set_ttl(100).expect("set_ttl failed"); 657 /// assert_eq!(stream.ttl().unwrap_or(0), 100); 658 /// }); 659 /// # Ok(()) 660 /// # } 661 /// ``` ttl(&self) -> io::Result<u32>662 pub fn ttl(&self) -> io::Result<u32> { 663 self.io.get_ref().ttl() 664 } 665 666 /// Sets the value for the `IP_TTL` option on this socket. 667 /// 668 /// This value sets the time-to-live field that is used in every packet sent 669 /// from this socket. 670 /// 671 /// # Examples 672 /// 673 /// ``` 674 /// # extern crate tokio; 675 /// # extern crate futures; 676 /// use tokio::net::TcpStream; 677 /// use futures::Future; 678 /// use std::net::SocketAddr; 679 /// 680 /// # fn main() -> Result<(), Box<std::error::Error>> { 681 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 682 /// let stream = TcpStream::connect(&addr); 683 /// stream.map(|stream| { 684 /// stream.set_ttl(100).expect("set_ttl failed"); 685 /// }); 686 /// # Ok(()) 687 /// # } 688 /// ``` set_ttl(&self, ttl: u32) -> io::Result<()>689 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 690 self.io.get_ref().set_ttl(ttl) 691 } 692 693 /// Reads the linger duration for this socket by getting the `SO_LINGER` 694 /// option. 695 /// 696 /// For more information about this option, see [`set_linger`]. 697 /// 698 /// [`set_linger`]: #tymethod.set_linger 699 /// 700 /// # Examples 701 /// 702 /// ``` 703 /// # extern crate tokio; 704 /// # extern crate futures; 705 /// use tokio::net::TcpStream; 706 /// use futures::Future; 707 /// use std::net::SocketAddr; 708 /// 709 /// # fn main() -> Result<(), Box<std::error::Error>> { 710 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 711 /// let stream = TcpStream::connect(&addr); 712 /// stream.map(|stream| { 713 /// stream.set_linger(None).expect("set_linger failed"); 714 /// assert_eq!(stream.linger().unwrap(), None); 715 /// }); 716 /// # Ok(()) 717 /// # } 718 /// ``` linger(&self) -> io::Result<Option<Duration>>719 pub fn linger(&self) -> io::Result<Option<Duration>> { 720 self.io.get_ref().linger() 721 } 722 723 /// Sets the linger duration of this socket by setting the `SO_LINGER` 724 /// option. 725 /// 726 /// This option controls the action taken when a stream has unsent messages 727 /// and the stream is closed. If `SO_LINGER` is set, the system 728 /// shall block the process until it can transmit the data or until the 729 /// time expires. 730 /// 731 /// If `SO_LINGER` is not specified, and the stream is closed, the system 732 /// handles the call in a way that allows the process to continue as quickly 733 /// as possible. 734 /// 735 /// # Examples 736 /// 737 /// ``` 738 /// # extern crate tokio; 739 /// # extern crate futures; 740 /// use tokio::net::TcpStream; 741 /// use futures::Future; 742 /// use std::net::SocketAddr; 743 /// 744 /// # fn main() -> Result<(), Box<std::error::Error>> { 745 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 746 /// let stream = TcpStream::connect(&addr); 747 /// stream.map(|stream| { 748 /// stream.set_linger(None).expect("set_linger failed"); 749 /// }); 750 /// # Ok(()) 751 /// # } 752 /// ``` set_linger(&self, dur: Option<Duration>) -> io::Result<()>753 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { 754 self.io.get_ref().set_linger(dur) 755 } 756 757 /// Creates a new independently owned handle to the underlying socket. 758 /// 759 /// The returned `TcpStream` is a reference to the same stream that this 760 /// object references. Both handles will read and write the same stream of 761 /// data, and options set on one stream will be propagated to the other 762 /// stream. 763 /// 764 /// # Examples 765 /// 766 /// ``` 767 /// # extern crate tokio; 768 /// # extern crate futures; 769 /// use tokio::net::TcpStream; 770 /// use futures::Future; 771 /// use std::net::SocketAddr; 772 /// 773 /// # fn main() -> Result<(), Box<std::error::Error>> { 774 /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?; 775 /// let stream = TcpStream::connect(&addr); 776 /// stream.map(|stream| { 777 /// let clone = stream.try_clone().unwrap(); 778 /// }); 779 /// # Ok(()) 780 /// # } 781 /// ``` 782 #[deprecated(since = "0.1.14", note = "use `split()` instead")] 783 #[doc(hidden)] try_clone(&self) -> io::Result<TcpStream>784 pub fn try_clone(&self) -> io::Result<TcpStream> { 785 // Rationale for deprecation: 786 // - https://github.com/tokio-rs/tokio/pull/824 787 // - https://github.com/tokio-rs/tokio/issues/774#issuecomment-451059317 788 let msg = "`TcpStream::try_clone()` is deprecated because it doesn't work as intended"; 789 Err(io::Error::new(io::ErrorKind::Other, msg)) 790 } 791 } 792 793 // ===== impl Read / Write ===== 794 795 impl Read for TcpStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>796 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 797 self.io.read(buf) 798 } 799 } 800 801 impl Write for TcpStream { write(&mut self, buf: &[u8]) -> io::Result<usize>802 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 803 self.io.write(buf) 804 } flush(&mut self) -> io::Result<()>805 fn flush(&mut self) -> io::Result<()> { 806 Ok(()) 807 } 808 } 809 810 impl AsyncRead for TcpStream { prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool811 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { 812 false 813 } 814 read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>815 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 816 <&TcpStream>::read_buf(&mut &*self, buf) 817 } 818 } 819 820 impl AsyncWrite for TcpStream { shutdown(&mut self) -> Poll<(), io::Error>821 fn shutdown(&mut self) -> Poll<(), io::Error> { 822 <&TcpStream>::shutdown(&mut &*self) 823 } 824 write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>825 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 826 <&TcpStream>::write_buf(&mut &*self, buf) 827 } 828 } 829 830 // ===== impl Read / Write for &'a ===== 831 832 impl<'a> Read for &'a TcpStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>833 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 834 (&self.io).read(buf) 835 } 836 } 837 838 impl<'a> Write for &'a TcpStream { write(&mut self, buf: &[u8]) -> io::Result<usize>839 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 840 (&self.io).write(buf) 841 } 842 flush(&mut self) -> io::Result<()>843 fn flush(&mut self) -> io::Result<()> { 844 (&self.io).flush() 845 } 846 } 847 848 impl<'a> AsyncRead for &'a TcpStream { prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool849 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { 850 false 851 } 852 read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>853 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 854 if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? { 855 return Ok(Async::NotReady); 856 } 857 858 let r = unsafe { 859 // The `IoVec` type can't have a 0-length size, so we create a bunch 860 // of dummy versions on the stack with 1 length which we'll quickly 861 // overwrite. 862 let b1: &mut [u8] = &mut [0]; 863 let b2: &mut [u8] = &mut [0]; 864 let b3: &mut [u8] = &mut [0]; 865 let b4: &mut [u8] = &mut [0]; 866 let b5: &mut [u8] = &mut [0]; 867 let b6: &mut [u8] = &mut [0]; 868 let b7: &mut [u8] = &mut [0]; 869 let b8: &mut [u8] = &mut [0]; 870 let b9: &mut [u8] = &mut [0]; 871 let b10: &mut [u8] = &mut [0]; 872 let b11: &mut [u8] = &mut [0]; 873 let b12: &mut [u8] = &mut [0]; 874 let b13: &mut [u8] = &mut [0]; 875 let b14: &mut [u8] = &mut [0]; 876 let b15: &mut [u8] = &mut [0]; 877 let b16: &mut [u8] = &mut [0]; 878 let mut bufs: [&mut IoVec; 16] = [ 879 b1.into(), 880 b2.into(), 881 b3.into(), 882 b4.into(), 883 b5.into(), 884 b6.into(), 885 b7.into(), 886 b8.into(), 887 b9.into(), 888 b10.into(), 889 b11.into(), 890 b12.into(), 891 b13.into(), 892 b14.into(), 893 b15.into(), 894 b16.into(), 895 ]; 896 let n = buf.bytes_vec_mut(&mut bufs); 897 self.io.get_ref().read_bufs(&mut bufs[..n]) 898 }; 899 900 match r { 901 Ok(n) => { 902 unsafe { 903 buf.advance_mut(n); 904 } 905 Ok(Async::Ready(n)) 906 } 907 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 908 self.io.clear_read_ready(mio::Ready::readable())?; 909 Ok(Async::NotReady) 910 } 911 Err(e) => Err(e), 912 } 913 } 914 } 915 916 impl<'a> AsyncWrite for &'a TcpStream { shutdown(&mut self) -> Poll<(), io::Error>917 fn shutdown(&mut self) -> Poll<(), io::Error> { 918 Ok(().into()) 919 } 920 write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>921 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 922 if let Async::NotReady = self.io.poll_write_ready()? { 923 return Ok(Async::NotReady); 924 } 925 926 let r = { 927 // The `IoVec` type can't have a zero-length size, so create a dummy 928 // version from a 1-length slice which we'll overwrite with the 929 // `bytes_vec` method. 930 static DUMMY: &[u8] = &[0]; 931 let iovec = <&IoVec>::from(DUMMY); 932 let mut bufs = [iovec; 64]; 933 let n = buf.bytes_vec(&mut bufs); 934 self.io.get_ref().write_bufs(&bufs[..n]) 935 }; 936 match r { 937 Ok(n) => { 938 buf.advance(n); 939 Ok(Async::Ready(n)) 940 } 941 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 942 self.io.clear_write_ready()?; 943 Ok(Async::NotReady) 944 } 945 Err(e) => Err(e), 946 } 947 } 948 } 949 950 impl fmt::Debug for TcpStream { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result951 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 952 self.io.get_ref().fmt(f) 953 } 954 } 955 956 impl Future for ConnectFuture { 957 type Item = TcpStream; 958 type Error = io::Error; 959 poll(&mut self) -> Poll<TcpStream, io::Error>960 fn poll(&mut self) -> Poll<TcpStream, io::Error> { 961 self.inner.poll() 962 } 963 } 964 965 impl ConnectFutureState { poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> where F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>,966 fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> 967 where 968 F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>, 969 { 970 { 971 let stream = match *self { 972 ConnectFutureState::Waiting(ref mut s) => s, 973 ConnectFutureState::Error(_) => { 974 let e = match mem::replace(self, ConnectFutureState::Empty) { 975 ConnectFutureState::Error(e) => e, 976 _ => panic!(), 977 }; 978 return Err(e); 979 } 980 ConnectFutureState::Empty => panic!("can't poll TCP stream twice"), 981 }; 982 983 // Once we've connected, wait for the stream to be writable as 984 // that's when the actual connection has been initiated. Once we're 985 // writable we check for `take_socket_error` to see if the connect 986 // actually hit an error or not. 987 // 988 // If all that succeeded then we ship everything on up. 989 if let Async::NotReady = f(&mut stream.io)? { 990 return Ok(Async::NotReady); 991 } 992 993 if let Some(e) = stream.io.get_ref().take_error()? { 994 return Err(e); 995 } 996 } 997 998 match mem::replace(self, ConnectFutureState::Empty) { 999 ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)), 1000 _ => panic!(), 1001 } 1002 } 1003 } 1004 1005 impl Future for ConnectFutureState { 1006 type Item = TcpStream; 1007 type Error = io::Error; 1008 poll(&mut self) -> Poll<TcpStream, io::Error>1009 fn poll(&mut self) -> Poll<TcpStream, io::Error> { 1010 self.poll_inner(|io| io.poll_write_ready()) 1011 } 1012 } 1013 1014 #[cfg(unix)] 1015 mod sys { 1016 use super::TcpStream; 1017 use std::os::unix::prelude::*; 1018 1019 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd1020 fn as_raw_fd(&self) -> RawFd { 1021 self.io.get_ref().as_raw_fd() 1022 } 1023 } 1024 } 1025 1026 #[cfg(windows)] 1027 mod sys { 1028 // TODO: let's land these upstream with mio and then we can add them here. 1029 // 1030 // use std::os::windows::prelude::*; 1031 // use super::TcpStream; 1032 // 1033 // impl AsRawHandle for TcpStream { 1034 // fn as_raw_handle(&self) -> RawHandle { 1035 // self.io.get_ref().as_raw_handle() 1036 // } 1037 // } 1038 } 1039