1 //! Time driver
2 
3 mod atomic_stack;
4 use self::atomic_stack::AtomicStack;
5 
6 mod entry;
7 pub(super) use self::entry::Entry;
8 
9 mod handle;
10 pub(crate) use self::handle::Handle;
11 
12 mod registration;
13 pub(crate) use self::registration::Registration;
14 
15 mod stack;
16 use self::stack::Stack;
17 
18 use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
19 use crate::park::{Park, Unpark};
20 use crate::time::{wheel, Error};
21 use crate::time::{Clock, Duration, Instant};
22 
23 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
24 
25 use std::sync::Arc;
26 use std::usize;
27 use std::{cmp, fmt};
28 
29 /// Time implementation that drives [`Delay`][delay], [`Interval`][interval], and [`Timeout`][timeout].
30 ///
31 /// A `Driver` instance tracks the state necessary for managing time and
32 /// notifying the [`Delay`][delay] instances once their deadlines are reached.
33 ///
34 /// It is expected that a single instance manages many individual [`Delay`][delay]
35 /// instances. The `Driver` implementation is thread-safe and, as such, is able
36 /// to handle callers from across threads.
37 ///
38 /// After creating the `Driver` instance, the caller must repeatedly call `park`
39 /// or `park_timeout`. The time driver will perform no work unless `park` or
40 /// `park_timeout` is called repeatedly.
41 ///
/// The driver has a resolution of one millisecond. Any duration that falls
/// between milliseconds is rounded up to the next millisecond.
44 ///
45 /// When an instance is dropped, any outstanding [`Delay`][delay] instance that has not
46 /// elapsed will be notified with an error. At this point, calling `poll` on the
47 /// [`Delay`][delay] instance will result in panic.
48 ///
49 /// # Implementation
50 ///
51 /// The time driver is based on the [paper by Varghese and Lauck][paper].
52 ///
53 /// A hashed timing wheel is a vector of slots, where each slot handles a time
54 /// slice. As time progresses, the timer walks over the slot for the current
55 /// instant, and processes each entry for that slot. When the timer reaches the
56 /// end of the wheel, it starts again at the beginning.
57 ///
58 /// The implementation maintains six wheels arranged in a set of levels. As the
59 /// levels go up, the slots of the associated wheel represent larger intervals
60 /// of time. At each level, the wheel has 64 slots. Each slot covers a range of
61 /// time equal to the wheel at the lower level. At level zero, each slot
62 /// represents one millisecond of time.
63 ///
64 /// The wheels are:
65 ///
66 /// * Level 0: 64 x 1 millisecond slots.
67 /// * Level 1: 64 x 64 millisecond slots.
68 /// * Level 2: 64 x ~4 second slots.
69 /// * Level 3: 64 x ~4 minute slots.
70 /// * Level 4: 64 x ~4 hour slots.
71 /// * Level 5: 64 x ~12 day slots.
72 ///
73 /// When the timer processes entries at level zero, it will notify all the
74 /// `Delay` instances as their deadlines have been reached. For all higher
75 /// levels, all entries will be redistributed across the wheel at the next level
/// down. Eventually, as time progresses, [`Delay`][delay] instances will
77 /// either be canceled (dropped) or their associated entries will reach level
78 /// zero and be notified.
79 ///
80 /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
81 /// [delay]: crate::time::Delay
82 /// [timeout]: crate::time::Timeout
83 /// [interval]: crate::time::Interval
84 #[derive(Debug)]
85 pub(crate) struct Driver<T: Park> {
86     /// Shared state
87     inner: Arc<Inner>,
88 
89     /// Timer wheel
90     wheel: wheel::Wheel<Stack>,
91 
92     /// Thread parker. The `Driver` park implementation delegates to this.
93     park: T,
94 
95     /// Source of "now" instances
96     clock: Clock,
97 
98     /// True if the driver is being shutdown
99     is_shutdown: bool,
100 }
101 
102 /// Timer state shared between `Driver`, `Handle`, and `Registration`.
103 pub(crate) struct Inner {
104     /// The instant at which the timer started running.
105     start: Instant,
106 
107     /// The last published timer `elapsed` value.
108     elapsed: AtomicU64,
109 
110     /// Number of active timeouts
111     num: AtomicUsize,
112 
113     /// Head of the "process" linked list.
114     process: AtomicStack,
115 
116     /// Unparks the timer thread.
117     unpark: Box<dyn Unpark>,
118 }
119 
/// Upper bound on the number of timeouts that may be active at the same time.
const MAX_TIMEOUTS: usize = usize::MAX / 2;
122 
123 // ===== impl Driver =====
124 
125 impl<T> Driver<T>
126 where
127     T: Park,
128 {
129     /// Creates a new `Driver` instance that uses `park` to block the current
130     /// thread and `clock` to get the current `Instant`.
131     ///
132     /// Specifying the source of time is useful when testing.
new(park: T, clock: Clock) -> Driver<T>133     pub(crate) fn new(park: T, clock: Clock) -> Driver<T> {
134         let unpark = Box::new(park.unpark());
135 
136         Driver {
137             inner: Arc::new(Inner::new(clock.now(), unpark)),
138             wheel: wheel::Wheel::new(),
139             park,
140             clock,
141             is_shutdown: false,
142         }
143     }
144 
145     /// Returns a handle to the timer.
146     ///
147     /// The `Handle` is how `Delay` instances are created. The `Delay` instances
148     /// can either be created directly or the `Handle` instance can be passed to
149     /// `with_default`, setting the timer as the default timer for the execution
150     /// context.
handle(&self) -> Handle151     pub(crate) fn handle(&self) -> Handle {
152         Handle::new(Arc::downgrade(&self.inner))
153     }
154 
155     /// Converts an `Expiration` to an `Instant`.
expiration_instant(&self, when: u64) -> Instant156     fn expiration_instant(&self, when: u64) -> Instant {
157         self.inner.start + Duration::from_millis(when)
158     }
159 
160     /// Runs timer related logic
process(&mut self)161     fn process(&mut self) {
162         let now = crate::time::ms(
163             self.clock.now() - self.inner.start,
164             crate::time::Round::Down,
165         );
166         let mut poll = wheel::Poll::new(now);
167 
168         while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
169             let when = entry.when_internal().expect("invalid internal entry state");
170 
171             // Fire the entry
172             entry.fire(when);
173 
174             // Track that the entry has been fired
175             entry.set_when_internal(None);
176         }
177 
178         // Update the elapsed cache
179         self.inner.elapsed.store(self.wheel.elapsed(), SeqCst);
180     }
181 
182     /// Processes the entry queue
183     ///
184     /// This handles adding and canceling timeouts.
process_queue(&mut self)185     fn process_queue(&mut self) {
186         for entry in self.inner.process.take() {
187             match (entry.when_internal(), entry.load_state()) {
188                 (None, None) => {
189                     // Nothing to do
190                 }
191                 (Some(_), None) => {
192                     // Remove the entry
193                     self.clear_entry(&entry);
194                 }
195                 (None, Some(when)) => {
196                     // Queue the entry
197                     self.add_entry(entry, when);
198                 }
199                 (Some(_), Some(next)) => {
200                     self.clear_entry(&entry);
201                     self.add_entry(entry, next);
202                 }
203             }
204         }
205     }
206 
clear_entry(&mut self, entry: &Arc<Entry>)207     fn clear_entry(&mut self, entry: &Arc<Entry>) {
208         self.wheel.remove(entry, &mut ());
209         entry.set_when_internal(None);
210     }
211 
212     /// Fires the entry if it needs to, otherwise queue it to be processed later.
213     ///
214     /// Returns `None` if the entry was fired.
add_entry(&mut self, entry: Arc<Entry>, when: u64)215     fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
216         use crate::time::wheel::InsertError;
217 
218         entry.set_when_internal(Some(when));
219 
220         match self.wheel.insert(when, entry, &mut ()) {
221             Ok(_) => {}
222             Err((entry, InsertError::Elapsed)) => {
223                 // The entry's deadline has elapsed, so fire it and update the
224                 // internal state accordingly.
225                 entry.set_when_internal(None);
226                 entry.fire(when);
227             }
228             Err((entry, InsertError::Invalid)) => {
229                 // The entry's deadline is invalid, so error it and update the
230                 // internal state accordingly.
231                 entry.set_when_internal(None);
232                 entry.error(Error::invalid());
233             }
234         }
235     }
236 }
237 
238 impl<T> Park for Driver<T>
239 where
240     T: Park,
241 {
242     type Unpark = T::Unpark;
243     type Error = T::Error;
244 
unpark(&self) -> Self::Unpark245     fn unpark(&self) -> Self::Unpark {
246         self.park.unpark()
247     }
248 
park(&mut self) -> Result<(), Self::Error>249     fn park(&mut self) -> Result<(), Self::Error> {
250         self.process_queue();
251 
252         match self.wheel.poll_at() {
253             Some(when) => {
254                 let now = self.clock.now();
255                 let deadline = self.expiration_instant(when);
256 
257                 if deadline > now {
258                     let dur = deadline - now;
259 
260                     if self.clock.is_paused() {
261                         self.park.park_timeout(Duration::from_secs(0))?;
262                         self.clock.advance(dur);
263                     } else {
264                         self.park.park_timeout(dur)?;
265                     }
266                 } else {
267                     self.park.park_timeout(Duration::from_secs(0))?;
268                 }
269             }
270             None => {
271                 self.park.park()?;
272             }
273         }
274 
275         self.process();
276 
277         Ok(())
278     }
279 
park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>280     fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
281         self.process_queue();
282 
283         match self.wheel.poll_at() {
284             Some(when) => {
285                 let now = self.clock.now();
286                 let deadline = self.expiration_instant(when);
287 
288                 if deadline > now {
289                     let duration = cmp::min(deadline - now, duration);
290 
291                     if self.clock.is_paused() {
292                         self.park.park_timeout(Duration::from_secs(0))?;
293                         self.clock.advance(duration);
294                     } else {
295                         self.park.park_timeout(duration)?;
296                     }
297                 } else {
298                     self.park.park_timeout(Duration::from_secs(0))?;
299                 }
300             }
301             None => {
302                 self.park.park_timeout(duration)?;
303             }
304         }
305 
306         self.process();
307 
308         Ok(())
309     }
310 
shutdown(&mut self)311     fn shutdown(&mut self) {
312         if self.is_shutdown {
313             return;
314         }
315 
316         use std::u64;
317 
318         // Shutdown the stack of entries to process, preventing any new entries
319         // from being pushed.
320         self.inner.process.shutdown();
321 
322         // Clear the wheel, using u64::MAX allows us to drain everything
323         let mut poll = wheel::Poll::new(u64::MAX);
324 
325         while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
326             entry.error(Error::shutdown());
327         }
328 
329         self.park.shutdown();
330 
331         self.is_shutdown = true;
332     }
333 }
334 
335 impl<T> Drop for Driver<T>
336 where
337     T: Park,
338 {
drop(&mut self)339     fn drop(&mut self) {
340         self.shutdown();
341     }
342 }
343 
344 // ===== impl Inner =====
345 
346 impl Inner {
new(start: Instant, unpark: Box<dyn Unpark>) -> Inner347     fn new(start: Instant, unpark: Box<dyn Unpark>) -> Inner {
348         Inner {
349             num: AtomicUsize::new(0),
350             elapsed: AtomicU64::new(0),
351             process: AtomicStack::new(),
352             start,
353             unpark,
354         }
355     }
356 
elapsed(&self) -> u64357     fn elapsed(&self) -> u64 {
358         self.elapsed.load(SeqCst)
359     }
360 
361     #[cfg(all(test, loom))]
num(&self, ordering: std::sync::atomic::Ordering) -> usize362     fn num(&self, ordering: std::sync::atomic::Ordering) -> usize {
363         self.num.load(ordering)
364     }
365 
366     /// Increments the number of active timeouts
increment(&self) -> Result<(), Error>367     fn increment(&self) -> Result<(), Error> {
368         let mut curr = self.num.load(Relaxed);
369         loop {
370             if curr == MAX_TIMEOUTS {
371                 return Err(Error::at_capacity());
372             }
373 
374             match self
375                 .num
376                 .compare_exchange_weak(curr, curr + 1, Release, Relaxed)
377             {
378                 Ok(_) => return Ok(()),
379                 Err(next) => curr = next,
380             }
381         }
382     }
383 
384     /// Decrements the number of active timeouts
decrement(&self)385     fn decrement(&self) {
386         let prev = self.num.fetch_sub(1, Acquire);
387         debug_assert!(prev <= MAX_TIMEOUTS);
388     }
389 
queue(&self, entry: &Arc<Entry>) -> Result<(), Error>390     fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
391         if self.process.push(entry)? {
392             // The timer is notified so that it can process the timeout
393             self.unpark.unpark();
394         }
395 
396         Ok(())
397     }
398 
normalize_deadline(&self, deadline: Instant) -> u64399     fn normalize_deadline(&self, deadline: Instant) -> u64 {
400         if deadline < self.start {
401             return 0;
402         }
403 
404         crate::time::ms(deadline - self.start, crate::time::Round::Up)
405     }
406 }
407 
408 impl fmt::Debug for Inner {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result409     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
410         fmt.debug_struct("Inner").finish()
411     }
412 }
413 
414 #[cfg(all(test, loom))]
415 mod tests;
416