// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
// in the future, this will change to the reverse. For now, suppress this
// warning and generally stick with being explicit about unsafety.
#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

//! Time driver

mod entry;
pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};

mod handle;
pub(crate) use self::handle::Handle;

mod wheel;

pub(super) mod sleep;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::time::error::Error;
use crate::time::{Clock, Duration, Instant};

use std::convert::TryInto;
use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};

/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
/// A `Driver` instance tracks the state necessary for managing time and
/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
///
/// It is expected that a single instance manages many individual [`Sleep`][sleep]
/// instances. The `Driver` implementation is thread-safe and, as such, is able
/// to handle callers from across threads.
///
/// After creating the `Driver` instance, the caller must repeatedly call `park`
/// or `park_timeout`. The time driver will perform no work unless `park` or
/// `park_timeout` is called repeatedly.
///
/// The driver has a resolution of one millisecond. Any unit of time that falls
/// between milliseconds is rounded up to the next millisecond.
///
/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
/// elapsed will be notified with an error. At this point, calling `poll` on the
/// [`Sleep`][sleep] instance will result in panic.
///
/// # Implementation
///
/// The time driver is based on the [paper by Varghese and Lauck][paper].
///
/// A hashed timing wheel is a vector of slots, where each slot handles a time
/// slice. As time progresses, the timer walks over the slot for the current
/// instant, and processes each entry for that slot. When the timer reaches the
/// end of the wheel, it starts again at the beginning.
///
/// The implementation maintains six wheels arranged in a set of levels. As the
/// levels go up, the slots of the associated wheel represent larger intervals
/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
/// time equal to the wheel at the lower level. At level zero, each slot
/// represents one millisecond of time.
///
/// The wheels are:
///
/// * Level 0: 64 x 1 millisecond slots.
/// * Level 1: 64 x 64 millisecond slots.
/// * Level 2: 64 x ~4 second slots.
/// * Level 3: 64 x ~4 minute slots.
/// * Level 4: 64 x ~4 hour slots.
/// * Level 5: 64 x ~12 day slots.
///
/// When the timer processes entries at level zero, it will notify all the
/// `Sleep` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
/// [sleep]: crate::time::Sleep
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver<P: Park + 'static> {
    /// Timing backend in use
    time_source: ClockTime,

    /// Shared state
    handle: Handle,

    /// Parker to delegate to
    park: P,

    // When `true`, a call to `park_timeout` should immediately return and time
    // should not advance. One reason for this to be `true` is if the task
    // passed to `Runtime::block_on` called `task::yield_now()`.
    //
    // While it may look racy, it only has any effect when the clock is paused
    // and pausing the clock is restricted to a single-threaded runtime.
    #[cfg(feature = "test-util")]
    did_wake: Arc<AtomicBool>,
}

/// A structure which handles conversion from Instants to u64 timestamps.
///
/// A timestamp ("tick") is the number of whole milliseconds elapsed since
/// `start_time`.
#[derive(Debug, Clone)]
pub(self) struct ClockTime {
    /// Clock queried by `now()` for the current instant.
    clock: super::clock::Clock,
    /// Instant captured from `clock` when this value was constructed; it
    /// corresponds to tick 0.
    start_time: Instant,
}

impl ClockTime {
    /// Creates a `ClockTime` whose tick 0 is `clock.now()` at the time of the
    /// call.
    pub(self) fn new(clock: Clock) -> Self {
        Self {
            start_time: clock.now(),
            clock,
        }
    }

    /// Converts a deadline to a tick, rounding any partial millisecond up.
    ///
    /// NOTE(review): `Instant + Duration` panics on overflow per the standard
    /// library docs, so a deadline at the very end of the representable range
    /// would panic here - confirm callers bound their deadlines.
    pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
        // Round up to the end of a ms
        self.instant_to_tick(t + Duration::from_nanos(999_999))
    }

    /// Converts an instant to a tick (whole milliseconds since `start_time`).
    ///
    /// Instants earlier than `start_time` map to tick 0.
    ///
    /// # Panics
    ///
    /// Panics if the millisecond count does not fit in a `u64`.
    pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
        // Clamp instants before `start_time` to zero. Note that `as_millis`
        // yields whole milliseconds only, so this conversion rounds *down*;
        // rounding up is the job of `deadline_to_tick`.
        let dur: Duration = t
            .checked_duration_since(self.start_time)
            .unwrap_or_else(|| Duration::from_secs(0));
        let ms = dur.as_millis();

        ms.try_into().expect("Duration too far into the future")
    }

    /// Converts a tick count into a `Duration` of that many milliseconds.
    pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
        Duration::from_millis(t)
    }

    /// Returns the current time of the underlying clock as a tick.
    pub(self) fn now(&self) -> u64 {
        self.instant_to_tick(self.clock.now())
    }
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
struct Inner {
    // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
    pub(super) state: Mutex<InnerState>,

    /// True if the driver is being shutdown
    pub(super) is_shutdown: AtomicBool,
}

/// Time state shared which must be protected by a `Mutex`
struct InnerState {
    /// Timing backend in use
    time_source: ClockTime,

    /// The last published timer `elapsed` value.
    elapsed: u64,

    /// The earliest time at which we promise to wake up without unparking
    next_wake: Option<NonZeroU64>,

    /// Timer wheel
    wheel: wheel::Wheel,

    /// Unparker that can be used to wake the time driver
    unpark: Box<dyn Unpark>,
}

// ===== impl Driver =====

impl<P> Driver<P>
where
    P: Park + 'static,
{
    /// Creates a new `Driver` instance that uses `park` to block the current
    /// thread and `time_source` to get the current time and convert to ticks.
    ///
    /// Specifying the source of time is useful when testing.
    pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
        let time_source = ClockTime::new(clock);

        // The shared state keeps its own copy of the time source plus an
        // unpark handle for `park`, so other threads can wake the driver.
        let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));

        Driver {
            time_source,
            handle: Handle::new(Arc::new(inner)),
            park,
            #[cfg(feature = "test-util")]
            did_wake: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns a handle to the timer.
    ///
    /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances
    /// can either be created directly or the `Handle` instance can be passed to
    /// `with_default`, setting the timer as the default timer for the execution
    /// context.
    pub(crate) fn handle(&self) -> Handle {
        self.handle.clone()
    }

    /// Parks on the underlying parker and then processes pending timers.
    ///
    /// The park duration is the time until the wheel's next expiration,
    /// capped by `limit` when one is given. If there is no pending
    /// expiration, parks for `limit`, or indefinitely when `limit` is `None`.
    ///
    /// # Panics
    ///
    /// Panics if the driver has already been shut down.
    fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
        let mut lock = self.handle.get().state.lock();

        assert!(!self.handle.is_shutdown());

        // Publish the wake-up time we are about to commit to, so that
        // `reregister` can tell whether a newly inserted timer needs an
        // explicit unpark. `next_wake` is non-zero, so tick 0 is stored as 1.
        let next_wake = lock.wheel.next_expiration_time();
        lock.next_wake =
            next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

        // Release the lock before blocking in the parker.
        drop(lock);

        match next_wake {
            Some(when) => {
                let now = self.time_source.now();
                // Note that we effectively round up to 1ms here - this avoids
                // very short-duration microsecond-resolution sleeps that the OS
                // might treat as zero-length.
                let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));

                if duration > Duration::from_millis(0) {
                    if let Some(limit) = limit {
                        duration = std::cmp::min(limit, duration);
                    }

                    self.park_timeout(duration)?;
                } else {
                    // The expiration is already due: do a zero-length park
                    // directly on the inner parker (bypassing the test-util
                    // `park_timeout` wrapper, so a paused clock is not
                    // advanced).
                    self.park.park_timeout(Duration::from_secs(0))?;
                }
            }
            None => {
                if let Some(duration) = limit {
                    self.park_timeout(duration)?;
                } else {
                    self.park.park()?;
                }
            }
        }

        // Process pending timers after waking up
        self.handle.process();

        Ok(())
    }

    cfg_test_util! {
        // Test-util variant of `park_timeout`. When the clock is paused it
        // does a zero-length park on the inner parker and then advances the
        // clock by `duration` instead of actually waiting; otherwise it
        // simply delegates to the inner parker.
        fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
            let clock = &self.time_source.clock;

            if clock.is_paused() {
                self.park.park_timeout(Duration::from_secs(0))?;

                // If the time driver was woken, then the park completed
                // before the "duration" elapsed (usually caused by a
                // yield in `Runtime::block_on`). In this case, we don't
                // advance the clock.
                if !self.did_wake() {
                    // Simulate advancing time
                    clock.advance(duration);
                }
            } else {
                self.park.park_timeout(duration)?;
            }

            Ok(())
        }

        // Returns whether an unpark was recorded since the last call, and
        // resets the flag to `false`.
        fn did_wake(&self) -> bool {
            self.did_wake.swap(false, Ordering::SeqCst)
        }
    }

    cfg_not_test_util! {
        // Non-test variant: delegates straight to the inner parker.
        fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
            self.park.park_timeout(duration)
        }
    }
}

impl Handle {
    /// Runs timer related logic at the time source's current time.
    pub(self) fn process(&self) {
        let now = self.time_source().now();

        self.process_at_time(now)
    }

    /// Fires every entry the wheel yields for tick `now` and wakes the
    /// associated tasks.
    ///
    /// If `now` is earlier than the last published `elapsed` value, `elapsed`
    /// is used instead. Wakers are collected into a fixed buffer of 32 and
    /// are only ever invoked while the driver lock is released. Afterwards
    /// the cached `elapsed` and `next_wake` values are refreshed from the
    /// wheel.
    pub(self) fn process_at_time(&self, mut now: u64) {
        // Fixed-size batch buffer; slots `0..waker_idx` hold wakers that
        // still have to be invoked.
        let mut waker_list: [Option<Waker>; 32] = Default::default();
        let mut waker_idx = 0;

        let mut lock = self.get().lock();

        if now < lock.elapsed {
            // Time went backwards! This normally shouldn't happen as the Rust language
            // guarantees that an Instant is monotonic, but can happen when running
            // Linux in a VM on a Windows host due to std incorrectly trusting the
            // hardware clock to be monotonic.
            //
            // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
            now = lock.elapsed;
        }

        while let Some(entry) = lock.wheel.poll(now) {
            debug_assert!(unsafe { entry.is_pending() });

            // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
            if let Some(waker) = unsafe { entry.fire(Ok(())) } {
                waker_list[waker_idx] = Some(waker);

                waker_idx += 1;

                if waker_idx == waker_list.len() {
                    // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
                    drop(lock);

                    // The buffer is full here, so every slot is `Some` and
                    // the `unwrap` cannot fail.
                    for waker in waker_list.iter_mut() {
                        waker.take().unwrap().wake();
                    }

                    waker_idx = 0;

                    lock = self.get().lock();
                }
            }
        }

        // Update the elapsed cache
        lock.elapsed = lock.wheel.elapsed();
        // `next_wake` is non-zero, so tick 0 is stored as 1.
        lock.next_wake = lock
            .wheel
            .poll_at()
            .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

        drop(lock);

        // Wake whatever is left of the final, partially filled batch - again
        // with the lock released.
        for waker in waker_list[0..waker_idx].iter_mut() {
            waker.take().unwrap().wake();
        }
    }

    /// Removes a registered timer from the driver.
    ///
    /// The timer will be moved to the cancelled state. Wakers will _not_ be
    /// invoked. If the timer is already completed, this function is a no-op.
    ///
    /// This function always acquires the driver lock, even if the entry does
    /// not appear to be registered.
    ///
    /// SAFETY: The timer must not be registered with some other driver, and
    /// `add_entry` must not be called concurrently.
    pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
        unsafe {
            let mut lock = self.get().lock();

            if entry.as_ref().might_be_registered() {
                lock.wheel.remove(entry);
            }

            // Any waker returned by `fire` is dropped here without being
            // woken.
            entry.as_ref().handle().fire(Ok(()));
        }
    }

    /// Removes and re-adds an entry to the driver.
    ///
    /// If the driver is shut down the entry is fired with a shutdown error
    /// instead of being reinserted; if the wheel reports the new tick as
    /// already elapsed the entry is fired with `Ok(())`. In both cases any
    /// waker returned by firing is woken after the driver lock is released.
    ///
    /// SAFETY: The timer must be either unregistered, or registered with this
    /// driver. No other threads are allowed to concurrently manipulate the
    /// timer at all (the current thread should hold an exclusive reference to
    /// the `TimerEntry`)
    pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
        // The block evaluates to the waker (if any) that must be woken once
        // the lock guard created inside it has been dropped.
        let waker = unsafe {
            let mut lock = self.get().lock();

            // We may have raced with a firing/deregistration, so check before
            // deregistering.
            if unsafe { entry.as_ref().might_be_registered() } {
                lock.wheel.remove(entry);
            }

            // Now that we have exclusive control of this entry, mint a handle to reinsert it.
            let entry = entry.as_ref().handle();

            if self.is_shutdown() {
                unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
            } else {
                entry.set_expiration(new_tick);

                // Note: We don't have to worry about racing with some other resetting
                // thread, because add_entry and reregister require exclusive control of
                // the timer entry.
                match unsafe { lock.wheel.insert(entry) } {
                    Ok(when) => {
                        // Unpark the driver if it has not published a
                        // wake-up time, or if this timer expires before the
                        // published one.
                        if lock
                            .next_wake
                            .map(|next_wake| when < next_wake.get())
                            .unwrap_or(true)
                        {
                            lock.unpark.unpark();
                        }

                        None
                    }
                    Err((entry, super::error::InsertError::Elapsed)) => unsafe {
                        entry.fire(Ok(()))
                    },
                }
            }

            // Must release lock before invoking waker to avoid the risk of deadlock.
        };

        // The timer was fired synchronously as a result of the reregistration.
        // Wake the waker; this is needed because we might reset _after_ a poll,
        // and otherwise the task won't be awoken to poll again.
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<P> Park for Driver<P>
where
    P: Park + 'static,
{
    type Unpark = TimerUnpark<P>;
    type Error = P::Error;

    /// Returns an unpark handle that wakes the inner parker.
    fn unpark(&self) -> Self::Unpark {
        TimerUnpark::new(self)
    }

    /// Parks with no caller-imposed limit, then processes pending timers.
    fn park(&mut self) -> Result<(), Self::Error> {
        self.park_internal(None)
    }

    /// Parks for at most `duration`, then processes pending timers.
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        self.park_internal(Some(duration))
    }

    /// Shuts the driver down: marks the shared state as shut down, processes
    /// the wheel at `u64::MAX`, and shuts down the inner parker.
    ///
    /// Calling this again after shutdown is a no-op.
    fn shutdown(&mut self) {
        if self.handle.is_shutdown() {
            return;
        }

        self.handle.get().is_shutdown.store(true, Ordering::SeqCst);

        // Advance time forward to the end of time.

        self.handle.process_at_time(u64::MAX);

        self.park.shutdown();
    }
}

impl<P> Drop for Driver<P>
where
    P: Park + 'static,
{
    /// Shuts the driver down when it is dropped.
    fn drop(&mut self) {
        self.shutdown();
    }
}

/// Unpark handle for a `Driver`, wrapping the inner parker's unpark handle.
pub(crate) struct TimerUnpark<P: Park + 'static> {
    /// Unpark handle of the inner parker.
    inner: P::Unpark,

    // Shared with `Driver::did_wake`; set on every `unpark` so the paused-clock
    // `park_timeout` can tell that it was woken and must not advance time.
    #[cfg(feature = "test-util")]
    did_wake: Arc<AtomicBool>,
}

impl<P: Park + 'static> TimerUnpark<P> {
    /// Builds an unpark handle from the driver's inner parker (and, with
    /// `test-util`, a clone of its `did_wake` flag).
    fn new(driver: &Driver<P>) -> TimerUnpark<P> {
        TimerUnpark {
            inner: driver.park.unpark(),

            #[cfg(feature = "test-util")]
            did_wake: driver.did_wake.clone(),
        }
    }
}

impl<P: Park + 'static> Unpark for TimerUnpark<P> {
    /// Records the wake-up (with `test-util`) and unparks the inner parker.
    fn unpark(&self) {
        #[cfg(feature = "test-util")]
        self.did_wake.store(true, Ordering::SeqCst);

        self.inner.unpark();
    }
}

// ===== impl Inner =====

impl Inner {
    /// Creates the shared state with an empty wheel, `elapsed` at 0, no
    /// published `next_wake`, and the shutdown flag cleared.
    pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
        Inner {
            state: Mutex::new(InnerState {
                time_source,
                elapsed: 0,
                next_wake: None,
                unpark,
                wheel: wheel::Wheel::new(),
            }),
            is_shutdown: AtomicBool::new(false),
        }
    }

    /// Locks the driver's inner structure
    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
        self.state.lock()
    }

    // Check whether the driver has been shutdown
    pub(super) fn is_shutdown(&self) -> bool {
        self.is_shutdown.load(Ordering::SeqCst)
    }
}

impl fmt::Debug for Inner {
    /// Formats as an opaque `Inner` struct; no fields are printed.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Inner").finish()
    }
}

#[cfg(test)]
mod tests;