1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use std::convert::TryFrom;
8 use std::mem;
9 use std::time::{Duration, Instant};
10 
11 /// Internal structure for a timer item.
12 struct TimerItem<T> {
13     time: Instant,
14     item: T,
15 }
16 
17 impl<T> TimerItem<T> {
time(ti: &Self) -> Instant18     fn time(ti: &Self) -> Instant {
19         ti.time
20     }
21 }
22 
23 /// A timer queue.
24 /// This uses a classic timer wheel arrangement, with some characteristics that might be considered peculiar.
25 /// Each slot in the wheel is sorted (complexity O(N) insertions, but O(logN) to find cut points).
26 /// Time is relative, the wheel has an origin time and it is unable to represent times that are more than
27 /// `granularity * capacity` past that time.
28 pub struct Timer<T> {
29     items: Vec<Vec<TimerItem<T>>>,
30     now: Instant,
31     granularity: Duration,
32     cursor: usize,
33 }
34 
35 impl<T> Timer<T> {
36     /// Construct a new wheel at the given granularity, starting at the given time.
37     /// # Panics
38     /// When `capacity` is too large to fit in `u32` or `granularity` is zero.
new(now: Instant, granularity: Duration, capacity: usize) -> Self39     pub fn new(now: Instant, granularity: Duration, capacity: usize) -> Self {
40         assert!(u32::try_from(capacity).is_ok());
41         assert!(granularity.as_nanos() > 0);
42         let mut items = Vec::with_capacity(capacity);
43         items.resize_with(capacity, Default::default);
44         Self {
45             items,
46             now,
47             granularity,
48             cursor: 0,
49         }
50     }
51 
52     /// Return a reference to the time of the next entry.
53     #[must_use]
next_time(&self) -> Option<Instant>54     pub fn next_time(&self) -> Option<Instant> {
55         for i in 0..self.items.len() {
56             let idx = self.bucket(i);
57             if let Some(t) = self.items[idx].first() {
58                 return Some(t.time);
59             }
60         }
61         None
62     }
63 
64     /// Get the full span of time that this can cover.
65     /// Two timers cannot be more than this far apart.
66     /// In practice, this value is less by one amount of the timer granularity.
67     #[inline]
68     #[allow(clippy::cast_possible_truncation)] // guarded by assertion
69     #[must_use]
span(&self) -> Duration70     pub fn span(&self) -> Duration {
71         self.granularity * (self.items.len() as u32)
72     }
73 
74     /// For the given `time`, get the number of whole buckets in the future that is.
75     #[inline]
76     #[allow(clippy::cast_possible_truncation)] // guarded by assertion
delta(&self, time: Instant) -> usize77     fn delta(&self, time: Instant) -> usize {
78         // This really should use Instant::div_duration(), but it can't yet.
79         ((time - self.now).as_nanos() / self.granularity.as_nanos()) as usize
80     }
81 
82     #[inline]
time_bucket(&self, time: Instant) -> usize83     fn time_bucket(&self, time: Instant) -> usize {
84         self.bucket(self.delta(time))
85     }
86 
87     #[inline]
bucket(&self, delta: usize) -> usize88     fn bucket(&self, delta: usize) -> usize {
89         debug_assert!(delta < self.items.len());
90         (self.cursor + delta) % self.items.len()
91     }
92 
93     /// Slide forward in time by `n * self.granularity`.
94     #[allow(unknown_lints, renamed_and_removed_lints, clippy::unknown_clippy_lints)] // Until we require rust 1.45.
95     #[allow(clippy::cast_possible_truncation, clippy::reversed_empty_ranges)]
96     // cast_possible_truncation is ok because we have an assertion guard.
97     // reversed_empty_ranges is to avoid different types on the if/else.
tick(&mut self, n: usize)98     fn tick(&mut self, n: usize) {
99         let new = self.bucket(n);
100         let iter = if new < self.cursor {
101             (self.cursor..self.items.len()).chain(0..new)
102         } else {
103             (self.cursor..new).chain(0..0)
104         };
105         for i in iter {
106             assert!(self.items[i].is_empty());
107         }
108         self.now += self.granularity * (n as u32);
109         self.cursor = new;
110     }
111 
112     /// Asserts if the time given is in the past or too far in the future.
113     /// # Panics
114     /// When `time` is in the past relative to previous calls.
add(&mut self, time: Instant, item: T)115     pub fn add(&mut self, time: Instant, item: T) {
116         assert!(time >= self.now);
117         // Skip forward quickly if there is too large a gap.
118         let short_span = self.span() - self.granularity;
119         if time >= (self.now + self.span() + short_span) {
120             // Assert that there aren't any items.
121             for i in &self.items {
122                 debug_assert!(i.is_empty());
123             }
124             self.now = time - short_span;
125             self.cursor = 0;
126         }
127 
128         // Adjust time forward the minimum amount necessary.
129         let mut d = self.delta(time);
130         if d >= self.items.len() {
131             self.tick(1 + d - self.items.len());
132             d = self.items.len() - 1;
133         }
134 
135         let bucket = self.bucket(d);
136         let ins = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) {
137             Ok(j) | Err(j) => j,
138         };
139         self.items[bucket].insert(ins, TimerItem { time, item });
140     }
141 
142     /// Given knowledge of the time an item was added, remove it.
143     /// This requires use of a predicate that identifies matching items.
remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T> where F: FnMut(&T) -> bool,144     pub fn remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T>
145     where
146         F: FnMut(&T) -> bool,
147     {
148         if time < self.now {
149             return None;
150         }
151         if time > self.now + self.span() {
152             return None;
153         }
154         let bucket = self.time_bucket(time);
155         let start_index = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) {
156             Ok(idx) => idx,
157             Err(_) => return None,
158         };
159         // start_index is just one of potentially many items with the same time.
160         // Search backwards for a match, ...
161         for i in (0..=start_index).rev() {
162             if self.items[bucket][i].time != time {
163                 break;
164             }
165             if selector(&self.items[bucket][i].item) {
166                 return Some(self.items[bucket].remove(i).item);
167             }
168         }
169         // ... then forwards.
170         for i in (start_index + 1)..self.items[bucket].len() {
171             if self.items[bucket][i].time != time {
172                 break;
173             }
174             if selector(&self.items[bucket][i].item) {
175                 return Some(self.items[bucket].remove(i).item);
176             }
177         }
178         None
179     }
180 
181     /// Take the next item, unless there are no items with
182     /// a timeout in the past relative to `until`.
take_next(&mut self, until: Instant) -> Option<T>183     pub fn take_next(&mut self, until: Instant) -> Option<T> {
184         for i in 0..self.items.len() {
185             let idx = self.bucket(i);
186             if !self.items[idx].is_empty() && self.items[idx][0].time <= until {
187                 return Some(self.items[idx].remove(0).item);
188             }
189         }
190         None
191     }
192 
193     /// Create an iterator that takes all items until the given time.
194     /// Note: Items might be removed even if the iterator is not fully exhausted.
take_until(&mut self, until: Instant) -> impl Iterator<Item = T>195     pub fn take_until(&mut self, until: Instant) -> impl Iterator<Item = T> {
196         let get_item = move |x: TimerItem<T>| x.item;
197         if until >= self.now + self.span() {
198             // Drain everything, so a clean sweep.
199             let mut empty_items = Vec::with_capacity(self.items.len());
200             empty_items.resize_with(self.items.len(), Vec::default);
201             let mut items = mem::replace(&mut self.items, empty_items);
202             self.now = until;
203             self.cursor = 0;
204 
205             let tail = items.split_off(self.cursor);
206             return tail.into_iter().chain(items).flatten().map(get_item);
207         }
208 
209         // Only returning a partial span, so do it bucket at a time.
210         let delta = self.delta(until);
211         let mut buckets = Vec::with_capacity(delta + 1);
212 
213         // First, the whole buckets.
214         for i in 0..delta {
215             let idx = self.bucket(i);
216             buckets.push(mem::take(&mut self.items[idx]));
217         }
218         self.tick(delta);
219 
220         // Now we need to split the last bucket, because there might be
221         // some items with `item.time > until`.
222         let bucket = &mut self.items[self.cursor];
223         let last_idx = match bucket.binary_search_by_key(&until, TimerItem::time) {
224             Ok(mut m) => {
225                 // If there are multiple values, the search will hit any of them.
226                 // Make sure to get them all.
227                 while m < bucket.len() && bucket[m].time == until {
228                     m += 1;
229                 }
230                 m
231             }
232             Err(ins) => ins,
233         };
234         let tail = bucket.split_off(last_idx);
235         buckets.push(mem::replace(bucket, tail));
236         // This tomfoolery with the empty vector ensures that
237         // the returned type here matches the one above precisely
238         // without having to invoke the `either` crate.
239         buckets.into_iter().chain(vec![]).flatten().map(get_item)
240     }
241 }
242 
243 #[cfg(test)]
244 mod test {
245     use super::{Duration, Instant, Timer};
246     use lazy_static::lazy_static;
247 
248     lazy_static! {
249         static ref NOW: Instant = Instant::now();
250     }
251 
252     const GRANULARITY: Duration = Duration::from_millis(10);
253     const CAPACITY: usize = 10;
254     #[test]
create()255     fn create() {
256         let t: Timer<()> = Timer::new(*NOW, GRANULARITY, CAPACITY);
257         assert_eq!(t.span(), Duration::from_millis(100));
258         assert_eq!(None, t.next_time());
259     }
260 
261     #[test]
immediate_entry()262     fn immediate_entry() {
263         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
264         t.add(*NOW, 12);
265         assert_eq!(*NOW, t.next_time().expect("should have an entry"));
266         let values: Vec<_> = t.take_until(*NOW).collect();
267         assert_eq!(vec![12], values);
268     }
269 
270     #[test]
same_time()271     fn same_time() {
272         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
273         let v1 = 12;
274         let v2 = 13;
275         t.add(*NOW, v1);
276         t.add(*NOW, v2);
277         assert_eq!(*NOW, t.next_time().expect("should have an entry"));
278         let values: Vec<_> = t.take_until(*NOW).collect();
279         assert!(values.contains(&v1));
280         assert!(values.contains(&v2));
281     }
282 
283     #[test]
add()284     fn add() {
285         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
286         let near_future = *NOW + Duration::from_millis(17);
287         let v = 9;
288         t.add(near_future, v);
289         assert_eq!(near_future, t.next_time().expect("should return a value"));
290         let values: Vec<_> = t
291             .take_until(near_future - Duration::from_millis(1))
292             .collect();
293         assert!(values.is_empty());
294         let values: Vec<_> = t
295             .take_until(near_future + Duration::from_millis(1))
296             .collect();
297         assert!(values.contains(&v));
298     }
299 
300     #[test]
add_future()301     fn add_future() {
302         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
303         let future = *NOW + Duration::from_millis(117);
304         let v = 9;
305         t.add(future, v);
306         assert_eq!(future, t.next_time().expect("should return a value"));
307         let values: Vec<_> = t.take_until(future).collect();
308         assert!(values.contains(&v));
309     }
310 
311     #[test]
add_far_future()312     fn add_far_future() {
313         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
314         let far_future = *NOW + Duration::from_millis(892);
315         let v = 9;
316         t.add(far_future, v);
317         assert_eq!(far_future, t.next_time().expect("should return a value"));
318         let values: Vec<_> = t.take_until(far_future).collect();
319         assert!(values.contains(&v));
320     }
321 
322     const TIMES: &[Duration] = &[
323         Duration::from_millis(40),
324         Duration::from_millis(91),
325         Duration::from_millis(6),
326         Duration::from_millis(3),
327         Duration::from_millis(22),
328         Duration::from_millis(40),
329     ];
330 
with_times() -> Timer<usize>331     fn with_times() -> Timer<usize> {
332         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
333         for (i, time) in TIMES.iter().enumerate() {
334             t.add(*NOW + *time, i);
335         }
336         assert_eq!(
337             *NOW + *TIMES.iter().min().unwrap(),
338             t.next_time().expect("should have a time")
339         );
340         t
341     }
342 
343     #[test]
multiple_values()344     fn multiple_values() {
345         let mut t = with_times();
346         let values: Vec<_> = t.take_until(*NOW + *TIMES.iter().max().unwrap()).collect();
347         for i in 0..TIMES.len() {
348             assert!(values.contains(&i));
349         }
350     }
351 
352     #[test]
take_far_future()353     fn take_far_future() {
354         let mut t = with_times();
355         let values: Vec<_> = t.take_until(*NOW + Duration::from_secs(100)).collect();
356         for i in 0..TIMES.len() {
357             assert!(values.contains(&i));
358         }
359     }
360 
361     #[test]
remove_each()362     fn remove_each() {
363         let mut t = with_times();
364         for (i, time) in TIMES.iter().enumerate() {
365             assert_eq!(Some(i), t.remove(*NOW + *time, |&x| x == i));
366         }
367         assert_eq!(None, t.next_time());
368     }
369 
370     #[test]
remove_future()371     fn remove_future() {
372         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
373         let future = *NOW + Duration::from_millis(117);
374         let v = 9;
375         t.add(future, v);
376 
377         assert_eq!(Some(v), t.remove(future, |candidate| *candidate == v));
378     }
379 
380     #[test]
remove_too_far_future()381     fn remove_too_far_future() {
382         let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY);
383         let future = *NOW + Duration::from_millis(117);
384         let too_far_future = *NOW + t.span() + Duration::from_millis(117);
385         let v = 9;
386         t.add(future, v);
387 
388         assert_eq!(None, t.remove(too_far_future, |candidate| *candidate == v));
389     }
390 }
391