1 //! Slow down a stream by enforcing a delay between items.
2 
3 use {clock, Delay, Error};
4 
5 use futures::future::Either;
6 use futures::{Async, Future, Poll, Stream};
7 
8 use std::{
9     error::Error as StdError,
10     fmt::{Display, Formatter, Result as FmtResult},
11     time::Duration,
12 };
13 
14 /// Slow down a stream by enforcing a delay between items.
15 #[derive(Debug)]
16 #[must_use = "streams do nothing unless polled"]
17 pub struct Throttle<T> {
18     delay: Option<Delay>,
19     duration: Duration,
20     stream: T,
21 }
22 
23 /// Either the error of the underlying stream, or an error within
24 /// tokio's timing machinery.
25 #[derive(Debug)]
26 pub struct ThrottleError<T>(Either<T, Error>);
27 
28 impl<T> Throttle<T> {
29     /// Slow down a stream by enforcing a delay between items.
new(stream: T, duration: Duration) -> Self30     pub fn new(stream: T, duration: Duration) -> Self {
31         Self {
32             delay: None,
33             duration: duration,
34             stream: stream,
35         }
36     }
37 
38     /// Acquires a reference to the underlying stream that this combinator is
39     /// pulling from.
get_ref(&self) -> &T40     pub fn get_ref(&self) -> &T {
41         &self.stream
42     }
43 
44     /// Acquires a mutable reference to the underlying stream that this combinator
45     /// is pulling from.
46     ///
47     /// Note that care must be taken to avoid tampering with the state of the stream
48     /// which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut T49     pub fn get_mut(&mut self) -> &mut T {
50         &mut self.stream
51     }
52 
53     /// Consumes this combinator, returning the underlying stream.
54     ///
55     /// Note that this may discard intermediate state of this combinator, so care
56     /// should be taken to avoid losing resources when this is called.
into_inner(self) -> T57     pub fn into_inner(self) -> T {
58         self.stream
59     }
60 }
61 
62 impl<T: Stream> Stream for Throttle<T> {
63     type Item = T::Item;
64     type Error = ThrottleError<T::Error>;
65 
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>66     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
67         if let Some(ref mut delay) = self.delay {
68             try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) });
69         }
70 
71         self.delay = None;
72         let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) });
73 
74         if value.is_some() {
75             self.delay = Some(Delay::new(clock::now() + self.duration));
76         }
77 
78         Ok(Async::Ready(value))
79     }
80 }
81 
82 impl<T> ThrottleError<T> {
83     /// Creates a new `ThrottleError` from the given stream error.
from_stream_err(err: T) -> Self84     pub fn from_stream_err(err: T) -> Self {
85         ThrottleError(Either::A(err))
86     }
87 
88     /// Creates a new `ThrottleError` from the given tokio timer error.
from_timer_err(err: Error) -> Self89     pub fn from_timer_err(err: Error) -> Self {
90         ThrottleError(Either::B(err))
91     }
92 
93     /// Attempts to get the underlying stream error, if it is present.
get_stream_error(&self) -> Option<&T>94     pub fn get_stream_error(&self) -> Option<&T> {
95         match self.0 {
96             Either::A(ref x) => Some(x),
97             _ => None,
98         }
99     }
100 
101     /// Attempts to get the underlying timer error, if it is present.
get_timer_error(&self) -> Option<&Error>102     pub fn get_timer_error(&self) -> Option<&Error> {
103         match self.0 {
104             Either::B(ref x) => Some(x),
105             _ => None,
106         }
107     }
108 
109     /// Attempts to extract the underlying stream error, if it is present.
into_stream_error(self) -> Option<T>110     pub fn into_stream_error(self) -> Option<T> {
111         match self.0 {
112             Either::A(x) => Some(x),
113             _ => None,
114         }
115     }
116 
117     /// Attempts to extract the underlying timer error, if it is present.
into_timer_error(self) -> Option<Error>118     pub fn into_timer_error(self) -> Option<Error> {
119         match self.0 {
120             Either::B(x) => Some(x),
121             _ => None,
122         }
123     }
124 
125     /// Returns whether the throttle error has occured because of an error
126     /// in the underlying stream.
is_stream_error(&self) -> bool127     pub fn is_stream_error(&self) -> bool {
128         !self.is_timer_error()
129     }
130 
131     /// Returns whether the throttle error has occured because of an error
132     /// in tokio's timer system.
is_timer_error(&self) -> bool133     pub fn is_timer_error(&self) -> bool {
134         match self.0 {
135             Either::A(_) => false,
136             Either::B(_) => true,
137         }
138     }
139 }
140 
141 impl<T: StdError> Display for ThrottleError<T> {
fmt(&self, f: &mut Formatter) -> FmtResult142     fn fmt(&self, f: &mut Formatter) -> FmtResult {
143         match self.0 {
144             Either::A(ref err) => write!(f, "stream error: {}", err),
145             Either::B(ref err) => write!(f, "timer error: {}", err),
146         }
147     }
148 }
149 
150 impl<T: StdError + 'static> StdError for ThrottleError<T> {
description(&self) -> &str151     fn description(&self) -> &str {
152         match self.0 {
153             Either::A(_) => "stream error",
154             Either::B(_) => "timer error",
155         }
156     }
157 
158     // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30,
159     // replace this with Error::source.
160     #[allow(deprecated)]
cause(&self) -> Option<&dyn StdError>161     fn cause(&self) -> Option<&dyn StdError> {
162         match self.0 {
163             Either::A(ref err) => Some(err),
164             Either::B(ref err) => Some(err),
165         }
166     }
167 }
168