1 //! Time driver 2 3 mod atomic_stack; 4 use self::atomic_stack::AtomicStack; 5 6 mod entry; 7 pub(super) use self::entry::Entry; 8 9 mod handle; 10 pub(crate) use self::handle::Handle; 11 12 mod registration; 13 pub(crate) use self::registration::Registration; 14 15 mod stack; 16 use self::stack::Stack; 17 18 use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; 19 use crate::park::{Park, Unpark}; 20 use crate::time::{wheel, Error}; 21 use crate::time::{Clock, Duration, Instant}; 22 23 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; 24 25 use std::sync::Arc; 26 use std::usize; 27 use std::{cmp, fmt}; 28 29 /// Time implementation that drives [`Delay`][delay], [`Interval`][interval], and [`Timeout`][timeout]. 30 /// 31 /// A `Driver` instance tracks the state necessary for managing time and 32 /// notifying the [`Delay`][delay] instances once their deadlines are reached. 33 /// 34 /// It is expected that a single instance manages many individual [`Delay`][delay] 35 /// instances. The `Driver` implementation is thread-safe and, as such, is able 36 /// to handle callers from across threads. 37 /// 38 /// After creating the `Driver` instance, the caller must repeatedly call `park` 39 /// or `park_timeout`. The time driver will perform no work unless `park` or 40 /// `park_timeout` is called repeatedly. 41 /// 42 /// The driver has a resolution of one millisecond. Any unit of time that falls 43 /// between milliseconds are rounded up to the next millisecond. 44 /// 45 /// When an instance is dropped, any outstanding [`Delay`][delay] instance that has not 46 /// elapsed will be notified with an error. At this point, calling `poll` on the 47 /// [`Delay`][delay] instance will result in panic. 48 /// 49 /// # Implementation 50 /// 51 /// The time driver is based on the [paper by Varghese and Lauck][paper]. 52 /// 53 /// A hashed timing wheel is a vector of slots, where each slot handles a time 54 /// slice. As time progresses, the timer walks over the slot for the current 55 /// instant, and processes each entry for that slot. When the timer reaches the 56 /// end of the wheel, it starts again at the beginning. 57 /// 58 /// The implementation maintains six wheels arranged in a set of levels. As the 59 /// levels go up, the slots of the associated wheel represent larger intervals 60 /// of time. At each level, the wheel has 64 slots. Each slot covers a range of 61 /// time equal to the wheel at the lower level. At level zero, each slot 62 /// represents one millisecond of time. 63 /// 64 /// The wheels are: 65 /// 66 /// * Level 0: 64 x 1 millisecond slots. 67 /// * Level 1: 64 x 64 millisecond slots. 68 /// * Level 2: 64 x ~4 second slots. 69 /// * Level 3: 64 x ~4 minute slots. 70 /// * Level 4: 64 x ~4 hour slots. 71 /// * Level 5: 64 x ~12 day slots. 72 /// 73 /// When the timer processes entries at level zero, it will notify all the 74 /// `Delay` instances as their deadlines have been reached. For all higher 75 /// levels, all entries will be redistributed across the wheel at the next level 76 /// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will 77 /// either be canceled (dropped) or their associated entries will reach level 78 /// zero and be notified. 79 /// 80 /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf 81 /// [delay]: crate::time::Delay 82 /// [timeout]: crate::time::Timeout 83 /// [interval]: crate::time::Interval 84 #[derive(Debug)] 85 pub(crate) struct Driver<T: Park> { 86 /// Shared state 87 inner: Arc<Inner>, 88 89 /// Timer wheel 90 wheel: wheel::Wheel<Stack>, 91 92 /// Thread parker. The `Driver` park implementation delegates to this. 93 park: T, 94 95 /// Source of "now" instances 96 clock: Clock, 97 98 /// True if the driver is being shutdown 99 is_shutdown: bool, 100 } 101 102 /// Timer state shared between `Driver`, `Handle`, and `Registration`. 103 pub(crate) struct Inner { 104 /// The instant at which the timer started running. 105 start: Instant, 106 107 /// The last published timer `elapsed` value. 108 elapsed: AtomicU64, 109 110 /// Number of active timeouts 111 num: AtomicUsize, 112 113 /// Head of the "process" linked list. 114 process: AtomicStack, 115 116 /// Unparks the timer thread. 117 unpark: Box<dyn Unpark>, 118 } 119 120 /// Maximum number of timeouts the system can handle concurrently. 121 const MAX_TIMEOUTS: usize = usize::MAX >> 1; 122 123 // ===== impl Driver ===== 124 125 impl<T> Driver<T> 126 where 127 T: Park, 128 { 129 /// Creates a new `Driver` instance that uses `park` to block the current 130 /// thread and `clock` to get the current `Instant`. 131 /// 132 /// Specifying the source of time is useful when testing. new(park: T, clock: Clock) -> Driver<T>133 pub(crate) fn new(park: T, clock: Clock) -> Driver<T> { 134 let unpark = Box::new(park.unpark()); 135 136 Driver { 137 inner: Arc::new(Inner::new(clock.now(), unpark)), 138 wheel: wheel::Wheel::new(), 139 park, 140 clock, 141 is_shutdown: false, 142 } 143 } 144 145 /// Returns a handle to the timer. 146 /// 147 /// The `Handle` is how `Delay` instances are created. The `Delay` instances 148 /// can either be created directly or the `Handle` instance can be passed to 149 /// `with_default`, setting the timer as the default timer for the execution 150 /// context. handle(&self) -> Handle151 pub(crate) fn handle(&self) -> Handle { 152 Handle::new(Arc::downgrade(&self.inner)) 153 } 154 155 /// Converts an `Expiration` to an `Instant`. expiration_instant(&self, when: u64) -> Instant156 fn expiration_instant(&self, when: u64) -> Instant { 157 self.inner.start + Duration::from_millis(when) 158 } 159 160 /// Runs timer related logic process(&mut self)161 fn process(&mut self) { 162 let now = crate::time::ms( 163 self.clock.now() - self.inner.start, 164 crate::time::Round::Down, 165 ); 166 let mut poll = wheel::Poll::new(now); 167 168 while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { 169 let when = entry.when_internal().expect("invalid internal entry state"); 170 171 // Fire the entry 172 entry.fire(when); 173 174 // Track that the entry has been fired 175 entry.set_when_internal(None); 176 } 177 178 // Update the elapsed cache 179 self.inner.elapsed.store(self.wheel.elapsed(), SeqCst); 180 } 181 182 /// Processes the entry queue 183 /// 184 /// This handles adding and canceling timeouts. process_queue(&mut self)185 fn process_queue(&mut self) { 186 for entry in self.inner.process.take() { 187 match (entry.when_internal(), entry.load_state()) { 188 (None, None) => { 189 // Nothing to do 190 } 191 (Some(_), None) => { 192 // Remove the entry 193 self.clear_entry(&entry); 194 } 195 (None, Some(when)) => { 196 // Queue the entry 197 self.add_entry(entry, when); 198 } 199 (Some(_), Some(next)) => { 200 self.clear_entry(&entry); 201 self.add_entry(entry, next); 202 } 203 } 204 } 205 } 206 clear_entry(&mut self, entry: &Arc<Entry>)207 fn clear_entry(&mut self, entry: &Arc<Entry>) { 208 self.wheel.remove(entry, &mut ()); 209 entry.set_when_internal(None); 210 } 211 212 /// Fires the entry if it needs to, otherwise queue it to be processed later. 213 /// 214 /// Returns `None` if the entry was fired. add_entry(&mut self, entry: Arc<Entry>, when: u64)215 fn add_entry(&mut self, entry: Arc<Entry>, when: u64) { 216 use crate::time::wheel::InsertError; 217 218 entry.set_when_internal(Some(when)); 219 220 match self.wheel.insert(when, entry, &mut ()) { 221 Ok(_) => {} 222 Err((entry, InsertError::Elapsed)) => { 223 // The entry's deadline has elapsed, so fire it and update the 224 // internal state accordingly. 225 entry.set_when_internal(None); 226 entry.fire(when); 227 } 228 Err((entry, InsertError::Invalid)) => { 229 // The entry's deadline is invalid, so error it and update the 230 // internal state accordingly. 231 entry.set_when_internal(None); 232 entry.error(Error::invalid()); 233 } 234 } 235 } 236 } 237 238 impl<T> Park for Driver<T> 239 where 240 T: Park, 241 { 242 type Unpark = T::Unpark; 243 type Error = T::Error; 244 unpark(&self) -> Self::Unpark245 fn unpark(&self) -> Self::Unpark { 246 self.park.unpark() 247 } 248 park(&mut self) -> Result<(), Self::Error>249 fn park(&mut self) -> Result<(), Self::Error> { 250 self.process_queue(); 251 252 match self.wheel.poll_at() { 253 Some(when) => { 254 let now = self.clock.now(); 255 let deadline = self.expiration_instant(when); 256 257 if deadline > now { 258 let dur = deadline - now; 259 260 if self.clock.is_paused() { 261 self.park.park_timeout(Duration::from_secs(0))?; 262 self.clock.advance(dur); 263 } else { 264 self.park.park_timeout(dur)?; 265 } 266 } else { 267 self.park.park_timeout(Duration::from_secs(0))?; 268 } 269 } 270 None => { 271 self.park.park()?; 272 } 273 } 274 275 self.process(); 276 277 Ok(()) 278 } 279 park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>280 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { 281 self.process_queue(); 282 283 match self.wheel.poll_at() { 284 Some(when) => { 285 let now = self.clock.now(); 286 let deadline = self.expiration_instant(when); 287 288 if deadline > now { 289 let duration = cmp::min(deadline - now, duration); 290 291 if self.clock.is_paused() { 292 self.park.park_timeout(Duration::from_secs(0))?; 293 self.clock.advance(duration); 294 } else { 295 self.park.park_timeout(duration)?; 296 } 297 } else { 298 self.park.park_timeout(Duration::from_secs(0))?; 299 } 300 } 301 None => { 302 self.park.park_timeout(duration)?; 303 } 304 } 305 306 self.process(); 307 308 Ok(()) 309 } 310 shutdown(&mut self)311 fn shutdown(&mut self) { 312 if self.is_shutdown { 313 return; 314 } 315 316 use std::u64; 317 318 // Shutdown the stack of entries to process, preventing any new entries 319 // from being pushed. 320 self.inner.process.shutdown(); 321 322 // Clear the wheel, using u64::MAX allows us to drain everything 323 let mut poll = wheel::Poll::new(u64::MAX); 324 325 while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { 326 entry.error(Error::shutdown()); 327 } 328 329 self.park.shutdown(); 330 331 self.is_shutdown = true; 332 } 333 } 334 335 impl<T> Drop for Driver<T> 336 where 337 T: Park, 338 { drop(&mut self)339 fn drop(&mut self) { 340 self.shutdown(); 341 } 342 } 343 344 // ===== impl Inner ===== 345 346 impl Inner { new(start: Instant, unpark: Box<dyn Unpark>) -> Inner347 fn new(start: Instant, unpark: Box<dyn Unpark>) -> Inner { 348 Inner { 349 num: AtomicUsize::new(0), 350 elapsed: AtomicU64::new(0), 351 process: AtomicStack::new(), 352 start, 353 unpark, 354 } 355 } 356 elapsed(&self) -> u64357 fn elapsed(&self) -> u64 { 358 self.elapsed.load(SeqCst) 359 } 360 361 #[cfg(all(test, loom))] num(&self, ordering: std::sync::atomic::Ordering) -> usize362 fn num(&self, ordering: std::sync::atomic::Ordering) -> usize { 363 self.num.load(ordering) 364 } 365 366 /// Increments the number of active timeouts increment(&self) -> Result<(), Error>367 fn increment(&self) -> Result<(), Error> { 368 let mut curr = self.num.load(Relaxed); 369 loop { 370 if curr == MAX_TIMEOUTS { 371 return Err(Error::at_capacity()); 372 } 373 374 match self 375 .num 376 .compare_exchange_weak(curr, curr + 1, Release, Relaxed) 377 { 378 Ok(_) => return Ok(()), 379 Err(next) => curr = next, 380 } 381 } 382 } 383 384 /// Decrements the number of active timeouts decrement(&self)385 fn decrement(&self) { 386 let prev = self.num.fetch_sub(1, Acquire); 387 debug_assert!(prev <= MAX_TIMEOUTS); 388 } 389 queue(&self, entry: &Arc<Entry>) -> Result<(), Error>390 fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> { 391 if self.process.push(entry)? { 392 // The timer is notified so that it can process the timeout 393 self.unpark.unpark(); 394 } 395 396 Ok(()) 397 } 398 normalize_deadline(&self, deadline: Instant) -> u64399 fn normalize_deadline(&self, deadline: Instant) -> u64 { 400 if deadline < self.start { 401 return 0; 402 } 403 404 crate::time::ms(deadline - self.start, crate::time::Round::Up) 405 } 406 } 407 408 impl fmt::Debug for Inner { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result409 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 410 fmt.debug_struct("Inner").finish() 411 } 412 } 413 414 #[cfg(all(test, loom))] 415 mod tests; 416