//! Allows a future or stream to execute for a maximum amount of time. //! //! See [`Timeout`] documentation for more details. //! //! [`Timeout`]: struct.Timeout.html use clock::now; use Delay; use futures::{Async, Future, Poll, Stream}; use std::error; use std::fmt; use std::time::{Duration, Instant}; /// Allows a `Future` or `Stream` to execute for a limited amount of time. /// /// If the future or stream completes before the timeout has expired, then /// `Timeout` returns the completed value. Otherwise, `Timeout` returns an /// [`Error`]. /// /// # Futures and Streams /// /// The exact behavor depends on if the inner value is a `Future` or a `Stream`. /// In the case of a `Future`, `Timeout` will require the future to complete by /// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item /// to take the entire timeout before returning an error. /// /// In order to set an upper bound on the processing of the *entire* stream, /// then a timeout should be set on the future that processes the stream. For /// example: /// /// ```rust /// # extern crate futures; /// # extern crate tokio; /// // import the `timeout` function, usually this is done /// // with `use tokio::prelude::*` /// use tokio::prelude::FutureExt; /// use futures::Stream; /// use futures::sync::mpsc; /// use std::time::Duration; /// /// # fn main() { /// let (tx, rx) = mpsc::unbounded(); /// # tx.unbounded_send(()).unwrap(); /// # drop(tx); /// /// let process = rx.for_each(|item| { /// // do something with `item` /// # drop(item); /// # Ok(()) /// }); /// /// # tokio::runtime::current_thread::block_on_all( /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. /// process.timeout(Duration::from_millis(10)) /// # ).unwrap(); /// # } /// ``` /// /// # Cancelation /// /// Cancelling a `Timeout` is done by dropping the value. No additional cleanup /// or other work is required. /// /// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This /// consumes the `Timeout`. /// /// [`Error`]: struct.Error.html /// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter #[must_use = "futures do nothing unless polled"] #[derive(Debug)] pub struct Timeout { value: T, delay: Delay, } /// Error returned by `Timeout`. #[derive(Debug)] pub struct Error(Kind); /// Timeout error variants #[derive(Debug)] enum Kind { /// Inner value returned an error Inner(T), /// The timeout elapsed. Elapsed, /// Timer returned an error. Timer(::Error), } impl Timeout { /// Create a new `Timeout` that allows `value` to execute for a duration of /// at most `timeout`. /// /// The exact behavior depends on if `value` is a `Future` or a `Stream`. /// /// See [type] level documentation for more details. /// /// [type]: # /// /// # Examples /// /// Create a new `Timeout` set to expire in 10 milliseconds. /// /// ```rust /// # extern crate futures; /// # extern crate tokio; /// use tokio::timer::Timeout; /// use futures::Future; /// use futures::sync::oneshot; /// use std::time::Duration; /// /// # fn main() { /// let (tx, rx) = oneshot::channel(); /// # tx.send(()).unwrap(); /// /// # tokio::runtime::current_thread::block_on_all( /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. /// Timeout::new(rx, Duration::from_millis(10)) /// # ).unwrap(); /// # } /// ``` pub fn new(value: T, timeout: Duration) -> Timeout { let delay = Delay::new_timeout(now() + timeout, timeout); Timeout::new_with_delay(value, delay) } pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout { Timeout { value, delay } } /// Gets a reference to the underlying value in this timeout. pub fn get_ref(&self) -> &T { &self.value } /// Gets a mutable reference to the underlying value in this timeout. pub fn get_mut(&mut self) -> &mut T { &mut self.value } /// Consumes this timeout, returning the underlying value. pub fn into_inner(self) -> T { self.value } } impl Timeout { /// Create a new `Timeout` that completes when `future` completes or when /// `deadline` is reached. /// /// This function differs from `new` in that: /// /// * It only accepts `Future` arguments. /// * It sets an explicit `Instant` at which the timeout expires. pub fn new_at(future: T, deadline: Instant) -> Timeout { let delay = Delay::new(deadline); Timeout { value: future, delay, } } } impl Future for Timeout where T: Future, { type Item = T::Item; type Error = Error; fn poll(&mut self) -> Poll { // First, try polling the future match self.value.poll() { Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), Ok(Async::NotReady) => {} Err(e) => return Err(Error::inner(e)), } // Now check the timer match self.delay.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Err(Error::elapsed()), Err(e) => Err(Error::timer(e)), } } } impl Stream for Timeout where T: Stream, { type Item = T::Item; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { // First, try polling the future match self.value.poll() { Ok(Async::Ready(v)) => { if v.is_some() { self.delay.reset_timeout(); } return Ok(Async::Ready(v)); } Ok(Async::NotReady) => {} Err(e) => return Err(Error::inner(e)), } // Now check the timer match self.delay.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => { self.delay.reset_timeout(); Err(Error::elapsed()) } Err(e) => Err(Error::timer(e)), } } } // ===== impl Error ===== impl Error { /// Create a new `Error` representing the inner value completing with `Err`. pub fn inner(err: T) -> Error { Error(Kind::Inner(err)) } /// Returns `true` if the error was caused by the inner value completing /// with `Err`. pub fn is_inner(&self) -> bool { match self.0 { Kind::Inner(_) => true, _ => false, } } /// Consumes `self`, returning the inner future error. pub fn into_inner(self) -> Option { match self.0 { Kind::Inner(err) => Some(err), _ => None, } } /// Create a new `Error` representing the inner value not completing before /// the deadline is reached. pub fn elapsed() -> Error { Error(Kind::Elapsed) } /// Returns `true` if the error was caused by the inner value not completing /// before the deadline is reached. pub fn is_elapsed(&self) -> bool { match self.0 { Kind::Elapsed => true, _ => false, } } /// Creates a new `Error` representing an error encountered by the timer /// implementation pub fn timer(err: ::Error) -> Error { Error(Kind::Timer(err)) } /// Returns `true` if the error was caused by the timer. pub fn is_timer(&self) -> bool { match self.0 { Kind::Timer(_) => true, _ => false, } } /// Consumes `self`, returning the error raised by the timer implementation. pub fn into_timer(self) -> Option<::Error> { match self.0 { Kind::Timer(err) => Some(err), _ => None, } } } impl error::Error for Error { fn description(&self) -> &str { use self::Kind::*; match self.0 { Inner(ref e) => e.description(), Elapsed => "deadline has elapsed", Timer(ref e) => e.description(), } } } impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use self::Kind::*; match self.0 { Inner(ref e) => e.fmt(fmt), Elapsed => "deadline has elapsed".fmt(fmt), Timer(ref e) => e.fmt(fmt), } } }