1 //! Slow down a stream by enforcing a delay between items. 2 3 use {clock, Delay, Error}; 4 5 use futures::future::Either; 6 use futures::{Async, Future, Poll, Stream}; 7 8 use std::{ 9 error::Error as StdError, 10 fmt::{Display, Formatter, Result as FmtResult}, 11 time::Duration, 12 }; 13 14 /// Slow down a stream by enforcing a delay between items. 15 #[derive(Debug)] 16 #[must_use = "streams do nothing unless polled"] 17 pub struct Throttle<T> { 18 delay: Option<Delay>, 19 duration: Duration, 20 stream: T, 21 } 22 23 /// Either the error of the underlying stream, or an error within 24 /// tokio's timing machinery. 25 #[derive(Debug)] 26 pub struct ThrottleError<T>(Either<T, Error>); 27 28 impl<T> Throttle<T> { 29 /// Slow down a stream by enforcing a delay between items. new(stream: T, duration: Duration) -> Self30 pub fn new(stream: T, duration: Duration) -> Self { 31 Self { 32 delay: None, 33 duration: duration, 34 stream: stream, 35 } 36 } 37 38 /// Acquires a reference to the underlying stream that this combinator is 39 /// pulling from. get_ref(&self) -> &T40 pub fn get_ref(&self) -> &T { 41 &self.stream 42 } 43 44 /// Acquires a mutable reference to the underlying stream that this combinator 45 /// is pulling from. 46 /// 47 /// Note that care must be taken to avoid tampering with the state of the stream 48 /// which may otherwise confuse this combinator. get_mut(&mut self) -> &mut T49 pub fn get_mut(&mut self) -> &mut T { 50 &mut self.stream 51 } 52 53 /// Consumes this combinator, returning the underlying stream. 54 /// 55 /// Note that this may discard intermediate state of this combinator, so care 56 /// should be taken to avoid losing resources when this is called. into_inner(self) -> T57 pub fn into_inner(self) -> T { 58 self.stream 59 } 60 } 61 62 impl<T: Stream> Stream for Throttle<T> { 63 type Item = T::Item; 64 type Error = ThrottleError<T::Error>; 65 poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>66 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { 67 if let Some(ref mut delay) = self.delay { 68 try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) }); 69 } 70 71 self.delay = None; 72 let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) }); 73 74 if value.is_some() { 75 self.delay = Some(Delay::new(clock::now() + self.duration)); 76 } 77 78 Ok(Async::Ready(value)) 79 } 80 } 81 82 impl<T> ThrottleError<T> { 83 /// Creates a new `ThrottleError` from the given stream error. from_stream_err(err: T) -> Self84 pub fn from_stream_err(err: T) -> Self { 85 ThrottleError(Either::A(err)) 86 } 87 88 /// Creates a new `ThrottleError` from the given tokio timer error. from_timer_err(err: Error) -> Self89 pub fn from_timer_err(err: Error) -> Self { 90 ThrottleError(Either::B(err)) 91 } 92 93 /// Attempts to get the underlying stream error, if it is present. get_stream_error(&self) -> Option<&T>94 pub fn get_stream_error(&self) -> Option<&T> { 95 match self.0 { 96 Either::A(ref x) => Some(x), 97 _ => None, 98 } 99 } 100 101 /// Attempts to get the underlying timer error, if it is present. get_timer_error(&self) -> Option<&Error>102 pub fn get_timer_error(&self) -> Option<&Error> { 103 match self.0 { 104 Either::B(ref x) => Some(x), 105 _ => None, 106 } 107 } 108 109 /// Attempts to extract the underlying stream error, if it is present. into_stream_error(self) -> Option<T>110 pub fn into_stream_error(self) -> Option<T> { 111 match self.0 { 112 Either::A(x) => Some(x), 113 _ => None, 114 } 115 } 116 117 /// Attempts to extract the underlying timer error, if it is present. into_timer_error(self) -> Option<Error>118 pub fn into_timer_error(self) -> Option<Error> { 119 match self.0 { 120 Either::B(x) => Some(x), 121 _ => None, 122 } 123 } 124 125 /// Returns whether the throttle error has occured because of an error 126 /// in the underlying stream. is_stream_error(&self) -> bool127 pub fn is_stream_error(&self) -> bool { 128 !self.is_timer_error() 129 } 130 131 /// Returns whether the throttle error has occured because of an error 132 /// in tokio's timer system. is_timer_error(&self) -> bool133 pub fn is_timer_error(&self) -> bool { 134 match self.0 { 135 Either::A(_) => false, 136 Either::B(_) => true, 137 } 138 } 139 } 140 141 impl<T: StdError> Display for ThrottleError<T> { fmt(&self, f: &mut Formatter) -> FmtResult142 fn fmt(&self, f: &mut Formatter) -> FmtResult { 143 match self.0 { 144 Either::A(ref err) => write!(f, "stream error: {}", err), 145 Either::B(ref err) => write!(f, "timer error: {}", err), 146 } 147 } 148 } 149 150 impl<T: StdError + 'static> StdError for ThrottleError<T> { description(&self) -> &str151 fn description(&self) -> &str { 152 match self.0 { 153 Either::A(_) => "stream error", 154 Either::B(_) => "timer error", 155 } 156 } 157 158 // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30, 159 // replace this with Error::source. 160 #[allow(deprecated)] cause(&self) -> Option<&dyn StdError>161 fn cause(&self) -> Option<&dyn StdError> { 162 match self.0 { 163 Either::A(ref err) => Some(err), 164 Either::B(ref err) => Some(err), 165 } 166 } 167 } 168