1 use std::fmt;
2 use std::io::{self, Read, Write};
3 use std::mem;
4 use std::net::{self, Shutdown, SocketAddr};
5 use std::time::Duration;
6 
7 use bytes::{Buf, BufMut};
8 use futures::{Async, Future, Poll};
9 use iovec::IoVec;
10 use mio;
11 use tokio_io::{AsyncRead, AsyncWrite};
12 use tokio_reactor::{Handle, PollEvented};
13 
14 /// An I/O object representing a TCP stream connected to a remote endpoint.
15 ///
16 /// A TCP stream can either be created by connecting to an endpoint, via the
17 /// [`connect`] method, or by [accepting] a connection from a [listener].
18 ///
19 /// [`connect`]: struct.TcpStream.html#method.connect
20 /// [accepting]: struct.TcpListener.html#method.accept
21 /// [listener]: struct.TcpListener.html
22 ///
23 /// # Examples
24 ///
25 /// ```
26 /// # extern crate tokio;
27 /// # extern crate futures;
28 /// use futures::Future;
29 /// use tokio::io::AsyncWrite;
30 /// use tokio::net::TcpStream;
31 /// use std::net::SocketAddr;
32 ///
33 /// # fn main() -> Result<(), Box<std::error::Error>> {
34 /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
35 /// let stream = TcpStream::connect(&addr);
36 /// stream.map(|mut stream| {
37 ///     // Attempt to write bytes asynchronously to the stream
38 ///     stream.poll_write(&[1]);
39 /// });
40 /// # Ok(())
41 /// # }
42 /// ```
43 pub struct TcpStream {
44     io: PollEvented<mio::net::TcpStream>,
45 }
46 
47 /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
48 /// when the stream is connected.
49 #[must_use = "futures do nothing unless polled"]
50 #[derive(Debug)]
51 pub struct ConnectFuture {
52     inner: ConnectFutureState,
53 }
54 
55 #[must_use = "futures do nothing unless polled"]
56 #[derive(Debug)]
57 enum ConnectFutureState {
58     Waiting(TcpStream),
59     Error(io::Error),
60     Empty,
61 }
62 
63 impl TcpStream {
64     /// Create a new TCP stream connected to the specified address.
65     ///
66     /// This function will create a new TCP socket and attempt to connect it to
67     /// the `addr` provided. The returned future will be resolved once the
68     /// stream has successfully connected, or it will return an error if one
69     /// occurs.
70     ///
71     /// # Examples
72     ///
73     /// ```
74     /// # extern crate tokio;
75     /// # extern crate futures;
76     /// use futures::Future;
77     /// use tokio::net::TcpStream;
78     /// use std::net::SocketAddr;
79     ///
80     /// # fn main() -> Result<(), Box<std::error::Error>> {
81     /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
82     /// let stream = TcpStream::connect(&addr)
83     ///     .map(|stream|
84     ///         println!("successfully connected to {}", stream.local_addr().unwrap()));
85     /// # Ok(())
86     /// # }
87     /// ```
connect(addr: &SocketAddr) -> ConnectFuture88     pub fn connect(addr: &SocketAddr) -> ConnectFuture {
89         use self::ConnectFutureState::*;
90 
91         let inner = match mio::net::TcpStream::connect(addr) {
92             Ok(tcp) => Waiting(TcpStream::new(tcp)),
93             Err(e) => Error(e),
94         };
95 
96         ConnectFuture { inner }
97     }
98 
new(connected: mio::net::TcpStream) -> TcpStream99     pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream {
100         let io = PollEvented::new(connected);
101         TcpStream { io }
102     }
103 
104     /// Create a new `TcpStream` from a `net::TcpStream`.
105     ///
106     /// This function will convert a TCP stream created by the standard library
107     /// to a TCP stream ready to be used with the provided event loop handle.
108     /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does.
109     ///
110     /// # Examples
111     ///
112     /// ```no_run
113     /// # extern crate tokio;
114     /// # extern crate tokio_reactor;
115     /// use tokio::net::TcpStream;
116     /// use std::net::TcpStream as StdTcpStream;
117     /// use tokio_reactor::Handle;
118     ///
119     /// # fn main() -> Result<(), Box<std::error::Error>> {
120     /// let std_stream = StdTcpStream::connect("127.0.0.1:34254")?;
121     /// let stream = TcpStream::from_std(std_stream, &Handle::default())?;
122     /// # Ok(())
123     /// # }
124     /// ```
from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream>125     pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream> {
126         let io = mio::net::TcpStream::from_stream(stream)?;
127         let io = PollEvented::new_with_handle(io, handle)?;
128 
129         Ok(TcpStream { io })
130     }
131 
132     /// Creates a new `TcpStream` from the pending socket inside the given
133     /// `std::net::TcpStream`, connecting it to the address specified.
134     ///
135     /// This constructor allows configuring the socket before it's actually
136     /// connected, and this function will transfer ownership to the returned
137     /// `TcpStream` if successful. An unconnected `TcpStream` can be created
138     /// with the `net2::TcpBuilder` type (and also configured via that route).
139     ///
140     /// The platform specific behavior of this function looks like:
141     ///
142     /// * On Unix, the socket is placed into nonblocking mode and then a
143     ///   `connect` call is issued.
144     ///
145     /// * On Windows, the address is stored internally and the connect operation
146     ///   is issued when the returned `TcpStream` is registered with an event
147     ///   loop. Note that on Windows you must `bind` a socket before it can be
148     ///   connected, so if a custom `TcpBuilder` is used it should be bound
149     ///   (perhaps to `INADDR_ANY`) before this method is called.
connect_std( stream: net::TcpStream, addr: &SocketAddr, handle: &Handle, ) -> ConnectFuture150     pub fn connect_std(
151         stream: net::TcpStream,
152         addr: &SocketAddr,
153         handle: &Handle,
154     ) -> ConnectFuture {
155         use self::ConnectFutureState::*;
156 
157         let io = mio::net::TcpStream::connect_stream(stream, addr)
158             .and_then(|io| PollEvented::new_with_handle(io, handle));
159 
160         let inner = match io {
161             Ok(io) => Waiting(TcpStream { io }),
162             Err(e) => Error(e),
163         };
164 
165         ConnectFuture { inner: inner }
166     }
167 
168     /// Check the TCP stream's read readiness state.
169     ///
170     /// The mask argument allows specifying what readiness to notify on. This
171     /// can be any value, including platform specific readiness, **except**
172     /// `writable`. HUP is always implicitly included on platforms that support
173     /// it.
174     ///
175     /// If the resource is not ready for a read then `Async::NotReady` is
176     /// returned and the current task is notified once a new event is received.
177     ///
178     /// The stream will remain in a read-ready state until calls to `poll_read`
179     /// return `NotReady`.
180     ///
181     /// # Panics
182     ///
183     /// This function panics if:
184     ///
185     /// * `ready` includes writable.
186     /// * called from outside of a task context.
187     ///
188     /// # Examples
189     ///
190     /// ```
191     /// # extern crate mio;
192     /// # extern crate tokio;
193     /// # extern crate futures;
194     /// use mio::Ready;
195     /// use futures::Async;
196     /// use futures::Future;
197     /// use tokio::net::TcpStream;
198     /// use std::net::SocketAddr;
199     ///
200     /// # fn main() -> Result<(), Box<std::error::Error>> {
201     /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
202     /// let stream = TcpStream::connect(&addr);
203     /// stream.map(|stream| {
204     ///     match stream.poll_read_ready(Ready::readable()) {
205     ///         Ok(Async::Ready(_)) => println!("read ready"),
206     ///         Ok(Async::NotReady) => println!("not read ready"),
207     ///         Err(e) => eprintln!("got error: {}", e),
208     /// }
209     /// });
210     /// # Ok(())
211     /// # }
212     /// ```
poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error>213     pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
214         self.io.poll_read_ready(mask)
215     }
216 
217     /// Check the TCP stream's write readiness state.
218     ///
219     /// This always checks for writable readiness and also checks for HUP
220     /// readiness on platforms that support it.
221     ///
222     /// If the resource is not ready for a write then `Async::NotReady` is
223     /// returned and the current task is notified once a new event is received.
224     ///
225     /// The I/O resource will remain in a write-ready state until calls to
226     /// `poll_write` return `NotReady`.
227     ///
228     /// # Panics
229     ///
230     /// This function panics if called from outside of a task context.
231     ///
232     /// # Examples
233     ///
234     /// ```
235     /// # extern crate tokio;
236     /// # extern crate futures;
237     /// use futures::Async;
238     /// use futures::Future;
239     /// use tokio::net::TcpStream;
240     /// use std::net::SocketAddr;
241     ///
242     /// # fn main() -> Result<(), Box<std::error::Error>> {
243     /// let addr = "127.0.0.1:34254".parse::<SocketAddr>()?;
244     /// let stream = TcpStream::connect(&addr);
245     /// stream.map(|stream| {
246     ///     match stream.poll_write_ready() {
247     ///         Ok(Async::Ready(_)) => println!("write ready"),
248     ///         Ok(Async::NotReady) => println!("not write ready"),
249     ///         Err(e) => eprintln!("got error: {}", e),
250     /// }
251     /// });
252     /// # Ok(())
253     /// # }
254     /// ```
poll_write_ready(&self) -> Poll<mio::Ready, io::Error>255     pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
256         self.io.poll_write_ready()
257     }
258 
259     /// Returns the local address that this stream is bound to.
260     ///
261     /// # Examples
262     ///
263     /// ```
264     /// # extern crate tokio;
265     /// # extern crate futures;
266     /// use tokio::net::TcpStream;
267     /// use futures::Future;
268     /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
269     ///
270     /// # fn main() -> Result<(), Box<std::error::Error>> {
271     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
272     /// let stream = TcpStream::connect(&addr);
273     /// stream.map(|stream| {
274     ///     assert_eq!(stream.local_addr().unwrap(),
275     ///         SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
276     /// });
277     /// # Ok(())
278     /// # }
279     /// ```
local_addr(&self) -> io::Result<SocketAddr>280     pub fn local_addr(&self) -> io::Result<SocketAddr> {
281         self.io.get_ref().local_addr()
282     }
283 
284     /// Returns the remote address that this stream is connected to.
285     /// # Examples
286     ///
287     /// ```
288     /// # extern crate tokio;
289     /// # extern crate futures;
290     /// use tokio::net::TcpStream;
291     /// use futures::Future;
292     /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
293     ///
294     /// # fn main() -> Result<(), Box<std::error::Error>> {
295     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
296     /// let stream = TcpStream::connect(&addr);
297     /// stream.map(|stream| {
298     ///     assert_eq!(stream.peer_addr().unwrap(),
299     ///         SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
300     /// });
301     /// # Ok(())
302     /// # }
303     /// ```
peer_addr(&self) -> io::Result<SocketAddr>304     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
305         self.io.get_ref().peer_addr()
306     }
307 
308     #[deprecated(since = "0.1.2", note = "use poll_peek instead")]
309     #[doc(hidden)]
peek(&mut self, buf: &mut [u8]) -> io::Result<usize>310     pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
311         match self.poll_peek(buf)? {
312             Async::Ready(n) => Ok(n),
313             Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
314         }
315     }
316 
317     /// Receives data on the socket from the remote address to which it is
318     /// connected, without removing that data from the queue. On success,
319     /// returns the number of bytes peeked.
320     ///
321     /// Successive calls return the same data. This is accomplished by passing
322     /// `MSG_PEEK` as a flag to the underlying recv system call.
323     ///
324     /// # Return
325     ///
326     /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
327     ///
328     /// If no data is available for reading, the method returns
329     /// `Ok(Async::NotReady)` and arranges for the current task to receive a
330     /// notification when the socket becomes readable or is closed.
331     ///
332     /// # Panics
333     ///
334     /// This function will panic if called from outside of a task context.
335     ///
336     /// # Examples
337     ///
338     /// ```
339     /// # extern crate tokio;
340     /// # extern crate futures;
341     /// use tokio::net::TcpStream;
342     /// use futures::Async;
343     /// use futures::Future;
344     /// use std::net::SocketAddr;
345     ///
346     /// # fn main() -> Result<(), Box<std::error::Error>> {
347     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
348     /// let stream = TcpStream::connect(&addr);
349     /// stream.map(|mut stream| {
350     ///     let mut buf = [0; 10];
351     ///     match stream.poll_peek(&mut buf) {
352     ///        Ok(Async::Ready(len)) => println!("read {} bytes", len),
353     ///        Ok(Async::NotReady) => println!("no data available"),
354     ///        Err(e) => eprintln!("got error: {}", e),
355     ///     }
356     /// });
357     /// # Ok(())
358     /// # }
359     /// ```
poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error>360     pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
361         try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
362 
363         match self.io.get_ref().peek(buf) {
364             Ok(ret) => Ok(ret.into()),
365             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
366                 self.io.clear_read_ready(mio::Ready::readable())?;
367                 Ok(Async::NotReady)
368             }
369             Err(e) => Err(e),
370         }
371     }
372 
373     /// Shuts down the read, write, or both halves of this connection.
374     ///
375     /// This function will cause all pending and future I/O on the specified
376     /// portions to return immediately with an appropriate value (see the
377     /// documentation of `Shutdown`).
378     ///
379     /// # Examples
380     ///
381     /// ```
382     /// # extern crate tokio;
383     /// # extern crate futures;
384     /// use tokio::net::TcpStream;
385     /// use futures::Future;
386     /// use std::net::{Shutdown, SocketAddr};
387     ///
388     /// # fn main() -> Result<(), Box<std::error::Error>> {
389     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
390     /// let stream = TcpStream::connect(&addr);
391     /// stream.map(|stream| {
392     ///     stream.shutdown(Shutdown::Both)
393     /// });
394     /// # Ok(())
395     /// # }
396     /// ```
shutdown(&self, how: Shutdown) -> io::Result<()>397     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
398         self.io.get_ref().shutdown(how)
399     }
400 
401     /// Gets the value of the `TCP_NODELAY` option on this socket.
402     ///
403     /// For more information about this option, see [`set_nodelay`].
404     ///
405     /// [`set_nodelay`]: #method.set_nodelay
406     ///
407     /// # Examples
408     ///
409     /// ```
410     /// # extern crate tokio;
411     /// # extern crate futures;
412     /// use tokio::net::TcpStream;
413     /// use futures::Future;
414     /// use std::net::SocketAddr;
415     ///
416     /// # fn main() -> Result<(), Box<std::error::Error>> {
417     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
418     /// let stream = TcpStream::connect(&addr);
419     /// stream.map(|stream| {
420     ///     stream.set_nodelay(true).expect("set_nodelay call failed");;
421     ///     assert_eq!(stream.nodelay().unwrap_or(false), true);
422     /// });
423     /// # Ok(())
424     /// # }
425     /// ```
nodelay(&self) -> io::Result<bool>426     pub fn nodelay(&self) -> io::Result<bool> {
427         self.io.get_ref().nodelay()
428     }
429 
430     /// Sets the value of the `TCP_NODELAY` option on this socket.
431     ///
432     /// If set, this option disables the Nagle algorithm. This means that
433     /// segments are always sent as soon as possible, even if there is only a
434     /// small amount of data. When not set, data is buffered until there is a
435     /// sufficient amount to send out, thereby avoiding the frequent sending of
436     /// small packets.
437     ///
438     /// # Examples
439     ///
440     /// ```
441     /// # extern crate tokio;
442     /// # extern crate futures;
443     /// use tokio::net::TcpStream;
444     /// use futures::Future;
445     /// use std::net::SocketAddr;
446     ///
447     /// # fn main() -> Result<(), Box<std::error::Error>> {
448     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
449     /// let stream = TcpStream::connect(&addr);
450     /// stream.map(|stream| {
451     ///     stream.set_nodelay(true).expect("set_nodelay call failed");
452     /// });
453     /// # Ok(())
454     /// # }
455     /// ```
set_nodelay(&self, nodelay: bool) -> io::Result<()>456     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
457         self.io.get_ref().set_nodelay(nodelay)
458     }
459 
460     /// Gets the value of the `SO_RCVBUF` option on this socket.
461     ///
462     /// For more information about this option, see [`set_recv_buffer_size`].
463     ///
464     /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size
465     ///
466     /// # Examples
467     ///
468     /// ```
469     /// # extern crate tokio;
470     /// # extern crate futures;
471     /// use tokio::net::TcpStream;
472     /// use futures::Future;
473     /// use std::net::SocketAddr;
474     ///
475     /// # fn main() -> Result<(), Box<std::error::Error>> {
476     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
477     /// let stream = TcpStream::connect(&addr);
478     /// stream.map(|stream| {
479     ///     stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed");
480     ///     assert_eq!(stream.recv_buffer_size().unwrap_or(0), 100);
481     /// });
482     /// # Ok(())
483     /// # }
484     /// ```
recv_buffer_size(&self) -> io::Result<usize>485     pub fn recv_buffer_size(&self) -> io::Result<usize> {
486         self.io.get_ref().recv_buffer_size()
487     }
488 
489     /// Sets the value of the `SO_RCVBUF` option on this socket.
490     ///
491     /// Changes the size of the operating system's receive buffer associated
492     /// with the socket.
493     ///
494     /// # Examples
495     ///
496     /// ```
497     /// # extern crate tokio;
498     /// # extern crate futures;
499     /// use tokio::net::TcpStream;
500     /// use futures::Future;
501     /// use std::net::SocketAddr;
502     ///
503     /// # fn main() -> Result<(), Box<std::error::Error>> {
504     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
505     /// let stream = TcpStream::connect(&addr);
506     /// stream.map(|stream| {
507     ///     stream.set_recv_buffer_size(100).expect("set_recv_buffer_size failed");
508     /// });
509     /// # Ok(())
510     /// # }
511     /// ```
set_recv_buffer_size(&self, size: usize) -> io::Result<()>512     pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
513         self.io.get_ref().set_recv_buffer_size(size)
514     }
515 
516     /// Gets the value of the `SO_SNDBUF` option on this socket.
517     ///
518     /// For more information about this option, see [`set_send_buffer`].
519     ///
520     /// [`set_send_buffer`]: #tymethod.set_send_buffer
521     ///
522     /// # Examples
523     ///
524     /// ```
525     /// # extern crate tokio;
526     /// # extern crate futures;
527     /// use tokio::net::TcpStream;
528     /// use futures::Future;
529     /// use std::net::SocketAddr;
530     ///
531     /// # fn main() -> Result<(), Box<std::error::Error>> {
532     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
533     /// let stream = TcpStream::connect(&addr);
534     /// stream.map(|stream| {
535     ///     stream.set_send_buffer_size(100).expect("set_send_buffer_size failed");
536     ///     assert_eq!(stream.send_buffer_size().unwrap_or(0), 100);
537     /// });
538     /// # Ok(())
539     /// # }
540     /// ```
send_buffer_size(&self) -> io::Result<usize>541     pub fn send_buffer_size(&self) -> io::Result<usize> {
542         self.io.get_ref().send_buffer_size()
543     }
544 
545     /// Sets the value of the `SO_SNDBUF` option on this socket.
546     ///
547     /// Changes the size of the operating system's send buffer associated with
548     /// the socket.
549     ///
550     /// # Examples
551     ///
552     /// ```
553     /// # extern crate tokio;
554     /// # extern crate futures;
555     /// use tokio::net::TcpStream;
556     /// use futures::Future;
557     /// use std::net::SocketAddr;
558     ///
559     /// # fn main() -> Result<(), Box<std::error::Error>> {
560     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
561     /// let stream = TcpStream::connect(&addr);
562     /// stream.map(|stream| {
563     ///     stream.set_send_buffer_size(100).expect("set_send_buffer_size failed");
564     /// });
565     /// # Ok(())
566     /// # }
567     /// ```
set_send_buffer_size(&self, size: usize) -> io::Result<()>568     pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
569         self.io.get_ref().set_send_buffer_size(size)
570     }
571 
572     /// Returns whether keepalive messages are enabled on this socket, and if so
573     /// the duration of time between them.
574     ///
575     /// For more information about this option, see [`set_keepalive`].
576     ///
577     /// [`set_keepalive`]: #tymethod.set_keepalive
578     ///
579     /// # Examples
580     ///
581     /// ```
582     /// # extern crate tokio;
583     /// # extern crate futures;
584     /// use tokio::net::TcpStream;
585     /// use futures::Future;
586     /// use std::net::SocketAddr;
587     ///
588     /// # fn main() -> Result<(), Box<std::error::Error>> {
589     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
590     /// let stream = TcpStream::connect(&addr);
591     /// stream.map(|stream| {
592     ///     stream.set_keepalive(None).expect("set_keepalive failed");
593     ///     assert_eq!(stream.keepalive().unwrap(), None);
594     /// });
595     /// # Ok(())
596     /// # }
597     /// ```
keepalive(&self) -> io::Result<Option<Duration>>598     pub fn keepalive(&self) -> io::Result<Option<Duration>> {
599         self.io.get_ref().keepalive()
600     }
601 
602     /// Sets whether keepalive messages are enabled to be sent on this socket.
603     ///
604     /// On Unix, this option will set the `SO_KEEPALIVE` as well as the
605     /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
606     /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
607     ///
608     /// If `None` is specified then keepalive messages are disabled, otherwise
609     /// the duration specified will be the time to remain idle before sending a
610     /// TCP keepalive probe.
611     ///
612     /// Some platforms specify this value in seconds, so sub-second
613     /// specifications may be omitted.
614     ///
615     /// # Examples
616     ///
617     /// ```
618     /// # extern crate tokio;
619     /// # extern crate futures;
620     /// use tokio::net::TcpStream;
621     /// use futures::Future;
622     /// use std::net::SocketAddr;
623     ///
624     /// # fn main() -> Result<(), Box<std::error::Error>> {
625     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
626     /// let stream = TcpStream::connect(&addr);
627     /// stream.map(|stream| {
628     ///     stream.set_keepalive(None).expect("set_keepalive failed");
629     /// });
630     /// # Ok(())
631     /// # }
632     /// ```
set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()>633     pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
634         self.io.get_ref().set_keepalive(keepalive)
635     }
636 
637     /// Gets the value of the `IP_TTL` option for this socket.
638     ///
639     /// For more information about this option, see [`set_ttl`].
640     ///
641     /// [`set_ttl`]: #tymethod.set_ttl
642     ///
643     /// # Examples
644     ///
645     /// ```
646     /// # extern crate tokio;
647     /// # extern crate futures;
648     /// use tokio::net::TcpStream;
649     /// use futures::Future;
650     /// use std::net::SocketAddr;
651     ///
652     /// # fn main() -> Result<(), Box<std::error::Error>> {
653     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
654     /// let stream = TcpStream::connect(&addr);
655     /// stream.map(|stream| {
656     ///     stream.set_ttl(100).expect("set_ttl failed");
657     ///     assert_eq!(stream.ttl().unwrap_or(0), 100);
658     /// });
659     /// # Ok(())
660     /// # }
661     /// ```
ttl(&self) -> io::Result<u32>662     pub fn ttl(&self) -> io::Result<u32> {
663         self.io.get_ref().ttl()
664     }
665 
666     /// Sets the value for the `IP_TTL` option on this socket.
667     ///
668     /// This value sets the time-to-live field that is used in every packet sent
669     /// from this socket.
670     ///
671     /// # Examples
672     ///
673     /// ```
674     /// # extern crate tokio;
675     /// # extern crate futures;
676     /// use tokio::net::TcpStream;
677     /// use futures::Future;
678     /// use std::net::SocketAddr;
679     ///
680     /// # fn main() -> Result<(), Box<std::error::Error>> {
681     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
682     /// let stream = TcpStream::connect(&addr);
683     /// stream.map(|stream| {
684     ///     stream.set_ttl(100).expect("set_ttl failed");
685     /// });
686     /// # Ok(())
687     /// # }
688     /// ```
set_ttl(&self, ttl: u32) -> io::Result<()>689     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
690         self.io.get_ref().set_ttl(ttl)
691     }
692 
693     /// Reads the linger duration for this socket by getting the `SO_LINGER`
694     /// option.
695     ///
696     /// For more information about this option, see [`set_linger`].
697     ///
698     /// [`set_linger`]: #tymethod.set_linger
699     ///
700     /// # Examples
701     ///
702     /// ```
703     /// # extern crate tokio;
704     /// # extern crate futures;
705     /// use tokio::net::TcpStream;
706     /// use futures::Future;
707     /// use std::net::SocketAddr;
708     ///
709     /// # fn main() -> Result<(), Box<std::error::Error>> {
710     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
711     /// let stream = TcpStream::connect(&addr);
712     /// stream.map(|stream| {
713     ///     stream.set_linger(None).expect("set_linger failed");
714     ///     assert_eq!(stream.linger().unwrap(), None);
715     /// });
716     /// # Ok(())
717     /// # }
718     /// ```
linger(&self) -> io::Result<Option<Duration>>719     pub fn linger(&self) -> io::Result<Option<Duration>> {
720         self.io.get_ref().linger()
721     }
722 
723     /// Sets the linger duration of this socket by setting the `SO_LINGER`
724     /// option.
725     ///
726     /// This option controls the action taken when a stream has unsent messages
727     /// and the stream is closed. If `SO_LINGER` is set, the system
728     /// shall block the process  until it can transmit the data or until the
729     /// time expires.
730     ///
731     /// If `SO_LINGER` is not specified, and the stream is closed, the system
732     /// handles the call in a way that allows the process to continue as quickly
733     /// as possible.
734     ///
735     /// # Examples
736     ///
737     /// ```
738     /// # extern crate tokio;
739     /// # extern crate futures;
740     /// use tokio::net::TcpStream;
741     /// use futures::Future;
742     /// use std::net::SocketAddr;
743     ///
744     /// # fn main() -> Result<(), Box<std::error::Error>> {
745     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
746     /// let stream = TcpStream::connect(&addr);
747     /// stream.map(|stream| {
748     ///     stream.set_linger(None).expect("set_linger failed");
749     /// });
750     /// # Ok(())
751     /// # }
752     /// ```
set_linger(&self, dur: Option<Duration>) -> io::Result<()>753     pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
754         self.io.get_ref().set_linger(dur)
755     }
756 
757     /// Creates a new independently owned handle to the underlying socket.
758     ///
759     /// The returned `TcpStream` is a reference to the same stream that this
760     /// object references. Both handles will read and write the same stream of
761     /// data, and options set on one stream will be propagated to the other
762     /// stream.
763     ///
764     /// # Examples
765     ///
766     /// ```
767     /// # extern crate tokio;
768     /// # extern crate futures;
769     /// use tokio::net::TcpStream;
770     /// use futures::Future;
771     /// use std::net::SocketAddr;
772     ///
773     /// # fn main() -> Result<(), Box<std::error::Error>> {
774     /// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
775     /// let stream = TcpStream::connect(&addr);
776     /// stream.map(|stream| {
777     ///     let clone = stream.try_clone().unwrap();
778     /// });
779     /// # Ok(())
780     /// # }
781     /// ```
782     #[deprecated(since = "0.1.14", note = "use `split()` instead")]
783     #[doc(hidden)]
try_clone(&self) -> io::Result<TcpStream>784     pub fn try_clone(&self) -> io::Result<TcpStream> {
785         // Rationale for deprecation:
786         // - https://github.com/tokio-rs/tokio/pull/824
787         // - https://github.com/tokio-rs/tokio/issues/774#issuecomment-451059317
788         let msg = "`TcpStream::try_clone()` is deprecated because it doesn't work as intended";
789         Err(io::Error::new(io::ErrorKind::Other, msg))
790     }
791 }
792 
793 // ===== impl Read / Write =====
794 
795 impl Read for TcpStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>796     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
797         self.io.read(buf)
798     }
799 }
800 
801 impl Write for TcpStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>802     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
803         self.io.write(buf)
804     }
flush(&mut self) -> io::Result<()>805     fn flush(&mut self) -> io::Result<()> {
806         Ok(())
807     }
808 }
809 
810 impl AsyncRead for TcpStream {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool811     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
812         false
813     }
814 
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>815     fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
816         <&TcpStream>::read_buf(&mut &*self, buf)
817     }
818 }
819 
820 impl AsyncWrite for TcpStream {
shutdown(&mut self) -> Poll<(), io::Error>821     fn shutdown(&mut self) -> Poll<(), io::Error> {
822         <&TcpStream>::shutdown(&mut &*self)
823     }
824 
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>825     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
826         <&TcpStream>::write_buf(&mut &*self, buf)
827     }
828 }
829 
830 // ===== impl Read / Write for &'a =====
831 
832 impl<'a> Read for &'a TcpStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>833     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
834         (&self.io).read(buf)
835     }
836 }
837 
838 impl<'a> Write for &'a TcpStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>839     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
840         (&self.io).write(buf)
841     }
842 
flush(&mut self) -> io::Result<()>843     fn flush(&mut self) -> io::Result<()> {
844         (&self.io).flush()
845     }
846 }
847 
848 impl<'a> AsyncRead for &'a TcpStream {
prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool849     unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
850         false
851     }
852 
read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>853     fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
854         if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
855             return Ok(Async::NotReady);
856         }
857 
858         let r = unsafe {
859             // The `IoVec` type can't have a 0-length size, so we create a bunch
860             // of dummy versions on the stack with 1 length which we'll quickly
861             // overwrite.
862             let b1: &mut [u8] = &mut [0];
863             let b2: &mut [u8] = &mut [0];
864             let b3: &mut [u8] = &mut [0];
865             let b4: &mut [u8] = &mut [0];
866             let b5: &mut [u8] = &mut [0];
867             let b6: &mut [u8] = &mut [0];
868             let b7: &mut [u8] = &mut [0];
869             let b8: &mut [u8] = &mut [0];
870             let b9: &mut [u8] = &mut [0];
871             let b10: &mut [u8] = &mut [0];
872             let b11: &mut [u8] = &mut [0];
873             let b12: &mut [u8] = &mut [0];
874             let b13: &mut [u8] = &mut [0];
875             let b14: &mut [u8] = &mut [0];
876             let b15: &mut [u8] = &mut [0];
877             let b16: &mut [u8] = &mut [0];
878             let mut bufs: [&mut IoVec; 16] = [
879                 b1.into(),
880                 b2.into(),
881                 b3.into(),
882                 b4.into(),
883                 b5.into(),
884                 b6.into(),
885                 b7.into(),
886                 b8.into(),
887                 b9.into(),
888                 b10.into(),
889                 b11.into(),
890                 b12.into(),
891                 b13.into(),
892                 b14.into(),
893                 b15.into(),
894                 b16.into(),
895             ];
896             let n = buf.bytes_vec_mut(&mut bufs);
897             self.io.get_ref().read_bufs(&mut bufs[..n])
898         };
899 
900         match r {
901             Ok(n) => {
902                 unsafe {
903                     buf.advance_mut(n);
904                 }
905                 Ok(Async::Ready(n))
906             }
907             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
908                 self.io.clear_read_ready(mio::Ready::readable())?;
909                 Ok(Async::NotReady)
910             }
911             Err(e) => Err(e),
912         }
913     }
914 }
915 
916 impl<'a> AsyncWrite for &'a TcpStream {
shutdown(&mut self) -> Poll<(), io::Error>917     fn shutdown(&mut self) -> Poll<(), io::Error> {
918         Ok(().into())
919     }
920 
write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>921     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
922         if let Async::NotReady = self.io.poll_write_ready()? {
923             return Ok(Async::NotReady);
924         }
925 
926         let r = {
927             // The `IoVec` type can't have a zero-length size, so create a dummy
928             // version from a 1-length slice which we'll overwrite with the
929             // `bytes_vec` method.
930             static DUMMY: &[u8] = &[0];
931             let iovec = <&IoVec>::from(DUMMY);
932             let mut bufs = [iovec; 64];
933             let n = buf.bytes_vec(&mut bufs);
934             self.io.get_ref().write_bufs(&bufs[..n])
935         };
936         match r {
937             Ok(n) => {
938                 buf.advance(n);
939                 Ok(Async::Ready(n))
940             }
941             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
942                 self.io.clear_write_ready()?;
943                 Ok(Async::NotReady)
944             }
945             Err(e) => Err(e),
946         }
947     }
948 }
949 
950 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result951     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
952         self.io.get_ref().fmt(f)
953     }
954 }
955 
956 impl Future for ConnectFuture {
957     type Item = TcpStream;
958     type Error = io::Error;
959 
poll(&mut self) -> Poll<TcpStream, io::Error>960     fn poll(&mut self) -> Poll<TcpStream, io::Error> {
961         self.inner.poll()
962     }
963 }
964 
965 impl ConnectFutureState {
poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> where F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>,966     fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error>
967     where
968         F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>,
969     {
970         {
971             let stream = match *self {
972                 ConnectFutureState::Waiting(ref mut s) => s,
973                 ConnectFutureState::Error(_) => {
974                     let e = match mem::replace(self, ConnectFutureState::Empty) {
975                         ConnectFutureState::Error(e) => e,
976                         _ => panic!(),
977                     };
978                     return Err(e);
979                 }
980                 ConnectFutureState::Empty => panic!("can't poll TCP stream twice"),
981             };
982 
983             // Once we've connected, wait for the stream to be writable as
984             // that's when the actual connection has been initiated. Once we're
985             // writable we check for `take_socket_error` to see if the connect
986             // actually hit an error or not.
987             //
988             // If all that succeeded then we ship everything on up.
989             if let Async::NotReady = f(&mut stream.io)? {
990                 return Ok(Async::NotReady);
991             }
992 
993             if let Some(e) = stream.io.get_ref().take_error()? {
994                 return Err(e);
995             }
996         }
997 
998         match mem::replace(self, ConnectFutureState::Empty) {
999             ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)),
1000             _ => panic!(),
1001         }
1002     }
1003 }
1004 
1005 impl Future for ConnectFutureState {
1006     type Item = TcpStream;
1007     type Error = io::Error;
1008 
poll(&mut self) -> Poll<TcpStream, io::Error>1009     fn poll(&mut self) -> Poll<TcpStream, io::Error> {
1010         self.poll_inner(|io| io.poll_write_ready())
1011     }
1012 }
1013 
1014 #[cfg(unix)]
1015 mod sys {
1016     use super::TcpStream;
1017     use std::os::unix::prelude::*;
1018 
1019     impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd1020         fn as_raw_fd(&self) -> RawFd {
1021             self.io.get_ref().as_raw_fd()
1022         }
1023     }
1024 }
1025 
1026 #[cfg(windows)]
1027 mod sys {
1028     // TODO: let's land these upstream with mio and then we can add them here.
1029     //
1030     // use std::os::windows::prelude::*;
1031     // use super::TcpStream;
1032     //
1033     // impl AsRawHandle for TcpStream {
1034     //     fn as_raw_handle(&self) -> RawHandle {
1035     //         self.io.get_ref().as_raw_handle()
1036     //     }
1037     // }
1038 }
1039