1 use Delay; 2 3 use clock; 4 5 use futures::{Future, Poll, Stream}; 6 7 use std::time::{Duration, Instant}; 8 9 /// A stream representing notifications at fixed interval 10 #[derive(Debug)] 11 pub struct Interval { 12 /// Future that completes the next time the `Interval` yields a value. 13 delay: Delay, 14 15 /// The duration between values yielded by `Interval`. 16 duration: Duration, 17 } 18 19 impl Interval { 20 /// Create a new `Interval` that starts at `at` and yields every `duration` 21 /// interval after that. 22 /// 23 /// Note that when it starts, it produces item too. 24 /// 25 /// The `duration` argument must be a non-zero duration. 26 /// 27 /// # Panics 28 /// 29 /// This function panics if `duration` is zero. new(at: Instant, duration: Duration) -> Interval30 pub fn new(at: Instant, duration: Duration) -> Interval { 31 assert!( 32 duration > Duration::new(0, 0), 33 "`duration` must be non-zero." 34 ); 35 36 Interval::new_with_delay(Delay::new(at), duration) 37 } 38 39 /// Creates new `Interval` that yields with interval of `duration`. 40 /// 41 /// The function is shortcut for `Interval::new(Instant::now() + duration, duration)`. 42 /// 43 /// The `duration` argument must be a non-zero duration. 44 /// 45 /// # Panics 46 /// 47 /// This function panics if `duration` is zero. new_interval(duration: Duration) -> Interval48 pub fn new_interval(duration: Duration) -> Interval { 49 Interval::new(clock::now() + duration, duration) 50 } 51 new_with_delay(delay: Delay, duration: Duration) -> Interval52 pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval { 53 Interval { delay, duration } 54 } 55 } 56 57 impl Stream for Interval { 58 type Item = Instant; 59 type Error = ::Error; 60 poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>61 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { 62 // Wait for the delay to be done 63 let _ = try_ready!(self.delay.poll()); 64 65 // Get the `now` by looking at the `delay` deadline 66 let now = self.delay.deadline(); 67 68 // The next interval value is `duration` after the one that just 69 // yielded. 70 self.delay.reset(now + self.duration); 71 72 // Return the current instant 73 Ok(Some(now).into()) 74 } 75 } 76