1 mod level;
2 pub(crate) use self::level::Expiration;
3 use self::level::Level;
4 
5 mod stack;
6 pub(crate) use self::stack::Stack;
7 
8 use std::borrow::Borrow;
9 use std::usize;
10 
11 /// Timing wheel implementation.
12 ///
13 /// This type provides the hashed timing wheel implementation that backs `Timer`
14 /// and `DelayQueue`.
15 ///
16 /// The structure is generic over `T: Stack`. This allows handling timeout data
17 /// being stored on the heap or in a slab. In order to support the latter case,
18 /// the slab must be passed into each function allowing the implementation to
19 /// lookup timer entries.
20 ///
21 /// See `Timer` documentation for some implementation notes.
22 #[derive(Debug)]
23 pub(crate) struct Wheel<T> {
24     /// The number of milliseconds elapsed since the wheel started.
25     elapsed: u64,
26 
27     /// Timer wheel.
28     ///
29     /// Levels:
30     ///
31     /// * 1 ms slots / 64 ms range
32     /// * 64 ms slots / ~ 4 sec range
33     /// * ~ 4 sec slots / ~ 4 min range
34     /// * ~ 4 min slots / ~ 4 hr range
35     /// * ~ 4 hr slots / ~ 12 day range
36     /// * ~ 12 day slots / ~ 2 yr range
37     levels: Vec<Level<T>>,
38 }
39 
40 /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
41 /// each, the timer is able to track time up to 2 years into the future with a
42 /// precision of 1 millisecond.
43 const NUM_LEVELS: usize = 6;
44 
45 /// The maximum duration of a delay
46 const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
47 
48 #[derive(Debug)]
49 pub(crate) enum InsertError {
50     Elapsed,
51     Invalid,
52 }
53 
54 impl<T> Wheel<T>
55 where
56     T: Stack,
57 {
58     /// Create a new timing wheel
new() -> Wheel<T>59     pub(crate) fn new() -> Wheel<T> {
60         let levels = (0..NUM_LEVELS).map(Level::new).collect();
61 
62         Wheel { elapsed: 0, levels }
63     }
64 
65     /// Return the number of milliseconds that have elapsed since the timing
66     /// wheel's creation.
elapsed(&self) -> u6467     pub(crate) fn elapsed(&self) -> u64 {
68         self.elapsed
69     }
70 
71     /// Insert an entry into the timing wheel.
72     ///
73     /// # Arguments
74     ///
75     /// * `when`: is the instant at which the entry should be fired. It is
76     ///           represented as the number of milliseconds since the creation
77     ///           of the timing wheel.
78     ///
79     /// * `item`: The item to insert into the wheel.
80     ///
81     /// * `store`: The slab or `()` when using heap storage.
82     ///
83     /// # Return
84     ///
85     /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
86     ///
87     /// `Err(Elapsed)` indicates that `when` represents an instant that has
88     /// already passed. In this case, the caller should fire the timeout
89     /// immediately.
90     ///
91     /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
insert( &mut self, when: u64, item: T::Owned, store: &mut T::Store, ) -> Result<(), (T::Owned, InsertError)>92     pub(crate) fn insert(
93         &mut self,
94         when: u64,
95         item: T::Owned,
96         store: &mut T::Store,
97     ) -> Result<(), (T::Owned, InsertError)> {
98         if when <= self.elapsed {
99             return Err((item, InsertError::Elapsed));
100         } else if when - self.elapsed > MAX_DURATION {
101             return Err((item, InsertError::Invalid));
102         }
103 
104         // Get the level at which the entry should be stored
105         let level = self.level_for(when);
106 
107         self.levels[level].add_entry(when, item, store);
108 
109         debug_assert!({
110             self.levels[level]
111                 .next_expiration(self.elapsed)
112                 .map(|e| e.deadline >= self.elapsed)
113                 .unwrap_or(true)
114         });
115 
116         Ok(())
117     }
118 
119     /// Remove `item` from the timing wheel.
remove(&mut self, item: &T::Borrowed, store: &mut T::Store)120     pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
121         let when = T::when(item, store);
122 
123         assert!(
124             self.elapsed <= when,
125             "elapsed={}; when={}",
126             self.elapsed,
127             when
128         );
129 
130         let level = self.level_for(when);
131 
132         self.levels[level].remove_entry(when, item, store);
133     }
134 
135     /// Instant at which to poll
poll_at(&self) -> Option<u64>136     pub(crate) fn poll_at(&self) -> Option<u64> {
137         self.next_expiration().map(|expiration| expiration.deadline)
138     }
139 
140     /// Advances the timer up to the instant represented by `now`.
poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned>141     pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
142         loop {
143             let expiration = self.next_expiration().and_then(|expiration| {
144                 if expiration.deadline > now {
145                     None
146                 } else {
147                     Some(expiration)
148                 }
149             });
150 
151             match expiration {
152                 Some(ref expiration) => {
153                     if let Some(item) = self.poll_expiration(expiration, store) {
154                         return Some(item);
155                     }
156 
157                     self.set_elapsed(expiration.deadline);
158                 }
159                 None => {
160                     // in this case the poll did not indicate an expiration
161                     // _and_ we were not able to find a next expiration in
162                     // the current list of timers.  advance to the poll's
163                     // current time and do nothing else.
164                     self.set_elapsed(now);
165                     return None;
166                 }
167             }
168         }
169     }
170 
171     /// Returns the instant at which the next timeout expires.
next_expiration(&self) -> Option<Expiration>172     fn next_expiration(&self) -> Option<Expiration> {
173         // Check all levels
174         for level in 0..NUM_LEVELS {
175             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
176                 // There cannot be any expirations at a higher level that happen
177                 // before this one.
178                 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
179 
180                 return Some(expiration);
181             }
182         }
183 
184         None
185     }
186 
187     /// Used for debug assertions
no_expirations_before(&self, start_level: usize, before: u64) -> bool188     fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
189         let mut res = true;
190 
191         for l2 in start_level..NUM_LEVELS {
192             if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
193                 if e2.deadline < before {
194                     res = false;
195                 }
196             }
197         }
198 
199         res
200     }
201 
202     /// iteratively find entries that are between the wheel's current
203     /// time and the expiration time.  for each in that population either
204     /// return it for notification (in the case of the last level) or tier
205     /// it down to the next level (in all other cases).
poll_expiration( &mut self, expiration: &Expiration, store: &mut T::Store, ) -> Option<T::Owned>206     pub(crate) fn poll_expiration(
207         &mut self,
208         expiration: &Expiration,
209         store: &mut T::Store,
210     ) -> Option<T::Owned> {
211         while let Some(item) = self.pop_entry(expiration, store) {
212             if expiration.level == 0 {
213                 debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
214 
215                 return Some(item);
216             } else {
217                 let when = T::when(item.borrow(), store);
218 
219                 let next_level = expiration.level - 1;
220 
221                 self.levels[next_level].add_entry(when, item, store);
222             }
223         }
224 
225         None
226     }
227 
set_elapsed(&mut self, when: u64)228     fn set_elapsed(&mut self, when: u64) {
229         assert!(
230             self.elapsed <= when,
231             "elapsed={:?}; when={:?}",
232             self.elapsed,
233             when
234         );
235 
236         if when > self.elapsed {
237             self.elapsed = when;
238         }
239     }
240 
pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned>241     fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
242         self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
243     }
244 
level_for(&self, when: u64) -> usize245     fn level_for(&self, when: u64) -> usize {
246         level_for(self.elapsed, when)
247     }
248 }
249 
level_for(elapsed: u64, when: u64) -> usize250 fn level_for(elapsed: u64, when: u64) -> usize {
251     const SLOT_MASK: u64 = (1 << 6) - 1;
252 
253     // Mask in the trailing bits ignored by the level calculation in order to cap
254     // the possible leading zeros
255     let masked = elapsed ^ when | SLOT_MASK;
256 
257     let leading_zeros = masked.leading_zeros() as usize;
258     let significant = 63 - leading_zeros;
259     significant / 6
260 }
261 
262 #[cfg(all(test, not(loom)))]
263 mod test {
264     use super::*;
265 
266     #[test]
test_level_for()267     fn test_level_for() {
268         for pos in 0..64 {
269             assert_eq!(
270                 0,
271                 level_for(0, pos),
272                 "level_for({}) -- binary = {:b}",
273                 pos,
274                 pos
275             );
276         }
277 
278         for level in 1..5 {
279             for pos in level..64 {
280                 let a = pos * 64_usize.pow(level as u32);
281                 assert_eq!(
282                     level,
283                     level_for(0, a as u64),
284                     "level_for({}) -- binary = {:b}",
285                     a,
286                     a
287                 );
288 
289                 if pos > level {
290                     let a = a - 1;
291                     assert_eq!(
292                         level,
293                         level_for(0, a as u64),
294                         "level_for({}) -- binary = {:b}",
295                         a,
296                         a
297                     );
298                 }
299 
300                 if pos < 64 {
301                     let a = a + 1;
302                     assert_eq!(
303                         level,
304                         level_for(0, a as u64),
305                         "level_for({}) -- binary = {:b}",
306                         a,
307                         a
308                     );
309                 }
310             }
311         }
312     }
313 }
314