1 use crate::time::driver::{TimerHandle, TimerShared};
2 use crate::time::error::InsertError;
3 
4 mod level;
5 pub(crate) use self::level::Expiration;
6 use self::level::Level;
7 
8 use std::ptr::NonNull;
9 
10 use super::EntryList;
11 
12 /// Timing wheel implementation.
13 ///
14 /// This type provides the hashed timing wheel implementation that backs `Timer`
15 /// and `DelayQueue`.
16 ///
17 /// The structure is generic over `T: Stack`. This allows handling timeout data
18 /// being stored on the heap or in a slab. In order to support the latter case,
19 /// the slab must be passed into each function allowing the implementation to
20 /// lookup timer entries.
21 ///
22 /// See `Timer` documentation for some implementation notes.
23 #[derive(Debug)]
24 pub(crate) struct Wheel {
25     /// The number of milliseconds elapsed since the wheel started.
26     elapsed: u64,
27 
28     /// Timer wheel.
29     ///
30     /// Levels:
31     ///
32     /// * 1 ms slots / 64 ms range
33     /// * 64 ms slots / ~ 4 sec range
34     /// * ~ 4 sec slots / ~ 4 min range
35     /// * ~ 4 min slots / ~ 4 hr range
36     /// * ~ 4 hr slots / ~ 12 day range
37     /// * ~ 12 day slots / ~ 2 yr range
38     levels: Vec<Level>,
39 
40     /// Entries queued for firing
41     pending: EntryList,
42 }
43 
44 /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
45 /// each, the timer is able to track time up to 2 years into the future with a
46 /// precision of 1 millisecond.
47 const NUM_LEVELS: usize = 6;
48 
49 /// The maximum duration of a `Sleep`
50 pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
51 
52 impl Wheel {
53     /// Create a new timing wheel
new() -> Wheel54     pub(crate) fn new() -> Wheel {
55         let levels = (0..NUM_LEVELS).map(Level::new).collect();
56 
57         Wheel {
58             elapsed: 0,
59             levels,
60             pending: EntryList::new(),
61         }
62     }
63 
64     /// Return the number of milliseconds that have elapsed since the timing
65     /// wheel's creation.
elapsed(&self) -> u6466     pub(crate) fn elapsed(&self) -> u64 {
67         self.elapsed
68     }
69 
70     /// Insert an entry into the timing wheel.
71     ///
72     /// # Arguments
73     ///
74     /// * `item`: The item to insert into the wheel.
75     ///
76     /// # Return
77     ///
78     /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
79     ///
80     /// `Err(Elapsed)` indicates that `when` represents an instant that has
81     /// already passed. In this case, the caller should fire the timeout
82     /// immediately.
83     ///
84     /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
85     ///
86     /// # Safety
87     ///
88     /// This function registers item into an intrusive linked list. The caller
89     /// must ensure that `item` is pinned and will not be dropped without first
90     /// being deregistered.
insert( &mut self, item: TimerHandle, ) -> Result<u64, (TimerHandle, InsertError)>91     pub(crate) unsafe fn insert(
92         &mut self,
93         item: TimerHandle,
94     ) -> Result<u64, (TimerHandle, InsertError)> {
95         let when = item.sync_when();
96 
97         if when <= self.elapsed {
98             return Err((item, InsertError::Elapsed));
99         }
100 
101         // Get the level at which the entry should be stored
102         let level = self.level_for(when);
103 
104         unsafe {
105             self.levels[level].add_entry(item);
106         }
107 
108         debug_assert!({
109             self.levels[level]
110                 .next_expiration(self.elapsed)
111                 .map(|e| e.deadline >= self.elapsed)
112                 .unwrap_or(true)
113         });
114 
115         Ok(when)
116     }
117 
118     /// Remove `item` from the timing wheel.
remove(&mut self, item: NonNull<TimerShared>)119     pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
120         unsafe {
121             let when = item.as_ref().cached_when();
122             if when == u64::MAX {
123                 self.pending.remove(item);
124             } else {
125                 debug_assert!(
126                     self.elapsed <= when,
127                     "elapsed={}; when={}",
128                     self.elapsed,
129                     when
130                 );
131 
132                 let level = self.level_for(when);
133 
134                 self.levels[level].remove_entry(item);
135             }
136         }
137     }
138 
139     /// Instant at which to poll
poll_at(&self) -> Option<u64>140     pub(crate) fn poll_at(&self) -> Option<u64> {
141         self.next_expiration().map(|expiration| expiration.deadline)
142     }
143 
144     /// Advances the timer up to the instant represented by `now`.
poll(&mut self, now: u64) -> Option<TimerHandle>145     pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
146         loop {
147             if let Some(handle) = self.pending.pop_back() {
148                 return Some(handle);
149             }
150 
151             // under what circumstances is poll.expiration Some vs. None?
152             let expiration = self.next_expiration().and_then(|expiration| {
153                 if expiration.deadline > now {
154                     None
155                 } else {
156                     Some(expiration)
157                 }
158             });
159 
160             match expiration {
161                 Some(ref expiration) if expiration.deadline > now => return None,
162                 Some(ref expiration) => {
163                     self.process_expiration(expiration);
164 
165                     self.set_elapsed(expiration.deadline);
166                 }
167                 None => {
168                     // in this case the poll did not indicate an expiration
169                     // _and_ we were not able to find a next expiration in
170                     // the current list of timers.  advance to the poll's
171                     // current time and do nothing else.
172                     self.set_elapsed(now);
173                     break;
174                 }
175             }
176         }
177 
178         self.pending.pop_back()
179     }
180 
181     /// Returns the instant at which the next timeout expires.
next_expiration(&self) -> Option<Expiration>182     fn next_expiration(&self) -> Option<Expiration> {
183         if !self.pending.is_empty() {
184             // Expire immediately as we have things pending firing
185             return Some(Expiration {
186                 level: 0,
187                 slot: 0,
188                 deadline: self.elapsed,
189             });
190         }
191 
192         // Check all levels
193         for level in 0..NUM_LEVELS {
194             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
195                 // There cannot be any expirations at a higher level that happen
196                 // before this one.
197                 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
198 
199                 return Some(expiration);
200             }
201         }
202 
203         None
204     }
205 
206     /// Returns the tick at which this timer wheel next needs to perform some
207     /// processing, or None if there are no timers registered.
next_expiration_time(&self) -> Option<u64>208     pub(super) fn next_expiration_time(&self) -> Option<u64> {
209         self.next_expiration().map(|ex| ex.deadline)
210     }
211 
212     /// Used for debug assertions
no_expirations_before(&self, start_level: usize, before: u64) -> bool213     fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
214         let mut res = true;
215 
216         for l2 in start_level..NUM_LEVELS {
217             if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
218                 if e2.deadline < before {
219                     res = false;
220                 }
221             }
222         }
223 
224         res
225     }
226 
227     /// iteratively find entries that are between the wheel's current
228     /// time and the expiration time.  for each in that population either
229     /// queue it for notification (in the case of the last level) or tier
230     /// it down to the next level (in all other cases).
process_expiration(&mut self, expiration: &Expiration)231     pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
232         // Note that we need to take _all_ of the entries off the list before
233         // processing any of them. This is important because it's possible that
234         // those entries might need to be reinserted into the same slot.
235         //
236         // This happens only on the highest level, when an entry is inserted
237         // more than MAX_DURATION into the future. When this happens, we wrap
238         // around, and process some entries a multiple of MAX_DURATION before
239         // they actually need to be dropped down a level. We then reinsert them
240         // back into the same position; we must make sure we don't then process
241         // those entries again or we'll end up in an infinite loop.
242         let mut entries = self.take_entries(expiration);
243 
244         while let Some(item) = entries.pop_back() {
245             if expiration.level == 0 {
246                 debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline);
247             }
248 
249             // Try to expire the entry; this is cheap (doesn't synchronize) if
250             // the timer is not expired, and updates cached_when.
251             match unsafe { item.mark_pending(expiration.deadline) } {
252                 Ok(()) => {
253                     // Item was expired
254                     self.pending.push_front(item);
255                 }
256                 Err(expiration_tick) => {
257                     let level = level_for(expiration.deadline, expiration_tick);
258                     unsafe {
259                         self.levels[level].add_entry(item);
260                     }
261                 }
262             }
263         }
264     }
265 
set_elapsed(&mut self, when: u64)266     fn set_elapsed(&mut self, when: u64) {
267         assert!(
268             self.elapsed <= when,
269             "elapsed={:?}; when={:?}",
270             self.elapsed,
271             when
272         );
273 
274         if when > self.elapsed {
275             self.elapsed = when;
276         }
277     }
278 
279     /// Obtains the list of entries that need processing for the given expiration.
280     ///
take_entries(&mut self, expiration: &Expiration) -> EntryList281     fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
282         self.levels[expiration.level].take_slot(expiration.slot)
283     }
284 
level_for(&self, when: u64) -> usize285     fn level_for(&self, when: u64) -> usize {
286         level_for(self.elapsed, when)
287     }
288 }
289 
level_for(elapsed: u64, when: u64) -> usize290 fn level_for(elapsed: u64, when: u64) -> usize {
291     const SLOT_MASK: u64 = (1 << 6) - 1;
292 
293     // Mask in the trailing bits ignored by the level calculation in order to cap
294     // the possible leading zeros
295     let mut masked = elapsed ^ when | SLOT_MASK;
296 
297     if masked >= MAX_DURATION {
298         // Fudge the timer into the top level
299         masked = MAX_DURATION - 1;
300     }
301 
302     let leading_zeros = masked.leading_zeros() as usize;
303     let significant = 63 - leading_zeros;
304 
305     significant / 6
306 }
307 
308 #[cfg(all(test, not(loom)))]
309 mod test {
310     use super::*;
311 
312     #[test]
test_level_for()313     fn test_level_for() {
314         for pos in 0..64 {
315             assert_eq!(
316                 0,
317                 level_for(0, pos),
318                 "level_for({}) -- binary = {:b}",
319                 pos,
320                 pos
321             );
322         }
323 
324         for level in 1..5 {
325             for pos in level..64 {
326                 let a = pos * 64_usize.pow(level as u32);
327                 assert_eq!(
328                     level,
329                     level_for(0, a as u64),
330                     "level_for({}) -- binary = {:b}",
331                     a,
332                     a
333                 );
334 
335                 if pos > level {
336                     let a = a - 1;
337                     assert_eq!(
338                         level,
339                         level_for(0, a as u64),
340                         "level_for({}) -- binary = {:b}",
341                         a,
342                         a
343                     );
344                 }
345 
346                 if pos < 64 {
347                     let a = a + 1;
348                     assert_eq!(
349                         level,
350                         level_for(0, a as u64),
351                         "level_for({}) -- binary = {:b}",
352                         a,
353                         a
354                     );
355                 }
356             }
357         }
358     }
359 }
360