//! Timer implementation.
//!
//! This module contains the types needed to run a timer.
//!
//! The [`Timer`] type runs the timer logic. It holds all the necessary state
//! to track all associated [`Delay`] instances and deliver notifications
//! once the deadlines are reached.
//!
//! The [`Handle`] type is a reference to a [`Timer`] instance. This type is
//! `Clone`, `Send`, and `Sync`. This type is used to create instances of
//! [`Delay`].
//!
//! The [`Now`] trait describes how to get an [`Instant`] representing the
//! current moment in time. [`SystemNow`] is the default implementation, where
//! [`Now::now`] is implemented by calling [`Instant::now`].
//!
//! [`Timer`] is generic over [`Now`]. This allows the source of time to be
//! customized. This ability is especially useful in tests and any environment
//! where determinism is necessary.
//!
//! Note, when using the Tokio runtime, the [`Timer`] does not need to be set
//! up manually as the runtime comes pre-configured with a [`Timer`] instance.
//!
//! [`Timer`]: struct.Timer.html
//! [`Handle`]: struct.Handle.html
//! [`Delay`]: ../struct.Delay.html
//! [`Now`]: ../clock/trait.Now.html
//! [`Now::now`]: ../clock/trait.Now.html#method.now
//! [`SystemNow`]: struct.SystemNow.html
//! [`Instant`]: https://doc.rust-lang.org/std/time/struct.Instant.html
//! [`Instant::now`]: https://doc.rust-lang.org/std/time/struct.Instant.html#method.now

// This allows the usage of the old `Now` trait.
34 #![allow(deprecated)] 35 36 mod atomic_stack; 37 mod entry; 38 mod handle; 39 mod now; 40 mod registration; 41 mod stack; 42 43 use self::atomic_stack::AtomicStack; 44 use self::entry::Entry; 45 use self::stack::Stack; 46 47 pub(crate) use self::handle::HandlePriv; 48 pub use self::handle::{set_default, with_default, DefaultGuard, Handle}; 49 pub use self::now::{Now, SystemNow}; 50 pub(crate) use self::registration::Registration; 51 52 use atomic::AtomicU64; 53 use wheel; 54 use Error; 55 56 use tokio_executor::park::{Park, ParkThread, Unpark}; 57 58 use std::sync::atomic::AtomicUsize; 59 use std::sync::atomic::Ordering::SeqCst; 60 use std::sync::Arc; 61 use std::time::{Duration, Instant}; 62 use std::usize; 63 use std::{cmp, fmt}; 64 65 /// Timer implementation that drives [`Delay`], [`Interval`], and [`Timeout`]. 66 /// 67 /// A `Timer` instance tracks the state necessary for managing time and 68 /// notifying the [`Delay`] instances once their deadlines are reached. 69 /// 70 /// It is expected that a single `Timer` instance manages many individual 71 /// [`Delay`] instances. The `Timer` implementation is thread-safe and, as such, 72 /// is able to handle callers from across threads. 73 /// 74 /// Callers do not use `Timer` directly to create [`Delay`] instances. Instead, 75 /// [`Handle`][Handle.struct] is used. A handle for the timer instance is obtained by calling 76 /// [`handle`]. [`Handle`][Handle.struct] is the type that implements `Clone` and is `Send + 77 /// Sync`. 78 /// 79 /// After creating the `Timer` instance, the caller must repeatedly call 80 /// [`turn`]. The timer will perform no work unless [`turn`] is called 81 /// repeatedly. 82 /// 83 /// The `Timer` has a resolution of one millisecond. Any unit of time that falls 84 /// between milliseconds are rounded up to the next millisecond. 85 /// 86 /// When the `Timer` instance is dropped, any outstanding [`Delay`] instance that 87 /// has not elapsed will be notified with an error. 
At this point, calling 88 /// `poll` on the [`Delay`] instance will result in `Err` being returned. 89 /// 90 /// # Implementation 91 /// 92 /// `Timer` is based on the [paper by Varghese and Lauck][paper]. 93 /// 94 /// A hashed timing wheel is a vector of slots, where each slot handles a time 95 /// slice. As time progresses, the timer walks over the slot for the current 96 /// instant, and processes each entry for that slot. When the timer reaches the 97 /// end of the wheel, it starts again at the beginning. 98 /// 99 /// The `Timer` implementation maintains six wheels arranged in a set of levels. 100 /// As the levels go up, the slots of the associated wheel represent larger 101 /// intervals of time. At each level, the wheel has 64 slots. Each slot covers a 102 /// range of time equal to the wheel at the lower level. At level zero, each 103 /// slot represents one millisecond of time. 104 /// 105 /// The wheels are: 106 /// 107 /// * Level 0: 64 x 1 millisecond slots. 108 /// * Level 1: 64 x 64 millisecond slots. 109 /// * Level 2: 64 x ~4 second slots. 110 /// * Level 3: 64 x ~4 minute slots. 111 /// * Level 4: 64 x ~4 hour slots. 112 /// * Level 5: 64 x ~12 day slots. 113 /// 114 /// When the timer processes entries at level zero, it will notify all the 115 /// [`Delay`] instances as their deadlines have been reached. For all higher 116 /// levels, all entries will be redistributed across the wheel at the next level 117 /// down. Eventually, as time progresses, entries will [`Delay`] instances will 118 /// either be canceled (dropped) or their associated entries will reach level 119 /// zero and be notified. 
120 /// 121 /// [`Delay`]: ../struct.Delay.html 122 /// [`Interval`]: ../struct.Interval.html 123 /// [`Timeout`]: ../struct.Timeout.html 124 /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf 125 /// [`handle`]: #method.handle 126 /// [`turn`]: #method.turn 127 /// [Handle.struct]: struct.Handle.html 128 #[derive(Debug)] 129 pub struct Timer<T, N = SystemNow> { 130 /// Shared state 131 inner: Arc<Inner>, 132 133 /// Timer wheel 134 wheel: wheel::Wheel<Stack>, 135 136 /// Thread parker. The `Timer` park implementation delegates to this. 137 park: T, 138 139 /// Source of "now" instances 140 now: N, 141 } 142 143 /// Return value from the `turn` method on `Timer`. 144 /// 145 /// Currently this value doesn't actually provide any functionality, but it may 146 /// in the future give insight into what happened during `turn`. 147 #[derive(Debug)] 148 pub struct Turn(()); 149 150 /// Timer state shared between `Timer`, `Handle`, and `Registration`. 151 pub(crate) struct Inner { 152 /// The instant at which the timer started running. 153 start: Instant, 154 155 /// The last published timer `elapsed` value. 156 elapsed: AtomicU64, 157 158 /// Number of active timeouts 159 num: AtomicUsize, 160 161 /// Head of the "process" linked list. 162 process: AtomicStack, 163 164 /// Unparks the timer thread. 165 unpark: Box<dyn Unpark>, 166 } 167 168 /// Maximum number of timeouts the system can handle concurrently. 169 const MAX_TIMEOUTS: usize = usize::MAX >> 1; 170 171 // ===== impl Timer ===== 172 173 impl<T> Timer<T> 174 where 175 T: Park, 176 { 177 /// Create a new `Timer` instance that uses `park` to block the current 178 /// thread. 179 /// 180 /// Once the timer has been created, a handle can be obtained using 181 /// [`handle`]. The handle is used to create `Delay` instances. 182 /// 183 /// Use `default` when constructing a `Timer` using the default `park` 184 /// instance. 
185 /// 186 /// [`handle`]: #method.handle new(park: T) -> Self187 pub fn new(park: T) -> Self { 188 Timer::new_with_now(park, SystemNow::new()) 189 } 190 } 191 192 impl<T, N> Timer<T, N> { 193 /// Returns a reference to the underlying `Park` instance. get_park(&self) -> &T194 pub fn get_park(&self) -> &T { 195 &self.park 196 } 197 198 /// Returns a mutable reference to the underlying `Park` instance. get_park_mut(&mut self) -> &mut T199 pub fn get_park_mut(&mut self) -> &mut T { 200 &mut self.park 201 } 202 } 203 204 impl<T, N> Timer<T, N> 205 where 206 T: Park, 207 N: Now, 208 { 209 /// Create a new `Timer` instance that uses `park` to block the current 210 /// thread and `now` to get the current `Instant`. 211 /// 212 /// Specifying the source of time is useful when testing. new_with_now(park: T, mut now: N) -> Self213 pub fn new_with_now(park: T, mut now: N) -> Self { 214 let unpark = Box::new(park.unpark()); 215 216 Timer { 217 inner: Arc::new(Inner::new(now.now(), unpark)), 218 wheel: wheel::Wheel::new(), 219 park, 220 now, 221 } 222 } 223 224 /// Returns a handle to the timer. 225 /// 226 /// The `Handle` is how `Delay` instances are created. The `Delay` instances 227 /// can either be created directly or the `Handle` instance can be passed to 228 /// `with_default`, setting the timer as the default timer for the execution 229 /// context. handle(&self) -> Handle230 pub fn handle(&self) -> Handle { 231 Handle::new(Arc::downgrade(&self.inner)) 232 } 233 234 /// Performs one iteration of the timer loop. 235 /// 236 /// This function must be called repeatedly in order for the `Timer` 237 /// instance to make progress. This is where the work happens. 238 /// 239 /// The `Timer` will use the `Park` instance that was specified in [`new`] 240 /// to block the current thread until the next `Delay` instance elapses. One 241 /// call to `turn` results in at most one call to `park.park()`. 
242 /// 243 /// # Return 244 /// 245 /// On success, `Ok(Turn)` is returned, where `Turn` is a placeholder type 246 /// that currently does nothing but may, in the future, have functions add 247 /// to provide information about the call to `turn`. 248 /// 249 /// If the call to `park.park()` fails, then `Err` is returned with the 250 /// error. 251 /// 252 /// [`new`]: #method.new turn(&mut self, max_wait: Option<Duration>) -> Result<Turn, T::Error>253 pub fn turn(&mut self, max_wait: Option<Duration>) -> Result<Turn, T::Error> { 254 match max_wait { 255 Some(timeout) => self.park_timeout(timeout)?, 256 None => self.park()?, 257 } 258 259 Ok(Turn(())) 260 } 261 262 /// Converts an `Expiration` to an `Instant`. expiration_instant(&self, when: u64) -> Instant263 fn expiration_instant(&self, when: u64) -> Instant { 264 self.inner.start + Duration::from_millis(when) 265 } 266 267 /// Run timer related logic process(&mut self)268 fn process(&mut self) { 269 let now = ::ms(self.now.now() - self.inner.start, ::Round::Down); 270 let mut poll = wheel::Poll::new(now); 271 272 while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { 273 let when = entry.when_internal().expect("invalid internal entry state"); 274 275 // Fire the entry 276 entry.fire(when); 277 278 // Track that the entry has been fired 279 entry.set_when_internal(None); 280 } 281 282 // Update the elapsed cache 283 self.inner.elapsed.store(self.wheel.elapsed(), SeqCst); 284 } 285 286 /// Process the entry queue 287 /// 288 /// This handles adding and canceling timeouts. 
process_queue(&mut self)289 fn process_queue(&mut self) { 290 for entry in self.inner.process.take() { 291 match (entry.when_internal(), entry.load_state()) { 292 (None, None) => { 293 // Nothing to do 294 } 295 (Some(_), None) => { 296 // Remove the entry 297 self.clear_entry(&entry); 298 } 299 (None, Some(when)) => { 300 // Queue the entry 301 self.add_entry(entry, when); 302 } 303 (Some(_), Some(next)) => { 304 self.clear_entry(&entry); 305 self.add_entry(entry, next); 306 } 307 } 308 } 309 } 310 clear_entry(&mut self, entry: &Arc<Entry>)311 fn clear_entry(&mut self, entry: &Arc<Entry>) { 312 self.wheel.remove(entry, &mut ()); 313 entry.set_when_internal(None); 314 } 315 316 /// Fire the entry if it needs to, otherwise queue it to be processed later. 317 /// 318 /// Returns `None` if the entry was fired. add_entry(&mut self, entry: Arc<Entry>, when: u64)319 fn add_entry(&mut self, entry: Arc<Entry>, when: u64) { 320 use wheel::InsertError; 321 322 entry.set_when_internal(Some(when)); 323 324 match self.wheel.insert(when, entry, &mut ()) { 325 Ok(_) => {} 326 Err((entry, InsertError::Elapsed)) => { 327 // The entry's deadline has elapsed, so fire it and update the 328 // internal state accordingly. 329 entry.set_when_internal(None); 330 entry.fire(when); 331 } 332 Err((entry, InsertError::Invalid)) => { 333 // The entry's deadline is invalid, so error it and update the 334 // internal state accordingly. 
335 entry.set_when_internal(None); 336 entry.error(); 337 } 338 } 339 } 340 } 341 342 impl Default for Timer<ParkThread, SystemNow> { default() -> Self343 fn default() -> Self { 344 Timer::new(ParkThread::new()) 345 } 346 } 347 348 impl<T, N> Park for Timer<T, N> 349 where 350 T: Park, 351 N: Now, 352 { 353 type Unpark = T::Unpark; 354 type Error = T::Error; 355 unpark(&self) -> Self::Unpark356 fn unpark(&self) -> Self::Unpark { 357 self.park.unpark() 358 } 359 park(&mut self) -> Result<(), Self::Error>360 fn park(&mut self) -> Result<(), Self::Error> { 361 self.process_queue(); 362 363 match self.wheel.poll_at() { 364 Some(when) => { 365 let now = self.now.now(); 366 let deadline = self.expiration_instant(when); 367 368 if deadline > now { 369 self.park.park_timeout(deadline - now)?; 370 } else { 371 self.park.park_timeout(Duration::from_secs(0))?; 372 } 373 } 374 None => { 375 self.park.park()?; 376 } 377 } 378 379 self.process(); 380 381 Ok(()) 382 } 383 park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>384 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { 385 self.process_queue(); 386 387 match self.wheel.poll_at() { 388 Some(when) => { 389 let now = self.now.now(); 390 let deadline = self.expiration_instant(when); 391 392 if deadline > now { 393 self.park.park_timeout(cmp::min(deadline - now, duration))?; 394 } else { 395 self.park.park_timeout(Duration::from_secs(0))?; 396 } 397 } 398 None => { 399 self.park.park_timeout(duration)?; 400 } 401 } 402 403 self.process(); 404 405 Ok(()) 406 } 407 } 408 409 impl<T, N> Drop for Timer<T, N> { drop(&mut self)410 fn drop(&mut self) { 411 use std::u64; 412 413 // Shutdown the stack of entries to process, preventing any new entries 414 // from being pushed. 
415 self.inner.process.shutdown(); 416 417 // Clear the wheel, using u64::MAX allows us to drain everything 418 let mut poll = wheel::Poll::new(u64::MAX); 419 420 while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { 421 entry.error(); 422 } 423 } 424 } 425 426 // ===== impl Inner ===== 427 428 impl Inner { new(start: Instant, unpark: Box<dyn Unpark>) -> Inner429 fn new(start: Instant, unpark: Box<dyn Unpark>) -> Inner { 430 Inner { 431 num: AtomicUsize::new(0), 432 elapsed: AtomicU64::new(0), 433 process: AtomicStack::new(), 434 start, 435 unpark, 436 } 437 } 438 elapsed(&self) -> u64439 fn elapsed(&self) -> u64 { 440 self.elapsed.load(SeqCst) 441 } 442 443 /// Increment the number of active timeouts increment(&self) -> Result<(), Error>444 fn increment(&self) -> Result<(), Error> { 445 let mut curr = self.num.load(SeqCst); 446 447 loop { 448 if curr == MAX_TIMEOUTS { 449 return Err(Error::at_capacity()); 450 } 451 452 let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst); 453 454 if curr == actual { 455 return Ok(()); 456 } 457 458 curr = actual; 459 } 460 } 461 462 /// Decrement the number of active timeouts decrement(&self)463 fn decrement(&self) { 464 let prev = self.num.fetch_sub(1, SeqCst); 465 debug_assert!(prev <= MAX_TIMEOUTS); 466 } 467 queue(&self, entry: &Arc<Entry>) -> Result<(), Error>468 fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> { 469 if self.process.push(entry)? { 470 // The timer is notified so that it can process the timeout 471 self.unpark.unpark(); 472 } 473 474 Ok(()) 475 } 476 normalize_deadline(&self, deadline: Instant) -> u64477 fn normalize_deadline(&self, deadline: Instant) -> u64 { 478 if deadline < self.start { 479 return 0; 480 } 481 482 ::ms(deadline - self.start, ::Round::Up) 483 } 484 } 485 486 impl fmt::Debug for Inner { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result487 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 488 fmt.debug_struct("Inner").finish() 489 } 490 } 491