1 //! Allows a future or stream to execute for a maximum amount of time.
2 //!
3 //! See [`Timeout`] documentation for more details.
4 //!
5 //! [`Timeout`]: struct.Timeout.html
6 
7 use clock::now;
8 use Delay;
9 
10 use futures::{Async, Future, Poll, Stream};
11 
12 use std::error;
13 use std::fmt;
14 use std::time::{Duration, Instant};
15 
16 /// Allows a `Future` or `Stream` to execute for a limited amount of time.
17 ///
18 /// If the future or stream completes before the timeout has expired, then
19 /// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
20 /// [`Error`].
21 ///
22 /// # Futures and Streams
23 ///
24 /// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
25 /// In the case of a `Future`, `Timeout` will require the future to complete by
26 /// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
27 /// to take the entire timeout before returning an error.
28 ///
29 /// In order to set an upper bound on the processing of the *entire* stream,
30 /// then a timeout should be set on the future that processes the stream. For
31 /// example:
32 ///
33 /// ```rust
34 /// # extern crate futures;
35 /// # extern crate tokio;
36 /// // import the `timeout` function, usually this is done
37 /// // with `use tokio::prelude::*`
38 /// use tokio::prelude::FutureExt;
39 /// use futures::Stream;
40 /// use futures::sync::mpsc;
41 /// use std::time::Duration;
42 ///
43 /// # fn main() {
44 /// let (tx, rx) = mpsc::unbounded();
45 /// # tx.unbounded_send(()).unwrap();
46 /// # drop(tx);
47 ///
48 /// let process = rx.for_each(|item| {
49 ///     // do something with `item`
50 /// # drop(item);
51 /// # Ok(())
52 /// });
53 ///
54 /// # tokio::runtime::current_thread::block_on_all(
55 /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
56 /// process.timeout(Duration::from_millis(10))
57 /// # ).unwrap();
58 /// # }
59 /// ```
60 ///
61 /// # Cancelation
62 ///
63 /// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
64 /// or other work is required.
65 ///
66 /// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This
67 /// consumes the `Timeout`.
68 ///
69 /// [`Error`]: struct.Error.html
70 /// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter
71 #[must_use = "futures do nothing unless polled"]
72 #[derive(Debug)]
73 pub struct Timeout<T> {
74     value: T,
75     delay: Delay,
76 }
77 
78 /// Error returned by `Timeout`.
79 #[derive(Debug)]
80 pub struct Error<T>(Kind<T>);
81 
82 /// Timeout error variants
83 #[derive(Debug)]
84 enum Kind<T> {
85     /// Inner value returned an error
86     Inner(T),
87 
88     /// The timeout elapsed.
89     Elapsed,
90 
91     /// Timer returned an error.
92     Timer(::Error),
93 }
94 
95 impl<T> Timeout<T> {
96     /// Create a new `Timeout` that allows `value` to execute for a duration of
97     /// at most `timeout`.
98     ///
99     /// The exact behavior depends on if `value` is a `Future` or a `Stream`.
100     ///
101     /// See [type] level documentation for more details.
102     ///
103     /// [type]: #
104     ///
105     /// # Examples
106     ///
107     /// Create a new `Timeout` set to expire in 10 milliseconds.
108     ///
109     /// ```rust
110     /// # extern crate futures;
111     /// # extern crate tokio;
112     /// use tokio::timer::Timeout;
113     /// use futures::Future;
114     /// use futures::sync::oneshot;
115     /// use std::time::Duration;
116     ///
117     /// # fn main() {
118     /// let (tx, rx) = oneshot::channel();
119     /// # tx.send(()).unwrap();
120     ///
121     /// # tokio::runtime::current_thread::block_on_all(
122     /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
123     /// Timeout::new(rx, Duration::from_millis(10))
124     /// # ).unwrap();
125     /// # }
126     /// ```
new(value: T, timeout: Duration) -> Timeout<T>127     pub fn new(value: T, timeout: Duration) -> Timeout<T> {
128         let delay = Delay::new_timeout(now() + timeout, timeout);
129         Timeout::new_with_delay(value, delay)
130     }
131 
new_with_delay(value: T, delay: Delay) -> Timeout<T>132     pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
133         Timeout { value, delay }
134     }
135 
136     /// Gets a reference to the underlying value in this timeout.
get_ref(&self) -> &T137     pub fn get_ref(&self) -> &T {
138         &self.value
139     }
140 
141     /// Gets a mutable reference to the underlying value in this timeout.
get_mut(&mut self) -> &mut T142     pub fn get_mut(&mut self) -> &mut T {
143         &mut self.value
144     }
145 
146     /// Consumes this timeout, returning the underlying value.
into_inner(self) -> T147     pub fn into_inner(self) -> T {
148         self.value
149     }
150 }
151 
152 impl<T: Future> Timeout<T> {
153     /// Create a new `Timeout` that completes when `future` completes or when
154     /// `deadline` is reached.
155     ///
156     /// This function differs from `new` in that:
157     ///
158     /// * It only accepts `Future` arguments.
159     /// * It sets an explicit `Instant` at which the timeout expires.
new_at(future: T, deadline: Instant) -> Timeout<T>160     pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
161         let delay = Delay::new(deadline);
162 
163         Timeout {
164             value: future,
165             delay,
166         }
167     }
168 }
169 
170 impl<T> Future for Timeout<T>
171 where
172     T: Future,
173 {
174     type Item = T::Item;
175     type Error = Error<T::Error>;
176 
poll(&mut self) -> Poll<Self::Item, Self::Error>177     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
178         // First, try polling the future
179         match self.value.poll() {
180             Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
181             Ok(Async::NotReady) => {}
182             Err(e) => return Err(Error::inner(e)),
183         }
184 
185         // Now check the timer
186         match self.delay.poll() {
187             Ok(Async::NotReady) => Ok(Async::NotReady),
188             Ok(Async::Ready(_)) => Err(Error::elapsed()),
189             Err(e) => Err(Error::timer(e)),
190         }
191     }
192 }
193 
194 impl<T> Stream for Timeout<T>
195 where
196     T: Stream,
197 {
198     type Item = T::Item;
199     type Error = Error<T::Error>;
200 
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>201     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
202         // First, try polling the future
203         match self.value.poll() {
204             Ok(Async::Ready(v)) => {
205                 if v.is_some() {
206                     self.delay.reset_timeout();
207                 }
208                 return Ok(Async::Ready(v));
209             }
210             Ok(Async::NotReady) => {}
211             Err(e) => return Err(Error::inner(e)),
212         }
213 
214         // Now check the timer
215         match self.delay.poll() {
216             Ok(Async::NotReady) => Ok(Async::NotReady),
217             Ok(Async::Ready(_)) => {
218                 self.delay.reset_timeout();
219                 Err(Error::elapsed())
220             }
221             Err(e) => Err(Error::timer(e)),
222         }
223     }
224 }
225 
226 // ===== impl Error =====
227 
228 impl<T> Error<T> {
229     /// Create a new `Error` representing the inner value completing with `Err`.
inner(err: T) -> Error<T>230     pub fn inner(err: T) -> Error<T> {
231         Error(Kind::Inner(err))
232     }
233 
234     /// Returns `true` if the error was caused by the inner value completing
235     /// with `Err`.
is_inner(&self) -> bool236     pub fn is_inner(&self) -> bool {
237         match self.0 {
238             Kind::Inner(_) => true,
239             _ => false,
240         }
241     }
242 
243     /// Consumes `self`, returning the inner future error.
into_inner(self) -> Option<T>244     pub fn into_inner(self) -> Option<T> {
245         match self.0 {
246             Kind::Inner(err) => Some(err),
247             _ => None,
248         }
249     }
250 
251     /// Create a new `Error` representing the inner value not completing before
252     /// the deadline is reached.
elapsed() -> Error<T>253     pub fn elapsed() -> Error<T> {
254         Error(Kind::Elapsed)
255     }
256 
257     /// Returns `true` if the error was caused by the inner value not completing
258     /// before the deadline is reached.
is_elapsed(&self) -> bool259     pub fn is_elapsed(&self) -> bool {
260         match self.0 {
261             Kind::Elapsed => true,
262             _ => false,
263         }
264     }
265 
266     /// Creates a new `Error` representing an error encountered by the timer
267     /// implementation
timer(err: ::Error) -> Error<T>268     pub fn timer(err: ::Error) -> Error<T> {
269         Error(Kind::Timer(err))
270     }
271 
272     /// Returns `true` if the error was caused by the timer.
is_timer(&self) -> bool273     pub fn is_timer(&self) -> bool {
274         match self.0 {
275             Kind::Timer(_) => true,
276             _ => false,
277         }
278     }
279 
280     /// Consumes `self`, returning the error raised by the timer implementation.
into_timer(self) -> Option<::Error>281     pub fn into_timer(self) -> Option<::Error> {
282         match self.0 {
283             Kind::Timer(err) => Some(err),
284             _ => None,
285         }
286     }
287 }
288 
289 impl<T: error::Error> error::Error for Error<T> {
description(&self) -> &str290     fn description(&self) -> &str {
291         use self::Kind::*;
292 
293         match self.0 {
294             Inner(ref e) => e.description(),
295             Elapsed => "deadline has elapsed",
296             Timer(ref e) => e.description(),
297         }
298     }
299 }
300 
301 impl<T: fmt::Display> fmt::Display for Error<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result302     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
303         use self::Kind::*;
304 
305         match self.0 {
306             Inner(ref e) => e.fmt(fmt),
307             Elapsed => "deadline has elapsed".fmt(fmt),
308             Timer(ref e) => e.fmt(fmt),
309         }
310     }
311 }
312