1 mod level;
2 pub(crate) use self::level::Expiration;
3 use self::level::Level;
4
5 mod stack;
6 pub(crate) use self::stack::Stack;
7
8 use std::borrow::Borrow;
9 use std::usize;
10
11 /// Timing wheel implementation.
12 ///
13 /// This type provides the hashed timing wheel implementation that backs `Timer`
14 /// and `DelayQueue`.
15 ///
16 /// The structure is generic over `T: Stack`. This allows handling timeout data
17 /// being stored on the heap or in a slab. In order to support the latter case,
18 /// the slab must be passed into each function allowing the implementation to
19 /// lookup timer entries.
20 ///
21 /// See `Timer` documentation for some implementation notes.
22 #[derive(Debug)]
23 pub(crate) struct Wheel<T> {
24 /// The number of milliseconds elapsed since the wheel started.
25 elapsed: u64,
26
27 /// Timer wheel.
28 ///
29 /// Levels:
30 ///
31 /// * 1 ms slots / 64 ms range
32 /// * 64 ms slots / ~ 4 sec range
33 /// * ~ 4 sec slots / ~ 4 min range
34 /// * ~ 4 min slots / ~ 4 hr range
35 /// * ~ 4 hr slots / ~ 12 day range
36 /// * ~ 12 day slots / ~ 2 yr range
37 levels: Vec<Level<T>>,
38 }
39
40 /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
41 /// each, the timer is able to track time up to 2 years into the future with a
42 /// precision of 1 millisecond.
43 const NUM_LEVELS: usize = 6;
44
45 /// The maximum duration of a delay
46 const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
47
48 #[derive(Debug)]
49 pub(crate) enum InsertError {
50 Elapsed,
51 Invalid,
52 }
53
54 impl<T> Wheel<T>
55 where
56 T: Stack,
57 {
58 /// Create a new timing wheel
new() -> Wheel<T>59 pub(crate) fn new() -> Wheel<T> {
60 let levels = (0..NUM_LEVELS).map(Level::new).collect();
61
62 Wheel { elapsed: 0, levels }
63 }
64
65 /// Return the number of milliseconds that have elapsed since the timing
66 /// wheel's creation.
elapsed(&self) -> u6467 pub(crate) fn elapsed(&self) -> u64 {
68 self.elapsed
69 }
70
71 /// Insert an entry into the timing wheel.
72 ///
73 /// # Arguments
74 ///
75 /// * `when`: is the instant at which the entry should be fired. It is
76 /// represented as the number of milliseconds since the creation
77 /// of the timing wheel.
78 ///
79 /// * `item`: The item to insert into the wheel.
80 ///
81 /// * `store`: The slab or `()` when using heap storage.
82 ///
83 /// # Return
84 ///
85 /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
86 ///
87 /// `Err(Elapsed)` indicates that `when` represents an instant that has
88 /// already passed. In this case, the caller should fire the timeout
89 /// immediately.
90 ///
91 /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
insert( &mut self, when: u64, item: T::Owned, store: &mut T::Store, ) -> Result<(), (T::Owned, InsertError)>92 pub(crate) fn insert(
93 &mut self,
94 when: u64,
95 item: T::Owned,
96 store: &mut T::Store,
97 ) -> Result<(), (T::Owned, InsertError)> {
98 if when <= self.elapsed {
99 return Err((item, InsertError::Elapsed));
100 } else if when - self.elapsed > MAX_DURATION {
101 return Err((item, InsertError::Invalid));
102 }
103
104 // Get the level at which the entry should be stored
105 let level = self.level_for(when);
106
107 self.levels[level].add_entry(when, item, store);
108
109 debug_assert!({
110 self.levels[level]
111 .next_expiration(self.elapsed)
112 .map(|e| e.deadline >= self.elapsed)
113 .unwrap_or(true)
114 });
115
116 Ok(())
117 }
118
119 /// Remove `item` from the timing wheel.
remove(&mut self, item: &T::Borrowed, store: &mut T::Store)120 pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
121 let when = T::when(item, store);
122
123 assert!(
124 self.elapsed <= when,
125 "elapsed={}; when={}",
126 self.elapsed,
127 when
128 );
129
130 let level = self.level_for(when);
131
132 self.levels[level].remove_entry(when, item, store);
133 }
134
135 /// Instant at which to poll
poll_at(&self) -> Option<u64>136 pub(crate) fn poll_at(&self) -> Option<u64> {
137 self.next_expiration().map(|expiration| expiration.deadline)
138 }
139
140 /// Advances the timer up to the instant represented by `now`.
poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned>141 pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
142 loop {
143 let expiration = self.next_expiration().and_then(|expiration| {
144 if expiration.deadline > now {
145 None
146 } else {
147 Some(expiration)
148 }
149 });
150
151 match expiration {
152 Some(ref expiration) => {
153 if let Some(item) = self.poll_expiration(expiration, store) {
154 return Some(item);
155 }
156
157 self.set_elapsed(expiration.deadline);
158 }
159 None => {
160 // in this case the poll did not indicate an expiration
161 // _and_ we were not able to find a next expiration in
162 // the current list of timers. advance to the poll's
163 // current time and do nothing else.
164 self.set_elapsed(now);
165 return None;
166 }
167 }
168 }
169 }
170
171 /// Returns the instant at which the next timeout expires.
next_expiration(&self) -> Option<Expiration>172 fn next_expiration(&self) -> Option<Expiration> {
173 // Check all levels
174 for level in 0..NUM_LEVELS {
175 if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
176 // There cannot be any expirations at a higher level that happen
177 // before this one.
178 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
179
180 return Some(expiration);
181 }
182 }
183
184 None
185 }
186
187 /// Used for debug assertions
no_expirations_before(&self, start_level: usize, before: u64) -> bool188 fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
189 let mut res = true;
190
191 for l2 in start_level..NUM_LEVELS {
192 if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
193 if e2.deadline < before {
194 res = false;
195 }
196 }
197 }
198
199 res
200 }
201
202 /// iteratively find entries that are between the wheel's current
203 /// time and the expiration time. for each in that population either
204 /// return it for notification (in the case of the last level) or tier
205 /// it down to the next level (in all other cases).
poll_expiration( &mut self, expiration: &Expiration, store: &mut T::Store, ) -> Option<T::Owned>206 pub(crate) fn poll_expiration(
207 &mut self,
208 expiration: &Expiration,
209 store: &mut T::Store,
210 ) -> Option<T::Owned> {
211 while let Some(item) = self.pop_entry(expiration, store) {
212 if expiration.level == 0 {
213 debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
214
215 return Some(item);
216 } else {
217 let when = T::when(item.borrow(), store);
218
219 let next_level = expiration.level - 1;
220
221 self.levels[next_level].add_entry(when, item, store);
222 }
223 }
224
225 None
226 }
227
set_elapsed(&mut self, when: u64)228 fn set_elapsed(&mut self, when: u64) {
229 assert!(
230 self.elapsed <= when,
231 "elapsed={:?}; when={:?}",
232 self.elapsed,
233 when
234 );
235
236 if when > self.elapsed {
237 self.elapsed = when;
238 }
239 }
240
pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned>241 fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
242 self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
243 }
244
level_for(&self, when: u64) -> usize245 fn level_for(&self, when: u64) -> usize {
246 level_for(self.elapsed, when)
247 }
248 }
249
level_for(elapsed: u64, when: u64) -> usize250 fn level_for(elapsed: u64, when: u64) -> usize {
251 const SLOT_MASK: u64 = (1 << 6) - 1;
252
253 // Mask in the trailing bits ignored by the level calculation in order to cap
254 // the possible leading zeros
255 let masked = elapsed ^ when | SLOT_MASK;
256
257 let leading_zeros = masked.leading_zeros() as usize;
258 let significant = 63 - leading_zeros;
259 significant / 6
260 }
261
262 #[cfg(all(test, not(loom)))]
263 mod test {
264 use super::*;
265
266 #[test]
test_level_for()267 fn test_level_for() {
268 for pos in 0..64 {
269 assert_eq!(
270 0,
271 level_for(0, pos),
272 "level_for({}) -- binary = {:b}",
273 pos,
274 pos
275 );
276 }
277
278 for level in 1..5 {
279 for pos in level..64 {
280 let a = pos * 64_usize.pow(level as u32);
281 assert_eq!(
282 level,
283 level_for(0, a as u64),
284 "level_for({}) -- binary = {:b}",
285 a,
286 a
287 );
288
289 if pos > level {
290 let a = a - 1;
291 assert_eq!(
292 level,
293 level_for(0, a as u64),
294 "level_for({}) -- binary = {:b}",
295 a,
296 a
297 );
298 }
299
300 if pos < 64 {
301 let a = a + 1;
302 assert_eq!(
303 level,
304 level_for(0, a as u64),
305 "level_for({}) -- binary = {:b}",
306 a,
307 a
308 );
309 }
310 }
311 }
312 }
313 }
314