1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 use std::convert::TryFrom; 8 use std::mem; 9 use std::time::{Duration, Instant}; 10 11 /// Internal structure for a timer item. 12 struct TimerItem<T> { 13 time: Instant, 14 item: T, 15 } 16 17 impl<T> TimerItem<T> { time(ti: &Self) -> Instant18 fn time(ti: &Self) -> Instant { 19 ti.time 20 } 21 } 22 23 /// A timer queue. 24 /// This uses a classic timer wheel arrangement, with some characteristics that might be considered peculiar. 25 /// Each slot in the wheel is sorted (complexity O(N) insertions, but O(logN) to find cut points). 26 /// Time is relative, the wheel has an origin time and it is unable to represent times that are more than 27 /// `granularity * capacity` past that time. 28 pub struct Timer<T> { 29 items: Vec<Vec<TimerItem<T>>>, 30 now: Instant, 31 granularity: Duration, 32 cursor: usize, 33 } 34 35 impl<T> Timer<T> { 36 /// Construct a new wheel at the given granularity, starting at the given time. 37 /// # Panics 38 /// When `capacity` is too large to fit in `u32` or `granularity` is zero. new(now: Instant, granularity: Duration, capacity: usize) -> Self39 pub fn new(now: Instant, granularity: Duration, capacity: usize) -> Self { 40 assert!(u32::try_from(capacity).is_ok()); 41 assert!(granularity.as_nanos() > 0); 42 let mut items = Vec::with_capacity(capacity); 43 items.resize_with(capacity, Default::default); 44 Self { 45 items, 46 now, 47 granularity, 48 cursor: 0, 49 } 50 } 51 52 /// Return a reference to the time of the next entry. 53 #[must_use] next_time(&self) -> Option<Instant>54 pub fn next_time(&self) -> Option<Instant> { 55 for i in 0..self.items.len() { 56 let idx = self.bucket(i); 57 if let Some(t) = self.items[idx].first() { 58 return Some(t.time); 59 } 60 } 61 None 62 } 63 64 /// Get the full span of time that this can cover. 65 /// Two timers cannot be more than this far apart. 66 /// In practice, this value is less by one amount of the timer granularity. 67 #[inline] 68 #[allow(clippy::cast_possible_truncation)] // guarded by assertion 69 #[must_use] span(&self) -> Duration70 pub fn span(&self) -> Duration { 71 self.granularity * (self.items.len() as u32) 72 } 73 74 /// For the given `time`, get the number of whole buckets in the future that is. 75 #[inline] 76 #[allow(clippy::cast_possible_truncation)] // guarded by assertion delta(&self, time: Instant) -> usize77 fn delta(&self, time: Instant) -> usize { 78 // This really should use Instant::div_duration(), but it can't yet. 79 ((time - self.now).as_nanos() / self.granularity.as_nanos()) as usize 80 } 81 82 #[inline] time_bucket(&self, time: Instant) -> usize83 fn time_bucket(&self, time: Instant) -> usize { 84 self.bucket(self.delta(time)) 85 } 86 87 #[inline] bucket(&self, delta: usize) -> usize88 fn bucket(&self, delta: usize) -> usize { 89 debug_assert!(delta < self.items.len()); 90 (self.cursor + delta) % self.items.len() 91 } 92 93 /// Slide forward in time by `n * self.granularity`. 94 #[allow(unknown_lints, renamed_and_removed_lints, clippy::unknown_clippy_lints)] // Until we require rust 1.45. 95 #[allow(clippy::cast_possible_truncation, clippy::reversed_empty_ranges)] 96 // cast_possible_truncation is ok because we have an assertion guard. 97 // reversed_empty_ranges is to avoid different types on the if/else. tick(&mut self, n: usize)98 fn tick(&mut self, n: usize) { 99 let new = self.bucket(n); 100 let iter = if new < self.cursor { 101 (self.cursor..self.items.len()).chain(0..new) 102 } else { 103 (self.cursor..new).chain(0..0) 104 }; 105 for i in iter { 106 assert!(self.items[i].is_empty()); 107 } 108 self.now += self.granularity * (n as u32); 109 self.cursor = new; 110 } 111 112 /// Asserts if the time given is in the past or too far in the future. 113 /// # Panics 114 /// When `time` is in the past relative to previous calls. add(&mut self, time: Instant, item: T)115 pub fn add(&mut self, time: Instant, item: T) { 116 assert!(time >= self.now); 117 // Skip forward quickly if there is too large a gap. 118 let short_span = self.span() - self.granularity; 119 if time >= (self.now + self.span() + short_span) { 120 // Assert that there aren't any items. 121 for i in &self.items { 122 debug_assert!(i.is_empty()); 123 } 124 self.now = time - short_span; 125 self.cursor = 0; 126 } 127 128 // Adjust time forward the minimum amount necessary. 129 let mut d = self.delta(time); 130 if d >= self.items.len() { 131 self.tick(1 + d - self.items.len()); 132 d = self.items.len() - 1; 133 } 134 135 let bucket = self.bucket(d); 136 let ins = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) { 137 Ok(j) | Err(j) => j, 138 }; 139 self.items[bucket].insert(ins, TimerItem { time, item }); 140 } 141 142 /// Given knowledge of the time an item was added, remove it. 143 /// This requires use of a predicate that identifies matching items. remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T> where F: FnMut(&T) -> bool,144 pub fn remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T> 145 where 146 F: FnMut(&T) -> bool, 147 { 148 if time < self.now { 149 return None; 150 } 151 if time > self.now + self.span() { 152 return None; 153 } 154 let bucket = self.time_bucket(time); 155 let start_index = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) { 156 Ok(idx) => idx, 157 Err(_) => return None, 158 }; 159 // start_index is just one of potentially many items with the same time. 160 // Search backwards for a match, ... 161 for i in (0..=start_index).rev() { 162 if self.items[bucket][i].time != time { 163 break; 164 } 165 if selector(&self.items[bucket][i].item) { 166 return Some(self.items[bucket].remove(i).item); 167 } 168 } 169 // ... then forwards. 170 for i in (start_index + 1)..self.items[bucket].len() { 171 if self.items[bucket][i].time != time { 172 break; 173 } 174 if selector(&self.items[bucket][i].item) { 175 return Some(self.items[bucket].remove(i).item); 176 } 177 } 178 None 179 } 180 181 /// Take the next item, unless there are no items with 182 /// a timeout in the past relative to `until`. take_next(&mut self, until: Instant) -> Option<T>183 pub fn take_next(&mut self, until: Instant) -> Option<T> { 184 for i in 0..self.items.len() { 185 let idx = self.bucket(i); 186 if !self.items[idx].is_empty() && self.items[idx][0].time <= until { 187 return Some(self.items[idx].remove(0).item); 188 } 189 } 190 None 191 } 192 193 /// Create an iterator that takes all items until the given time. 194 /// Note: Items might be removed even if the iterator is not fully exhausted. take_until(&mut self, until: Instant) -> impl Iterator<Item = T>195 pub fn take_until(&mut self, until: Instant) -> impl Iterator<Item = T> { 196 let get_item = move |x: TimerItem<T>| x.item; 197 if until >= self.now + self.span() { 198 // Drain everything, so a clean sweep. 199 let mut empty_items = Vec::with_capacity(self.items.len()); 200 empty_items.resize_with(self.items.len(), Vec::default); 201 let mut items = mem::replace(&mut self.items, empty_items); 202 self.now = until; 203 self.cursor = 0; 204 205 let tail = items.split_off(self.cursor); 206 return tail.into_iter().chain(items).flatten().map(get_item); 207 } 208 209 // Only returning a partial span, so do it bucket at a time. 210 let delta = self.delta(until); 211 let mut buckets = Vec::with_capacity(delta + 1); 212 213 // First, the whole buckets. 214 for i in 0..delta { 215 let idx = self.bucket(i); 216 buckets.push(mem::take(&mut self.items[idx])); 217 } 218 self.tick(delta); 219 220 // Now we need to split the last bucket, because there might be 221 // some items with `item.time > until`. 222 let bucket = &mut self.items[self.cursor]; 223 let last_idx = match bucket.binary_search_by_key(&until, TimerItem::time) { 224 Ok(mut m) => { 225 // If there are multiple values, the search will hit any of them. 226 // Make sure to get them all. 227 while m < bucket.len() && bucket[m].time == until { 228 m += 1; 229 } 230 m 231 } 232 Err(ins) => ins, 233 }; 234 let tail = bucket.split_off(last_idx); 235 buckets.push(mem::replace(bucket, tail)); 236 // This tomfoolery with the empty vector ensures that 237 // the returned type here matches the one above precisely 238 // without having to invoke the `either` crate. 239 buckets.into_iter().chain(vec![]).flatten().map(get_item) 240 } 241 } 242 243 #[cfg(test)] 244 mod test { 245 use super::{Duration, Instant, Timer}; 246 use lazy_static::lazy_static; 247 248 lazy_static! { 249 static ref NOW: Instant = Instant::now(); 250 } 251 252 const GRANULARITY: Duration = Duration::from_millis(10); 253 const CAPACITY: usize = 10; 254 #[test] create()255 fn create() { 256 let t: Timer<()> = Timer::new(*NOW, GRANULARITY, CAPACITY); 257 assert_eq!(t.span(), Duration::from_millis(100)); 258 assert_eq!(None, t.next_time()); 259 } 260 261 #[test] immediate_entry()262 fn immediate_entry() { 263 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 264 t.add(*NOW, 12); 265 assert_eq!(*NOW, t.next_time().expect("should have an entry")); 266 let values: Vec<_> = t.take_until(*NOW).collect(); 267 assert_eq!(vec![12], values); 268 } 269 270 #[test] same_time()271 fn same_time() { 272 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 273 let v1 = 12; 274 let v2 = 13; 275 t.add(*NOW, v1); 276 t.add(*NOW, v2); 277 assert_eq!(*NOW, t.next_time().expect("should have an entry")); 278 let values: Vec<_> = t.take_until(*NOW).collect(); 279 assert!(values.contains(&v1)); 280 assert!(values.contains(&v2)); 281 } 282 283 #[test] add()284 fn add() { 285 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 286 let near_future = *NOW + Duration::from_millis(17); 287 let v = 9; 288 t.add(near_future, v); 289 assert_eq!(near_future, t.next_time().expect("should return a value")); 290 let values: Vec<_> = t 291 .take_until(near_future - Duration::from_millis(1)) 292 .collect(); 293 assert!(values.is_empty()); 294 let values: Vec<_> = t 295 .take_until(near_future + Duration::from_millis(1)) 296 .collect(); 297 assert!(values.contains(&v)); 298 } 299 300 #[test] add_future()301 fn add_future() { 302 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 303 let future = *NOW + Duration::from_millis(117); 304 let v = 9; 305 t.add(future, v); 306 assert_eq!(future, t.next_time().expect("should return a value")); 307 let values: Vec<_> = t.take_until(future).collect(); 308 assert!(values.contains(&v)); 309 } 310 311 #[test] add_far_future()312 fn add_far_future() { 313 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 314 let far_future = *NOW + Duration::from_millis(892); 315 let v = 9; 316 t.add(far_future, v); 317 assert_eq!(far_future, t.next_time().expect("should return a value")); 318 let values: Vec<_> = t.take_until(far_future).collect(); 319 assert!(values.contains(&v)); 320 } 321 322 const TIMES: &[Duration] = &[ 323 Duration::from_millis(40), 324 Duration::from_millis(91), 325 Duration::from_millis(6), 326 Duration::from_millis(3), 327 Duration::from_millis(22), 328 Duration::from_millis(40), 329 ]; 330 with_times() -> Timer<usize>331 fn with_times() -> Timer<usize> { 332 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 333 for (i, time) in TIMES.iter().enumerate() { 334 t.add(*NOW + *time, i); 335 } 336 assert_eq!( 337 *NOW + *TIMES.iter().min().unwrap(), 338 t.next_time().expect("should have a time") 339 ); 340 t 341 } 342 343 #[test] multiple_values()344 fn multiple_values() { 345 let mut t = with_times(); 346 let values: Vec<_> = t.take_until(*NOW + *TIMES.iter().max().unwrap()).collect(); 347 for i in 0..TIMES.len() { 348 assert!(values.contains(&i)); 349 } 350 } 351 352 #[test] take_far_future()353 fn take_far_future() { 354 let mut t = with_times(); 355 let values: Vec<_> = t.take_until(*NOW + Duration::from_secs(100)).collect(); 356 for i in 0..TIMES.len() { 357 assert!(values.contains(&i)); 358 } 359 } 360 361 #[test] remove_each()362 fn remove_each() { 363 let mut t = with_times(); 364 for (i, time) in TIMES.iter().enumerate() { 365 assert_eq!(Some(i), t.remove(*NOW + *time, |&x| x == i)); 366 } 367 assert_eq!(None, t.next_time()); 368 } 369 370 #[test] remove_future()371 fn remove_future() { 372 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 373 let future = *NOW + Duration::from_millis(117); 374 let v = 9; 375 t.add(future, v); 376 377 assert_eq!(Some(v), t.remove(future, |candidate| *candidate == v)); 378 } 379 380 #[test] remove_too_far_future()381 fn remove_too_far_future() { 382 let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); 383 let future = *NOW + Duration::from_millis(117); 384 let too_far_future = *NOW + t.span() + Duration::from_millis(117); 385 let v = 9; 386 t.add(future, v); 387 388 assert_eq!(None, t.remove(too_far_future, |candidate| *candidate == v)); 389 } 390 } 391