1 //! Allows a future or stream to execute for a maximum amount of time. 2 //! 3 //! See [`Timeout`] documentation for more details. 4 //! 5 //! [`Timeout`]: struct.Timeout.html 6 7 use clock::now; 8 use Delay; 9 10 use futures::{Async, Future, Poll, Stream}; 11 12 use std::error; 13 use std::fmt; 14 use std::time::{Duration, Instant}; 15 16 /// Allows a `Future` or `Stream` to execute for a limited amount of time. 17 /// 18 /// If the future or stream completes before the timeout has expired, then 19 /// `Timeout` returns the completed value. Otherwise, `Timeout` returns an 20 /// [`Error`]. 21 /// 22 /// # Futures and Streams 23 /// 24 /// The exact behavor depends on if the inner value is a `Future` or a `Stream`. 25 /// In the case of a `Future`, `Timeout` will require the future to complete by 26 /// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item 27 /// to take the entire timeout before returning an error. 28 /// 29 /// In order to set an upper bound on the processing of the *entire* stream, 30 /// then a timeout should be set on the future that processes the stream. For 31 /// example: 32 /// 33 /// ```rust 34 /// # extern crate futures; 35 /// # extern crate tokio; 36 /// // import the `timeout` function, usually this is done 37 /// // with `use tokio::prelude::*` 38 /// use tokio::prelude::FutureExt; 39 /// use futures::Stream; 40 /// use futures::sync::mpsc; 41 /// use std::time::Duration; 42 /// 43 /// # fn main() { 44 /// let (tx, rx) = mpsc::unbounded(); 45 /// # tx.unbounded_send(()).unwrap(); 46 /// # drop(tx); 47 /// 48 /// let process = rx.for_each(|item| { 49 /// // do something with `item` 50 /// # drop(item); 51 /// # Ok(()) 52 /// }); 53 /// 54 /// # tokio::runtime::current_thread::block_on_all( 55 /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. 56 /// process.timeout(Duration::from_millis(10)) 57 /// # ).unwrap(); 58 /// # } 59 /// ``` 60 /// 61 /// # Cancelation 62 /// 63 /// Cancelling a `Timeout` is done by dropping the value. No additional cleanup 64 /// or other work is required. 65 /// 66 /// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This 67 /// consumes the `Timeout`. 68 /// 69 /// [`Error`]: struct.Error.html 70 /// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter 71 #[must_use = "futures do nothing unless polled"] 72 #[derive(Debug)] 73 pub struct Timeout<T> { 74 value: T, 75 delay: Delay, 76 } 77 78 /// Error returned by `Timeout`. 79 #[derive(Debug)] 80 pub struct Error<T>(Kind<T>); 81 82 /// Timeout error variants 83 #[derive(Debug)] 84 enum Kind<T> { 85 /// Inner value returned an error 86 Inner(T), 87 88 /// The timeout elapsed. 89 Elapsed, 90 91 /// Timer returned an error. 92 Timer(::Error), 93 } 94 95 impl<T> Timeout<T> { 96 /// Create a new `Timeout` that allows `value` to execute for a duration of 97 /// at most `timeout`. 98 /// 99 /// The exact behavior depends on if `value` is a `Future` or a `Stream`. 100 /// 101 /// See [type] level documentation for more details. 102 /// 103 /// [type]: # 104 /// 105 /// # Examples 106 /// 107 /// Create a new `Timeout` set to expire in 10 milliseconds. 108 /// 109 /// ```rust 110 /// # extern crate futures; 111 /// # extern crate tokio; 112 /// use tokio::timer::Timeout; 113 /// use futures::Future; 114 /// use futures::sync::oneshot; 115 /// use std::time::Duration; 116 /// 117 /// # fn main() { 118 /// let (tx, rx) = oneshot::channel(); 119 /// # tx.send(()).unwrap(); 120 /// 121 /// # tokio::runtime::current_thread::block_on_all( 122 /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. 123 /// Timeout::new(rx, Duration::from_millis(10)) 124 /// # ).unwrap(); 125 /// # } 126 /// ``` new(value: T, timeout: Duration) -> Timeout<T>127 pub fn new(value: T, timeout: Duration) -> Timeout<T> { 128 let delay = Delay::new_timeout(now() + timeout, timeout); 129 Timeout::new_with_delay(value, delay) 130 } 131 new_with_delay(value: T, delay: Delay) -> Timeout<T>132 pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> { 133 Timeout { value, delay } 134 } 135 136 /// Gets a reference to the underlying value in this timeout. get_ref(&self) -> &T137 pub fn get_ref(&self) -> &T { 138 &self.value 139 } 140 141 /// Gets a mutable reference to the underlying value in this timeout. get_mut(&mut self) -> &mut T142 pub fn get_mut(&mut self) -> &mut T { 143 &mut self.value 144 } 145 146 /// Consumes this timeout, returning the underlying value. into_inner(self) -> T147 pub fn into_inner(self) -> T { 148 self.value 149 } 150 } 151 152 impl<T: Future> Timeout<T> { 153 /// Create a new `Timeout` that completes when `future` completes or when 154 /// `deadline` is reached. 155 /// 156 /// This function differs from `new` in that: 157 /// 158 /// * It only accepts `Future` arguments. 159 /// * It sets an explicit `Instant` at which the timeout expires. new_at(future: T, deadline: Instant) -> Timeout<T>160 pub fn new_at(future: T, deadline: Instant) -> Timeout<T> { 161 let delay = Delay::new(deadline); 162 163 Timeout { 164 value: future, 165 delay, 166 } 167 } 168 } 169 170 impl<T> Future for Timeout<T> 171 where 172 T: Future, 173 { 174 type Item = T::Item; 175 type Error = Error<T::Error>; 176 poll(&mut self) -> Poll<Self::Item, Self::Error>177 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { 178 // First, try polling the future 179 match self.value.poll() { 180 Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), 181 Ok(Async::NotReady) => {} 182 Err(e) => return Err(Error::inner(e)), 183 } 184 185 // Now check the timer 186 match self.delay.poll() { 187 Ok(Async::NotReady) => Ok(Async::NotReady), 188 Ok(Async::Ready(_)) => Err(Error::elapsed()), 189 Err(e) => Err(Error::timer(e)), 190 } 191 } 192 } 193 194 impl<T> Stream for Timeout<T> 195 where 196 T: Stream, 197 { 198 type Item = T::Item; 199 type Error = Error<T::Error>; 200 poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>201 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { 202 // First, try polling the future 203 match self.value.poll() { 204 Ok(Async::Ready(v)) => { 205 if v.is_some() { 206 self.delay.reset_timeout(); 207 } 208 return Ok(Async::Ready(v)); 209 } 210 Ok(Async::NotReady) => {} 211 Err(e) => return Err(Error::inner(e)), 212 } 213 214 // Now check the timer 215 match self.delay.poll() { 216 Ok(Async::NotReady) => Ok(Async::NotReady), 217 Ok(Async::Ready(_)) => { 218 self.delay.reset_timeout(); 219 Err(Error::elapsed()) 220 } 221 Err(e) => Err(Error::timer(e)), 222 } 223 } 224 } 225 226 // ===== impl Error ===== 227 228 impl<T> Error<T> { 229 /// Create a new `Error` representing the inner value completing with `Err`. inner(err: T) -> Error<T>230 pub fn inner(err: T) -> Error<T> { 231 Error(Kind::Inner(err)) 232 } 233 234 /// Returns `true` if the error was caused by the inner value completing 235 /// with `Err`. is_inner(&self) -> bool236 pub fn is_inner(&self) -> bool { 237 match self.0 { 238 Kind::Inner(_) => true, 239 _ => false, 240 } 241 } 242 243 /// Consumes `self`, returning the inner future error. into_inner(self) -> Option<T>244 pub fn into_inner(self) -> Option<T> { 245 match self.0 { 246 Kind::Inner(err) => Some(err), 247 _ => None, 248 } 249 } 250 251 /// Create a new `Error` representing the inner value not completing before 252 /// the deadline is reached. elapsed() -> Error<T>253 pub fn elapsed() -> Error<T> { 254 Error(Kind::Elapsed) 255 } 256 257 /// Returns `true` if the error was caused by the inner value not completing 258 /// before the deadline is reached. is_elapsed(&self) -> bool259 pub fn is_elapsed(&self) -> bool { 260 match self.0 { 261 Kind::Elapsed => true, 262 _ => false, 263 } 264 } 265 266 /// Creates a new `Error` representing an error encountered by the timer 267 /// implementation timer(err: ::Error) -> Error<T>268 pub fn timer(err: ::Error) -> Error<T> { 269 Error(Kind::Timer(err)) 270 } 271 272 /// Returns `true` if the error was caused by the timer. is_timer(&self) -> bool273 pub fn is_timer(&self) -> bool { 274 match self.0 { 275 Kind::Timer(_) => true, 276 _ => false, 277 } 278 } 279 280 /// Consumes `self`, returning the error raised by the timer implementation. into_timer(self) -> Option<::Error>281 pub fn into_timer(self) -> Option<::Error> { 282 match self.0 { 283 Kind::Timer(err) => Some(err), 284 _ => None, 285 } 286 } 287 } 288 289 impl<T: error::Error> error::Error for Error<T> { description(&self) -> &str290 fn description(&self) -> &str { 291 use self::Kind::*; 292 293 match self.0 { 294 Inner(ref e) => e.description(), 295 Elapsed => "deadline has elapsed", 296 Timer(ref e) => e.description(), 297 } 298 } 299 } 300 301 impl<T: fmt::Display> fmt::Display for Error<T> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result302 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 303 use self::Kind::*; 304 305 match self.0 { 306 Inner(ref e) => e.fmt(fmt), 307 Elapsed => "deadline has elapsed".fmt(fmt), 308 Timer(ref e) => e.fmt(fmt), 309 } 310 } 311 } 312